source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_datacenter.py @ 8592

Last change on this file since 8592 was 8592, checked in by Henrik Bettermann, 12 years ago

Search log files in chronological order.

File size: 7.9 KB
Line 
1## Tests for datacenter
2##
3## XXX: Most tests for datacenter are still in doctest datacenter.txt
4##
5import os
6import shutil
7import tempfile
8import unittest
9from zope.component import getUtility, getGlobalSiteManager
10from zope.interface.verify import verifyObject, verifyClass
11from waeup.kofa.datacenter import DataCenter
12from waeup.kofa.interfaces import IDataCenter, IDataCenterConfig
13
14
class DataCenterLogQueryTests(unittest.TestCase):
    # Tests for querying logfiles via datacenter.
    #
    # Every test runs against a fresh temporary storage directory whose
    # location is announced through a globally registered
    # `IDataCenterConfig` utility.
    #
    # NOTE(review): `setUp` creates the storage directory only, not the
    # `logs` subdirectory `self.logpath` lives in. All tests instantiate
    # `DataCenter()` before writing a logfile, so they presumably rely on
    # the datacenter creating that directory -- confirm.

    def setUp(self):
        # create a temporary place to store files
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # register a datacenter config that provides the set up location
        self.config = {'path': self.storage}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)

    def tearDown(self):
        # undo the global registration first, then remove all files
        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        shutil.rmtree(self.workdir)

    def _write_file(self, path, contents):
        # Write `contents` to `path`, replacing any existing file.
        #
        # The file is closed (and thereby flushed) before we return, so
        # the datacenter is guaranteed to see the complete contents.
        with open(path, 'wb') as fp:
            fp.write(contents)

    def fill_logfile(self, num=1):
        # Write `num` logfiles of 100 lines each; 50 lines of each file
        # contain 'Msg', the other 50 contain 'Other Line'.
        #
        # Messages are numbered consecutively starting with 1. The
        # oldest (lowest) numbers go into the backup with the highest
        # suffix, the newest ones into the current logfile:
        #
        #   num=3:  myapp.log.2 -> Msg 1..50
        #           myapp.log.1 -> Msg 51..100
        #           myapp.log   -> Msg 101..150
        for backup_num in range(num):
            if backup_num == 0:
                path = self.logpath
            else:
                path = '%s.%d' % (self.logpath, backup_num)
            # the older the file, the lower the block of message numbers
            first = 50 * (num - 1 - backup_num)
            with open(path, 'wb') as fp:
                for n in range(first, first + 50):
                    fp.write('Msg %d\n' % (n + 1))
                    fp.write('Other Line %d\n' % (n + 1))

    def test_util_available(self):
        # a self-test: the config registered in `setUp` can be looked up
        config = getUtility(IDataCenterConfig)
        self.assertTrue(config is not None)

    def test_query_logfiles(self):
        # We can find entries in logfiles
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Message 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
        self.assertEqual(result, ['Message 1\n'])

    def test_query_logfiles_multi_logs(self):
        # We can find entries in multiple logfiles (backups)
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 3\n')
        self._write_file(self.logpath + '.2', 'Msg 2\n')
        self._write_file(self.logpath + '.10', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # The entry of backup .10 is expected before the entry of
        # backup .2, followed by the current log. This requires the
        # backup suffixes to be ordered numerically, not as strings.
        self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n'])

    def test_query_logfiles_ignores_other_logs(self):
        # We look only for the basename specified
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 1\n')
        self._write_file(self.logpath + '-not-a-real-log', 'Msg 2\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Msg 2 won't show up in results.
        self.assertEqual(result, ['Msg 1\n'])

    def test_query_logfiles_not_existant(self):
        # We make sure only existing logs are searched
        datacenter = DataCenter()
        self._write_file(self.logpath + '.1', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # works, although there is no myapp.log, only myapp.log.1
        self.assertEqual(result, ['Msg 1\n'])

    def test_query_logfiles_invalid_regexp(self):
        # Invalid regular expressions give a ValueError
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 1\n')
        result = datacenter.queryLogfiles('myapp.log', '(a')
        # the error is expected when the result is consumed, not when
        # `queryLogfiles` is called
        self.assertRaises(ValueError, list, result)

    def test_query_logfiles_batching_limit(self):
        # we can use `limit` for batching
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[-1], 'Msg 10\n')

    def test_query_logfiles_batching_start(self):
        # `start` is respected when batching
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25))
        # 50 matches in total, the first 25 are skipped
        self.assertEqual(len(result), 25)
        self.assertEqual(result[0], 'Msg 26\n')

    def test_query_logfiles_batching_limit_and_start(self):
        # we can use `start` and `limit` simultaneously
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], 'Msg 26\n')
        self.assertEqual(result[-1], 'Msg 35\n')

    def test_query_logfiles_batching_edge_cases(self):
        # we can find last matches if found num < limit.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        # only 5 matches are left after skipping 45 of 50
        self.assertEqual(len(result), 5)
        self.assertEqual(result[0], 'Msg 46\n')
        self.assertEqual(result[-1], 'Msg 50\n')

    def test_query_logfiles_batching_multiple_files(self):
        # batching works also with multiple log files
        datacenter = DataCenter()
        self.fill_logfile(num=2)
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        # the batch spans the end of the backup (Msg 1..50) and the
        # beginning of the current logfile (Msg 51..100)
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], 'Msg 46\n')
        self.assertEqual(result[-1], 'Msg 55\n')

    def test_query_logfiles_regex_match_inner(self):
        # we also find lines that match at some inner part
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n'))
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], 'Msg 1\n')
160
class DataCenterTests(unittest.TestCase):
    # General datacenter tests.
    #
    # Every test runs against a fresh temporary storage directory whose
    # location is announced through a globally registered
    # `IDataCenterConfig` utility.
    #
    # NOTE(review): `setUp` creates the storage directory only, not the
    # `logs` subdirectory `self.logpath` lives in. The tests writing
    # logfiles instantiate `DataCenter()` first, so they presumably rely
    # on the datacenter creating that directory -- confirm.

    def setUp(self):
        # create a temporary place to store files
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # register a datacenter config that provides the set up location
        self.config = {'path': self.storage}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)

    def tearDown(self):
        # undo the global registration first, then remove all files
        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        shutil.rmtree(self.workdir)

    def _write_file(self, path, contents):
        # Write `contents` to `path`, replacing any existing file.
        #
        # The file is closed (and thereby flushed) before we return, so
        # the datacenter is guaranteed to see the complete contents.
        with open(path, 'wb') as fp:
            fp.write(contents)

    def test_iface(self):
        # we comply with interfaces, both as class and as instance
        obj = DataCenter()
        verifyClass(IDataCenter, DataCenter)
        verifyObject(IDataCenter, obj)

    def test_get_log_files(self):
        # We can get lists of logfiles available.
        # By default, backups are skipped.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in (self.logpath, logpath2, logpath3):
            self._write_file(path, 'some contents')
        result = datacenter.getLogFiles()
        # only the current logfile is listed, not the two backups
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].name, os.path.basename(self.logpath))

    def test_get_log_files_incl_backups(self):
        # We can get lists of logfiles including backup logs.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in (self.logpath, logpath2, logpath3):
            self._write_file(path, 'some contents')
        result = datacenter.getLogFiles(exclude_backups=False)
        self.assertEqual(len(result), 3)
        # the current logfile is expected first, followed by the backups
        # in ascending order
        names = [x.name for x in result]
        expected = [os.path.basename(x) for x in [
            self.logpath, logpath2, logpath3]]
        self.assertEqual(names, expected)
Note: See TracBrowser for help on using the repository browser.