source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_datacenter.py @ 8512

Last change on this file since 8512 was 8512, checked in by uli, 13 years ago

Add support for datacenter to search log files.

File size: 5.7 KB
Line 
1## Tests for datacenter
2##
3## XXX: Most tests for datacenter are still in doctest datacenter.txt
4##
5import os
6import shutil
7import tempfile
8import unittest
9from zope.component import getUtility, getGlobalSiteManager
10from zope.interface.verify import verifyObject, verifyClass
11from waeup.kofa.datacenter import DataCenter
12from waeup.kofa.interfaces import IDataCenter, IDataCenterConfig
13
14
15class DataCenterTests(unittest.TestCase):
16
17    def setUp(self):
18        # create a temporary place to store files
19        self.workdir = tempfile.mkdtemp()
20        self.storage = os.path.join(self.workdir, 'storage')
21        os.mkdir(self.storage)
22        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
23        # register a datacenter config that provides the set up location
24        self.config = {'path': self.storage,}
25        self.gsm = getGlobalSiteManager()
26        self.gsm.registerUtility(self.config, IDataCenterConfig)
27        pass
28
29    def tearDown(self):
30        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
31        shutil.rmtree(self.workdir)
32        return
33
34    def fill_logfile(self, num=1):
35        # write 100 messages into logfile, with 50 containing 'Msg'
36        path = self.logpath
37        for m in range(num):
38            fp = open(path, 'wb')
39            for n in range(50*m, 50*m+50):
40                fp.write('Msg %d\n' % (n + 1))
41                fp.write('Other Line %d\n' % (n + 1))
42            fp.close()
43            path = self.logpath + '.%d' % (m + 1)
44        return
45
46    def test_util_available(self):
47        # a self-test
48        config = getUtility(IDataCenterConfig)
49        self.assertTrue(config is not None)
50        return
51
52    def test_iface(self):
53        # we comply with interfaces
54        obj = DataCenter()
55        verifyClass(IDataCenter, DataCenter)
56        verifyObject(IDataCenter, obj)
57        return
58
59    def test_query_logfiles(self):
60        # We can find entries in logfiles
61        datacenter = DataCenter()
62        open(self.logpath, 'wb').write('Message 1\n')
63        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
64        self.assertEqual(result, ['Message 1\n'])
65        return
66
67    def test_query_logfiles_multi_logs(self):
68        # We can find entries in multiple logfiles (backups)
69        datacenter = DataCenter()
70        open(self.logpath, 'wb').write('Msg 1\n')
71        open(self.logpath + '.2', 'wb').write('Msg 2\n')
72        open(self.logpath + '.10', 'wb').write('Msg 3\n')
73        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
74        # entry of logfile .10 comes after entry of logfile .2
75        self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n'])
76        return
77
78    def test_query_logfiles_ignores_other_logs(self):
79        # We look only for the basename specified
80        datacenter = DataCenter()
81        open(self.logpath, 'wb').write('Msg 1\n')
82        open(self.logpath + '-not-a-real-log', 'wb').write('Msg 2\n')
83        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
84        # Msg 2 won't show up in results.
85        self.assertEqual(result, ['Msg 1\n'])
86        return
87
88    def test_query_logfiles_not_existant(self):
89        # We make sure only existing logs are searched
90        datacenter = DataCenter()
91        open(self.logpath + '.1', 'wb').write('Msg 1\n')
92        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
93        # works, although there is no myapp.log, only myapp.log.1
94        self.assertEqual(result, ['Msg 1\n'])
95        return
96
97    def test_query_logfiles_invalid_regexp(self):
98        # Invalid regular expressions give a ValueError
99        datacenter = DataCenter()
100        open(self.logpath, 'wb').write('Msg 1\n')
101        result = datacenter.queryLogfiles('myapp.log', '(a')
102        self.assertRaises(ValueError, list, result)
103        return
104
105    def test_query_logfiles_batching_limit(self):
106        # we can use `limit` for batching
107        datacenter = DataCenter()
108        self.fill_logfile()
109        result = list(datacenter.queryLogfiles(
110            'myapp.log', 'Msg', limit=10))
111        self.assertEqual(len(result), 10)
112        self.assertEqual(result[-1], 'Msg 10\n')
113        return
114
115    def test_query_logfiles_batching_start(self):
116        # `start` is respected when batching
117        datacenter = DataCenter()
118        self.fill_logfile()
119        result = list(datacenter.queryLogfiles(
120            'myapp.log', 'Msg', start=25))
121        self.assertEqual(len(result), 25)
122        self.assertEqual(result[0], 'Msg 26\n')
123        return
124
125    def test_query_logfiles_batching_limit_and_start(self):
126        # we can use `start` and `limit` simultanously
127        datacenter = DataCenter()
128        self.fill_logfile()
129        result = list(datacenter.queryLogfiles(
130            'myapp.log', 'Msg', start=25, limit=10))
131        self.assertEqual(len(result), 10)
132        self.assertEqual(result[0], 'Msg 26\n')
133        self.assertEqual(result[-1], 'Msg 35\n')
134        return
135
136    def test_query_logfiles_batching_edge_cases(self):
137        # we can find last matches if found num < limit.
138        datacenter = DataCenter()
139        self.fill_logfile()
140        result = list(datacenter.queryLogfiles(
141            'myapp.log', 'Msg', start=45, limit=10))
142        self.assertEqual(len(result), 5)
143        self.assertEqual(result[0], 'Msg 46\n')
144        self.assertEqual(result[-1], 'Msg 50\n')
145        return
146
147    def test_query_logfiles_batching_multiple_files(self):
148        # batching works also with multiple log files
149        datacenter = DataCenter()
150        self.fill_logfile(num=2)
151        result = list(datacenter.queryLogfiles(
152            'myapp.log', 'Msg', start=45, limit=10))
153        self.assertEqual(len(result), 10)
154        self.assertEqual(result[0], 'Msg 46\n')
155        self.assertEqual(result[-1], 'Msg 55\n')
156        return
Note: See TracBrowser for help on using the repository browser.