source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_datacenter.py @ 10451

Last change on this file since 10451 was 10045, checked in by uli, 12 years ago

Make logfile parsing unicode-aware.

File size: 11.1 KB
RevLine 
# -*- coding: utf-8 -*-
## Tests for datacenter
##
## XXX: Most tests for datacenter are still in doctest datacenter.txt
##
6import os
7import shutil
8import tempfile
9import unittest
10from zope.component import getUtility, getGlobalSiteManager
11from zope.interface.verify import verifyObject, verifyClass
12from waeup.kofa.datacenter import DataCenter
[9217]13from waeup.kofa.interfaces import (
14    IDataCenter, IDataCenterConfig, IExportJobContainer)
[8512]15
class DataCenterLogQueryTests(unittest.TestCase):
    # Tests for querying logfiles via datacenter.
    #
    # Each test runs against a fresh temporary storage directory that is
    # registered as the global `IDataCenterConfig` utility.  `setUp` does
    # not create the `logs` subdirectory itself; every test instantiates
    # `DataCenter` before writing logfiles (presumably the datacenter
    # creates that directory -- keep this order when adding tests).

    def setUp(self):
        # create a temporary place to store files
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # register a datacenter config that provides the set up location
        self.config = {'path': self.storage,}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)

    def tearDown(self):
        # Remove the temporary directory even if unregistering fails, so
        # that a broken test run does not leave files behind.
        try:
            self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        finally:
            shutil.rmtree(self.workdir)

    def _write_file(self, path, contents):
        """Write `contents` to `path` (binary mode) and close the file.

        Using a context manager guarantees the data is flushed and the
        handle released before the datacenter reads the file, instead of
        relying on garbage collection to close it.
        """
        with open(path, 'wb') as fp:
            fp.write(contents)

    def fill_logfile(self, num=1):
        """Create `num` logfiles, each holding 101 lines.

        Every file gets 50 ``Msg <n>`` lines, 50 ``Other Line <n>``
        lines and one trailing line containing umlauts (without
        newline).

        Messages are numbered consecutively over all files: the current
        logfile (``myapp.log``) holds the highest numbers and the backup
        with the highest suffix (the oldest one) holds ``Msg 1`` to
        ``Msg 50``.
        """
        for backup_num in range(num):
            # backup_num 0 is the current logfile, N > 0 is backup `.N`
            if backup_num:
                path = '%s.%d' % (self.logpath, backup_num)
            else:
                path = self.logpath
            # write messages in order: lowest message in oldest file
            block = num - 1 - backup_num
            with open(path, 'wb') as fp:
                for n in range(50 * block, 50 * block + 50):
                    fp.write('Msg %d\n' % (n + 1))
                    fp.write('Other Line %d\n' % (n + 1))
                fp.write('A Message with Ümläüt')

    def test_util_available(self):
        # a self-test: the config registered in setUp can be looked up
        config = getUtility(IDataCenterConfig)
        self.assertTrue(config is not None)

    def test_query_logfiles(self):
        # We can find entries in logfiles
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Message 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
        self.assertEqual(result, [u'Message 1\n'])

    def test_query_logfiles_multi_logs(self):
        # We can find entries in multiple logfiles (backups)
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 3\n')
        self._write_file(self.logpath + '.2', 'Msg 2\n')
        self._write_file(self.logpath + '.10', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Backups are expected in numerical order, highest suffix first:
        # the entry of logfile .10 precedes the entry of logfile .2.
        self.assertEqual(result, [u'Msg 1\n', u'Msg 2\n', u'Msg 3\n'])

    def test_query_logfiles_ignores_other_logs(self):
        # We look only for the basename specified
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 1\n')
        self._write_file(self.logpath + '-not-a-real-log', 'Msg 2\n')
        self._write_file(self.logpath + '-not-a-real-log.1', 'Msg 3\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Msg 2 and 3 won't show up in results.
        self.assertEqual(result, [u'Msg 1\n'])

    def test_query_logfiles_not_existant(self):
        # We make sure only existing logs are searched
        datacenter = DataCenter()
        self._write_file(self.logpath + '.1', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # works, although there is no myapp.log, only myapp.log.1
        self.assertEqual(result, [u'Msg 1\n'])

    def test_query_logfiles_invalid_regexp(self):
        # Invalid regular expressions give a ValueError
        datacenter = DataCenter()
        self._write_file(self.logpath, 'Msg 1\n')
        result = datacenter.queryLogfiles('myapp.log', '(a')
        # the error is expected when the result is consumed
        self.assertRaises(ValueError, list, result)

    def test_query_logfiles_batching_limit(self):
        # we can use `limit` for batching
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[-1], u'Msg 10\n')

    def test_query_logfiles_batching_start(self):
        # `start` is respected when batching
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25))
        self.assertEqual(len(result), 25)
        self.assertEqual(result[0], u'Msg 26\n')

    def test_query_logfiles_batching_limit_and_start(self):
        # we can use `start` and `limit` simultaneously
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], u'Msg 26\n')
        self.assertEqual(result[-1], u'Msg 35\n')

    def test_query_logfiles_batching_edge_cases(self):
        # we can find last matches if found num < limit.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        self.assertEqual(len(result), 5)
        self.assertEqual(result[0], u'Msg 46\n')
        self.assertEqual(result[-1], u'Msg 50\n')

    def test_query_logfiles_batching_multiple_files(self):
        # batching works also with multiple log files
        datacenter = DataCenter()
        self.fill_logfile(num=2)
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], u'Msg 46\n')
        self.assertEqual(result[-1], u'Msg 55\n')

    def test_query_logfiles_regex_match_inner(self):
        # we also find lines that match at some inner part
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n'))
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], u'Msg 1\n')

    def test_query_logfiles_umlauts(self):
        # we return results as unicode decoded from utf-8
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', u'Ümläüt'))
        self.assertTrue(isinstance(result[0], unicode))
        self.assertEqual(result, [u'A Message with Ümläüt'])
174
class DataCenterTests(unittest.TestCase):
    # General datacenter tests.
    #
    # Each test runs against a fresh temporary storage directory that is
    # registered as the global `IDataCenterConfig` utility.  `setUp` does
    # not create the `logs` subdirectory itself; tests writing logfiles
    # instantiate `DataCenter` first (presumably the datacenter creates
    # that directory -- keep this order when adding tests).

    def setUp(self):
        # create a temporary place to store files
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # register a datacenter config that provides the set up location
        self.config = {'path': self.storage,}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)

    def tearDown(self):
        # Remove the temporary directory even if unregistering fails, so
        # that a broken test run does not leave files behind.
        try:
            self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        finally:
            shutil.rmtree(self.workdir)

    def _write_file(self, path, contents):
        """Write `contents` to `path` (binary mode) and close the file.

        Using a context manager guarantees the data is flushed and the
        handle released before the datacenter touches the file, instead
        of relying on garbage collection to close it.
        """
        with open(path, 'wb') as fp:
            fp.write(contents)

    def _read_file(self, path):
        """Return the raw contents of `path`, closing the file afterwards.
        """
        with open(path, 'rb') as fp:
            return fp.read()

    def test_ifaces(self):
        # we comply with interfaces, both as class and as instance
        obj = DataCenter()
        verifyClass(IDataCenter, DataCenter)
        verifyClass(IExportJobContainer, DataCenter)
        verifyObject(IDataCenter, obj)
        verifyObject(IExportJobContainer, obj)

    def test_get_log_files(self):
        # We can get lists of logfiles available.
        # By default, backups are skipped.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in self.logpath, logpath2, logpath3:
            self._write_file(path, 'some contents')
        result = datacenter.getLogFiles()
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].name, os.path.basename(self.logpath))

    def test_get_log_files_incl_backups(self):
        # We can get lists of logfiles including backup logs.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in self.logpath, logpath2, logpath3:
            self._write_file(path, 'some contents')
        result = datacenter.getLogFiles(exclude_backups=False)
        self.assertEqual(len(result), 3)
        names = [x.name for x in result]
        expected = [os.path.basename(x) for x in [
            self.logpath, logpath2, logpath3]]
        self.assertEqual(names, expected)

    def test_append_csv_file(self):
        # we can append CSV files to others
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = os.path.join(self.workdir, 'bar.csv')
        self._write_file(csv_file1, 'name,age\nBarney,28')
        self._write_file(csv_file2, 'name,age\nManfred,28')
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # the merged file is rewritten: columns sorted, CRLF line ends
        self.assertEqual(result, 'age,name\r\n28,Barney\r\n28,Manfred\r\n')
        # The source is deleted afterwards
        self.assertEqual(os.path.exists(csv_file2), False)

    def test_append_csv_file_no_dest(self):
        # a non-existing dest CSV file will result in a simple move
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = os.path.join(self.workdir, 'bar.csv')
        # csv_file1 does not exist
        self._write_file(csv_file2, 'name,age\nManfred,28\n')
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # raw input, no CSV mangling
        self.assertEqual(result, 'name,age\nManfred,28\n')
        # The source is deleted afterwards
        self.assertEqual(os.path.exists(csv_file2), False)

    def test_append_csv_file_no_source(self):
        # a non existing source file will mean no changes at all
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = os.path.join(self.workdir, 'bar.csv')
        self._write_file(csv_file1, 'name,age\nManfred,28\n')
        # csv_file2 does not exist
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # csv_file1 is the same as before
        self.assertEqual(result, 'name,age\nManfred,28\n')

    def test_append_csv_file_same_src_and_dest(self):
        # if both csv files are the same file, nothing will be changed
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = csv_file1
        self._write_file(csv_file1, 'name,age\nManfred,28\n')
        # source and destination are the very same path here
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # csv_file1 is the same as before
        self.assertEqual(result, 'name,age\nManfred,28\n')
        # ...and, being also the source, it must not have been deleted
        self.assertEqual(os.path.exists(csv_file2), True)
Note: See TracBrowser for help on using the repository browser.