## Tests for datacenter ## ## XXX: Most tests for datacenter are still in doctest datacenter.txt ## import os import shutil import tempfile import unittest from zope.component import getUtility, getGlobalSiteManager from zope.interface.verify import verifyObject, verifyClass from waeup.kofa.datacenter import DataCenter from waeup.kofa.interfaces import IDataCenter, IDataCenterConfig class DataCenterLogQueryTests(unittest.TestCase): # Tests for querying logfiles via datacenter. def setUp(self): # create a temporary place to store files self.workdir = tempfile.mkdtemp() self.storage = os.path.join(self.workdir, 'storage') os.mkdir(self.storage) self.logpath = os.path.join(self.storage, 'logs', 'myapp.log') # register a datacenter config that provides the set up location self.config = {'path': self.storage,} self.gsm = getGlobalSiteManager() self.gsm.registerUtility(self.config, IDataCenterConfig) pass def tearDown(self): self.gsm.unregisterUtility(self.config, IDataCenterConfig) shutil.rmtree(self.workdir) return def fill_logfile(self, num=1): # write 100 messages into logfile, with 50 containing 'Msg' path = self.logpath for m in range(num): fp = open(path, 'wb') for n in range(50*m, 50*m+50): fp.write('Msg %d\n' % (n + 1)) fp.write('Other Line %d\n' % (n + 1)) fp.close() path = self.logpath + '.%d' % (m + 1) return def test_util_available(self): # a self-test config = getUtility(IDataCenterConfig) self.assertTrue(config is not None) return def test_query_logfiles(self): # We can find entries in logfiles datacenter = DataCenter() open(self.logpath, 'wb').write('Message 1\n') result = list(datacenter.queryLogfiles('myapp.log', 'Message')) self.assertEqual(result, ['Message 1\n']) return def test_query_logfiles_multi_logs(self): # We can find entries in multiple logfiles (backups) datacenter = DataCenter() open(self.logpath, 'wb').write('Msg 1\n') open(self.logpath + '.2', 'wb').write('Msg 2\n') open(self.logpath + '.10', 'wb').write('Msg 3\n') result = list(datacenter.queryLogfiles('myapp.log', 'Msg')) # entry of logfile .10 comes after entry of logfile .2 self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n']) return def test_query_logfiles_ignores_other_logs(self): # We look only for the basename specified datacenter = DataCenter() open(self.logpath, 'wb').write('Msg 1\n') open(self.logpath + '-not-a-real-log', 'wb').write('Msg 2\n') result = list(datacenter.queryLogfiles('myapp.log', 'Msg')) # Msg 2 won't show up in results. self.assertEqual(result, ['Msg 1\n']) return def test_query_logfiles_not_existant(self): # We make sure only existing logs are searched datacenter = DataCenter() open(self.logpath + '.1', 'wb').write('Msg 1\n') result = list(datacenter.queryLogfiles('myapp.log', 'Msg')) # works, although there is no myapp.log, only myapp.log.1 self.assertEqual(result, ['Msg 1\n']) return def test_query_logfiles_invalid_regexp(self): # Invalid regular expressions give a ValueError datacenter = DataCenter() open(self.logpath, 'wb').write('Msg 1\n') result = datacenter.queryLogfiles('myapp.log', '(a') self.assertRaises(ValueError, list, result) return def test_query_logfiles_batching_limit(self): # we can use `limit` for batching datacenter = DataCenter() self.fill_logfile() result = list(datacenter.queryLogfiles( 'myapp.log', 'Msg', limit=10)) self.assertEqual(len(result), 10) self.assertEqual(result[-1], 'Msg 10\n') return def test_query_logfiles_batching_start(self): # `start` is respected when batching datacenter = DataCenter() self.fill_logfile() result = list(datacenter.queryLogfiles( 'myapp.log', 'Msg', start=25)) self.assertEqual(len(result), 25) self.assertEqual(result[0], 'Msg 26\n') return def test_query_logfiles_batching_limit_and_start(self): # we can use `start` and `limit` simultanously datacenter = DataCenter() self.fill_logfile() result = list(datacenter.queryLogfiles( 'myapp.log', 'Msg', start=25, limit=10)) self.assertEqual(len(result), 10) self.assertEqual(result[0], 'Msg 26\n') self.assertEqual(result[-1], 'Msg 35\n') return def test_query_logfiles_batching_edge_cases(self): # we can find last matches if found num < limit. datacenter = DataCenter() self.fill_logfile() result = list(datacenter.queryLogfiles( 'myapp.log', 'Msg', start=45, limit=10)) self.assertEqual(len(result), 5) self.assertEqual(result[0], 'Msg 46\n') self.assertEqual(result[-1], 'Msg 50\n') return def test_query_logfiles_batching_multiple_files(self): # batching works also with multiple log files datacenter = DataCenter() self.fill_logfile(num=2) result = list(datacenter.queryLogfiles( 'myapp.log', 'Msg', start=45, limit=10)) self.assertEqual(len(result), 10) self.assertEqual(result[0], 'Msg 46\n') self.assertEqual(result[-1], 'Msg 55\n') return def test_query_logfiles_regex_match_inner(self): # we also find lines that match at some inner part datacenter = DataCenter() self.fill_logfile() result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n')) self.assertEqual(len(result), 1) self.assertEqual(result[0], 'Msg 1\n') return class DataCenterTests(unittest.TestCase): # General datacenter tests. def setUp(self): # create a temporary place to store files self.workdir = tempfile.mkdtemp() self.storage = os.path.join(self.workdir, 'storage') os.mkdir(self.storage) self.logpath = os.path.join(self.storage, 'logs', 'myapp.log') # register a datacenter config that provides the set up location self.config = {'path': self.storage,} self.gsm = getGlobalSiteManager() self.gsm.registerUtility(self.config, IDataCenterConfig) pass def tearDown(self): self.gsm.unregisterUtility(self.config, IDataCenterConfig) shutil.rmtree(self.workdir) return def test_iface(self): # we comply with interfaces obj = DataCenter() verifyClass(IDataCenter, DataCenter) verifyObject(IDataCenter, obj) return def test_get_log_files(self): # We can get lists of logfiles available. # By default, backups are skipped. datacenter = DataCenter() logpath2 = self.logpath + '.1' logpath3 = self.logpath + '.2' for path in self.logpath, logpath2, logpath3: open(path, 'wb').write('some contents') result = datacenter.getLogFiles() self.assertEqual(len(result), 1) self.assertEqual(result[0].name, os.path.basename(self.logpath)) return def test_get_log_files_incl_backups(self): # We can get lists of logfiles including backup logs. datacenter = DataCenter() logpath2 = self.logpath + '.1' logpath3 = self.logpath + '.2' for path in self.logpath, logpath2, logpath3: open(path, 'wb').write('some contents') result = datacenter.getLogFiles(exclude_backups=False) self.assertEqual(len(result), 3) names = [x.name for x in result] expected = [os.path.basename(x) for x in [ self.logpath, logpath2, logpath3]] self.assertEqual(names, expected) return