source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_datacenter.py @ 8607

Last change on this file since 8607 was 8592, checked in by Henrik Bettermann, 13 years ago

Search log files in chronological order.

File size: 7.9 KB
RevLine 
[8512]1## Tests for datacenter
2##
3## XXX: Most tests for datacenter are still in doctest datacenter.txt
4##
5import os
6import shutil
7import tempfile
8import unittest
9from zope.component import getUtility, getGlobalSiteManager
10from zope.interface.verify import verifyObject, verifyClass
11from waeup.kofa.datacenter import DataCenter
12from waeup.kofa.interfaces import IDataCenter, IDataCenterConfig
13
14
class DataCenterLogQueryTests(unittest.TestCase):
    # Tests for querying logfiles via datacenter.

    def setUp(self):
        # Create a temporary place to store files.  Only the storage
        # directory itself is created here; the 'logs' subdirectory that
        # `self.logpath` points into is presumably created when a test
        # instantiates DataCenter() -- TODO confirm against DataCenter.
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # Register a datacenter config that provides the set up location.
        self.config = {'path': self.storage}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)

    def tearDown(self):
        # Undo everything setUp did: drop the config utility and remove
        # the temporary directory tree.
        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        shutil.rmtree(self.workdir)

    @staticmethod
    def _write_file(path, contents):
        # Write `contents` to `path` (binary mode, truncating) and close
        # the file deterministically, so the data is on disk before the
        # datacenter is queried.
        with open(path, 'wb') as fp:
            fp.write(contents)

    def fill_logfile(self, num=1):
        # Write `num` logfiles of 100 lines each; in every file 50 lines
        # contain 'Msg' and 50 contain 'Other Line'.  Message numbers
        # run from 1 to 50 * num over all files.
        #
        # The newest chunk (highest numbers) goes into the current
        # logfile, older chunks go into the backups '.1', '.2', ... so
        # that a higher backup suffix always holds older messages.
        for chunk in range(num):
            age = num - 1 - chunk
            if age:
                path = '%s.%d' % (self.logpath, age)
            else:
                path = self.logpath
            with open(path, 'wb') as fp:
                for n in range(50 * chunk + 1, 50 * chunk + 51):
                    fp.write('Msg %d\n' % n)
                    fp.write('Other Line %d\n' % n)

    def test_util_available(self):
        # A self-test: the config registered in setUp can be looked up.
        config = getUtility(IDataCenterConfig)
        self.assertTrue(config is not None)

    def test_query_logfiles(self):
        # We can find entries in logfiles
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Message 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
        self.assertEqual(result, ['Message 1\n'])

    def test_query_logfiles_multi_logs(self):
        # We can find entries in multiple logfiles (backups)
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 3\n')
        self._write_file(self.logpath + '.2', 'Msg 2\n')
        self._write_file(self.logpath + '.10', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Results come in chronological order: the entry of backup .10
        # first, then the entry of backup .2, and the entry of the
        # current logfile last.
        self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n'])

    def test_query_logfiles_ignores_other_logs(self):
        # We look only for the basename specified
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 1\n')
        self._write_file(self.logpath + '-not-a-real-log', 'Msg 2\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Msg 2 won't show up in results.
        self.assertEqual(result, ['Msg 1\n'])

    def test_query_logfiles_not_existant(self):
        # We make sure only existing logs are searched
        datacenter = DataCenter()
        self._write_file(self.logpath + '.1', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # works, although there is no myapp.log, only myapp.log.1
        self.assertEqual(result, ['Msg 1\n'])

    def test_query_logfiles_invalid_regexp(self):
        # Invalid regular expressions give a ValueError
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 1\n')
        result = datacenter.queryLogfiles('myapp.log', '(a')
        # The error is expected when the result is consumed, hence the
        # `list` call inside assertRaises.
        self.assertRaises(ValueError, list, result)

    def test_query_logfiles_batching_limit(self):
        # we can use `limit` for batching
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[-1], 'Msg 10\n')

    def test_query_logfiles_batching_start(self):
        # `start` is respected when batching
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25))
        self.assertEqual(len(result), 25)
        self.assertEqual(result[0], 'Msg 26\n')

    def test_query_logfiles_batching_limit_and_start(self):
        # we can use `start` and `limit` simultaneously
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], 'Msg 26\n')
        self.assertEqual(result[-1], 'Msg 35\n')

    def test_query_logfiles_batching_edge_cases(self):
        # we can find last matches if found num < limit.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        self.assertEqual(len(result), 5)
        self.assertEqual(result[0], 'Msg 46\n')
        self.assertEqual(result[-1], 'Msg 50\n')

    def test_query_logfiles_batching_multiple_files(self):
        # batching works also with multiple log files: the requested
        # batch spans the boundary between the backup and the current log
        datacenter = DataCenter()
        self.fill_logfile(num=2)
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], 'Msg 46\n')
        self.assertEqual(result[-1], 'Msg 55\n')

    def test_query_logfiles_regex_match_inner(self):
        # we also find lines that match at some inner part
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n'))
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], 'Msg 1\n')
[8517]160
class DataCenterTests(unittest.TestCase):
    # General datacenter tests.

    def setUp(self):
        # Create a temporary place to store files.  Only the storage
        # directory itself is created here; the 'logs' subdirectory that
        # `self.logpath` points into is presumably created when a test
        # instantiates DataCenter() -- TODO confirm against DataCenter.
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # Register a datacenter config that provides the set up location.
        self.config = {'path': self.storage}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)

    def tearDown(self):
        # Undo everything setUp did: drop the config utility and remove
        # the temporary directory tree.
        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        shutil.rmtree(self.workdir)

    def test_iface(self):
        # we comply with interfaces (class and instance level)
        obj = DataCenter()
        verifyClass(IDataCenter, DataCenter)
        verifyObject(IDataCenter, obj)

    def test_get_log_files(self):
        # We can get lists of logfiles available.
        # By default, backups are skipped.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in (self.logpath, logpath2, logpath3):
            # close each file explicitly before the datacenter looks at it
            with open(path, 'wb') as fp:
                fp.write('some contents')
        result = datacenter.getLogFiles()
        # only the current logfile is listed, not the two backups
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].name, os.path.basename(self.logpath))

    def test_get_log_files_incl_backups(self):
        # We can get lists of logfiles including backup logs.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in (self.logpath, logpath2, logpath3):
            # close each file explicitly before the datacenter looks at it
            with open(path, 'wb') as fp:
                fp.write('some contents')
        result = datacenter.getLogFiles(exclude_backups=False)
        self.assertEqual(len(result), 3)
        # the current logfile is expected first, followed by the
        # backups '.1' and '.2' in that order
        names = [x.name for x in result]
        expected = [os.path.basename(x) for x in [
            self.logpath, logpath2, logpath3]]
        self.assertEqual(names, expected)
Note: See TracBrowser for help on using the repository browser.