source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/tests/test_datacenter.py @ 9023

Last change on this file since 9023 was 8725, checked in by uli, 13 years ago

Search only logfiles with correct basename, fix tests.

File size: 10.5 KB
Line 
1## Tests for datacenter
2##
3## XXX: Most tests for datacenter are still in doctest datacenter.txt
4##
5import os
6import shutil
7import tempfile
8import unittest
9from zope.component import getUtility, getGlobalSiteManager
10from zope.interface.verify import verifyObject, verifyClass
11from waeup.kofa.datacenter import DataCenter
12from waeup.kofa.interfaces import IDataCenter, IDataCenterConfig
13
14class DataCenterLogQueryTests(unittest.TestCase):
15    # Tests for querying logfiles via datacenter.
16
17    def setUp(self):
18        # create a temporary place to store files
19        self.workdir = tempfile.mkdtemp()
20        self.storage = os.path.join(self.workdir, 'storage')
21        os.mkdir(self.storage)
22        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
23        # register a datacenter config that provides the set up location
24        self.config = {'path': self.storage,}
25        self.gsm = getGlobalSiteManager()
26        self.gsm.registerUtility(self.config, IDataCenterConfig)
27        pass
28
29    def tearDown(self):
30        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
31        shutil.rmtree(self.workdir)
32        return
33
34    def fill_logfile(self, num=1):
35        # write 100 messages into logfile, with 50 containing 'Msg'
36        path = self.logpath
37        for m in range(num-1,-1,-1):
38            # write messages in order: lowest message in oldest file
39            fp = open(path, 'wb')
40            for n in range(50*m, 50*m+50):
41                fp.write('Msg %d\n' % (n + 1))
42                fp.write('Other Line %d\n' % (n + 1))
43            fp.close()
44            path = self.logpath + '.%d' % m
45        return
46
47    def test_util_available(self):
48        # a self-test
49        config = getUtility(IDataCenterConfig)
50        self.assertTrue(config is not None)
51        return
52
53    def test_query_logfiles(self):
54        # We can find entries in logfiles
55        datacenter = DataCenter()
56        open(self.logpath, 'wb').write('Message 1\n')
57        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
58        self.assertEqual(result, ['Message 1\n'])
59        return
60
61    def test_query_logfiles_multi_logs(self):
62        # We can find entries in multiple logfiles (backups)
63        datacenter = DataCenter()
64        open(self.logpath, 'wb').write('Msg 3\n')
65        open(self.logpath + '.2', 'wb').write('Msg 2\n')
66        open(self.logpath + '.10', 'wb').write('Msg 1\n')
67        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
68        # entry of logfile .10 comes after entry of logfile .2
69        self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n'])
70        return
71
72    def test_query_logfiles_ignores_other_logs(self):
73        # We look only for the basename specified
74        datacenter = DataCenter()
75        open(self.logpath, 'wb').write('Msg 1\n')
76        open(self.logpath + '-not-a-real-log', 'wb').write('Msg 2\n')
77        open(self.logpath + '-not-a-real-log.1', 'wb').write('Msg 3\n')
78        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
79        # Msg 2 and 3 won't show up in results.
80        self.assertEqual(result, ['Msg 1\n'])
81        return
82
83    def test_query_logfiles_not_existant(self):
84        # We make sure only existing logs are searched
85        datacenter = DataCenter()
86        open(self.logpath + '.1', 'wb').write('Msg 1\n')
87        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
88        # works, although there is no myapp.log, only myapp.log.1
89        self.assertEqual(result, ['Msg 1\n'])
90        return
91
92    def test_query_logfiles_invalid_regexp(self):
93        # Invalid regular expressions give a ValueError
94        datacenter = DataCenter()
95        open(self.logpath, 'wb').write('Msg 1\n')
96        result = datacenter.queryLogfiles('myapp.log', '(a')
97        self.assertRaises(ValueError, list, result)
98        return
99
100    def test_query_logfiles_batching_limit(self):
101        # we can use `limit` for batching
102        datacenter = DataCenter()
103        self.fill_logfile()
104        result = list(datacenter.queryLogfiles(
105            'myapp.log', 'Msg', limit=10))
106        self.assertEqual(len(result), 10)
107        self.assertEqual(result[-1], 'Msg 10\n')
108        return
109
110    def test_query_logfiles_batching_start(self):
111        # `start` is respected when batching
112        datacenter = DataCenter()
113        self.fill_logfile()
114        result = list(datacenter.queryLogfiles(
115            'myapp.log', 'Msg', start=25))
116        self.assertEqual(len(result), 25)
117        self.assertEqual(result[0], 'Msg 26\n')
118        return
119
120    def test_query_logfiles_batching_limit_and_start(self):
121        # we can use `start` and `limit` simultanously
122        datacenter = DataCenter()
123        self.fill_logfile()
124        result = list(datacenter.queryLogfiles(
125            'myapp.log', 'Msg', start=25, limit=10))
126        self.assertEqual(len(result), 10)
127        self.assertEqual(result[0], 'Msg 26\n')
128        self.assertEqual(result[-1], 'Msg 35\n')
129        return
130
131    def test_query_logfiles_batching_edge_cases(self):
132        # we can find last matches if found num < limit.
133        datacenter = DataCenter()
134        self.fill_logfile()
135        result = list(datacenter.queryLogfiles(
136            'myapp.log', 'Msg', start=45, limit=10))
137        self.assertEqual(len(result), 5)
138        self.assertEqual(result[0], 'Msg 46\n')
139        self.assertEqual(result[-1], 'Msg 50\n')
140        return
141
142    def test_query_logfiles_batching_multiple_files(self):
143        # batching works also with multiple log files
144        datacenter = DataCenter()
145        self.fill_logfile(num=2)
146        result = list(datacenter.queryLogfiles(
147            'myapp.log', 'Msg', start=45, limit=10))
148        self.assertEqual(len(result), 10)
149        self.assertEqual(result[0], 'Msg 46\n')
150        self.assertEqual(result[-1], 'Msg 55\n')
151        return
152
153    def test_query_logfiles_regex_match_inner(self):
154        # we also find lines that match at some inner part
155        datacenter = DataCenter()
156        self.fill_logfile()
157        result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n'))
158        self.assertEqual(len(result), 1)
159        self.assertEqual(result[0], 'Msg 1\n')
160        return
161
162class DataCenterTests(unittest.TestCase):
163    # General datacenter tests.
164
165    def setUp(self):
166        # create a temporary place to store files
167        self.workdir = tempfile.mkdtemp()
168        self.storage = os.path.join(self.workdir, 'storage')
169        os.mkdir(self.storage)
170        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
171        # register a datacenter config that provides the set up location
172        self.config = {'path': self.storage,}
173        self.gsm = getGlobalSiteManager()
174        self.gsm.registerUtility(self.config, IDataCenterConfig)
175        pass
176
177    def tearDown(self):
178        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
179        shutil.rmtree(self.workdir)
180        return
181
182    def test_iface(self):
183        # we comply with interfaces
184        obj = DataCenter()
185        verifyClass(IDataCenter, DataCenter)
186        verifyObject(IDataCenter, obj)
187        return
188
189    def test_get_log_files(self):
190        # We can get lists of logfiles available.
191        # By default, backups are skipped.
192        datacenter = DataCenter()
193        logpath2 = self.logpath + '.1'
194        logpath3 = self.logpath + '.2'
195        for path in self.logpath, logpath2, logpath3:
196            open(path, 'wb').write('some contents')
197        result = datacenter.getLogFiles()
198        self.assertEqual(len(result), 1)
199        self.assertEqual(result[0].name, os.path.basename(self.logpath))
200        return
201
202    def test_get_log_files_incl_backups(self):
203        # We can get lists of logfiles including backup logs.
204        datacenter = DataCenter()
205        logpath2 = self.logpath + '.1'
206        logpath3 = self.logpath + '.2'
207        for path in self.logpath, logpath2, logpath3:
208            open(path, 'wb').write('some contents')
209        result = datacenter.getLogFiles(exclude_backups=False)
210        self.assertEqual(len(result), 3)
211        names = [x.name for x in result]
212        expected = [os.path.basename(x) for x in [
213            self.logpath, logpath2, logpath3]]
214        self.assertEqual(names, expected)
215        return
216
217    def test_append_csv_file(self):
218        # we can append CSV files to others
219        datacenter = DataCenter()
220        csv_file1 = os.path.join(self.workdir, 'foo.csv')
221        csv_file2 = os.path.join(self.workdir, 'bar.csv')
222        open(csv_file1, 'wb').write('name,age\nBarney,28')
223        open(csv_file2, 'wb').write('name,age\nManfred,28')
224        datacenter._appendCSVFile(csv_file2, csv_file1)
225        result = open(csv_file1, 'rb').read()
226        self.assertEqual(result, 'age,name\r\n28,Barney\r\n28,Manfred\r\n')
227        # The source is deleted afterwards
228        self.assertEqual(os.path.exists(csv_file2), False)
229        return
230
231    def test_append_csv_file_no_dest(self):
232        # a non-existing dest CSV file will result in a simple move
233        datacenter = DataCenter()
234        csv_file1 = os.path.join(self.workdir, 'foo.csv')
235        csv_file2 = os.path.join(self.workdir, 'bar.csv')
236        # csv_file1 does not exist
237        open(csv_file2, 'wb').write('name,age\nManfred,28\n')
238        datacenter._appendCSVFile(csv_file2, csv_file1)
239        result = open(csv_file1, 'rb').read()
240        # raw input, no CSV mangling
241        self.assertEqual(result, 'name,age\nManfred,28\n')
242        # The source is deleted afterwards
243        self.assertEqual(os.path.exists(csv_file2), False)
244        return
245
246    def test_append_csv_file_no_source(self):
247        # a non existing source file will mean no changes at all
248        datacenter = DataCenter()
249        csv_file1 = os.path.join(self.workdir, 'foo.csv')
250        csv_file2 = os.path.join(self.workdir, 'bar.csv')
251        open(csv_file1, 'wb').write('name,age\nManfred,28\n')
252        # csv_file2 does not exist
253        datacenter._appendCSVFile(csv_file2, csv_file1)
254        result = open(csv_file1, 'rb').read()
255        # csv_file1 is the same as before
256        self.assertEqual(result, 'name,age\nManfred,28\n')
257        return
258
259    def test_append_csv_file_same_src_and_dest(self):
260        # if both csv files are the same file, nothing will be changed
261        datacenter = DataCenter()
262        csv_file1 = os.path.join(self.workdir, 'foo.csv')
263        csv_file2 = csv_file1
264        open(csv_file1, 'wb').write('name,age\nManfred,28\n')
265        # csv_file2 does not exist
266        datacenter._appendCSVFile(csv_file2, csv_file1)
267        result = open(csv_file1, 'rb').read()
268        # csv_file1 is the same as before
269        self.assertEqual(result, 'name,age\nManfred,28\n')
270        self.assertEqual(os.path.exists(csv_file2), True)
271        return
Note: See TracBrowser for help on using the repository browser.