source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/tests/test_datacenter.py @ 10009

Last change on this file since 10009 was 9099, checked in by uli, 12 years ago

Make sure datacenter now fullfill ExportJobContainer? expectations.

File size: 10.7 KB
Line 
1## Tests for datacenter
2##
3## XXX: Most tests for datacenter are still in doctest datacenter.txt
4##
5import os
6import shutil
7import tempfile
8import unittest
9from zope.component import getUtility, getGlobalSiteManager
10from zope.interface.verify import verifyObject, verifyClass
11from waeup.kofa.datacenter import DataCenter
12from waeup.kofa.interfaces import (
13    IDataCenter, IDataCenterConfig, IExportJobContainer)
14
15class DataCenterLogQueryTests(unittest.TestCase):
16    # Tests for querying logfiles via datacenter.
17
18    def setUp(self):
19        # create a temporary place to store files
20        self.workdir = tempfile.mkdtemp()
21        self.storage = os.path.join(self.workdir, 'storage')
22        os.mkdir(self.storage)
23        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
24        # register a datacenter config that provides the set up location
25        self.config = {'path': self.storage,}
26        self.gsm = getGlobalSiteManager()
27        self.gsm.registerUtility(self.config, IDataCenterConfig)
28        pass
29
30    def tearDown(self):
31        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
32        shutil.rmtree(self.workdir)
33        return
34
35    def fill_logfile(self, num=1):
36        # write 100 messages into logfile, with 50 containing 'Msg'
37        path = self.logpath
38        for m in range(num-1,-1,-1):
39            # write messages in order: lowest message in oldest file
40            fp = open(path, 'wb')
41            for n in range(50*m, 50*m+50):
42                fp.write('Msg %d\n' % (n + 1))
43                fp.write('Other Line %d\n' % (n + 1))
44            fp.close()
45            path = self.logpath + '.%d' % m
46        return
47
48    def test_util_available(self):
49        # a self-test
50        config = getUtility(IDataCenterConfig)
51        self.assertTrue(config is not None)
52        return
53
54    def test_query_logfiles(self):
55        # We can find entries in logfiles
56        datacenter = DataCenter()
57        open(self.logpath, 'wb').write('Message 1\n')
58        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
59        self.assertEqual(result, ['Message 1\n'])
60        return
61
62    def test_query_logfiles_multi_logs(self):
63        # We can find entries in multiple logfiles (backups)
64        datacenter = DataCenter()
65        open(self.logpath, 'wb').write('Msg 3\n')
66        open(self.logpath + '.2', 'wb').write('Msg 2\n')
67        open(self.logpath + '.10', 'wb').write('Msg 1\n')
68        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
69        # entry of logfile .10 comes after entry of logfile .2
70        self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n'])
71        return
72
73    def test_query_logfiles_ignores_other_logs(self):
74        # We look only for the basename specified
75        datacenter = DataCenter()
76        open(self.logpath, 'wb').write('Msg 1\n')
77        open(self.logpath + '-not-a-real-log', 'wb').write('Msg 2\n')
78        open(self.logpath + '-not-a-real-log.1', 'wb').write('Msg 3\n')
79        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
80        # Msg 2 and 3 won't show up in results.
81        self.assertEqual(result, ['Msg 1\n'])
82        return
83
84    def test_query_logfiles_not_existant(self):
85        # We make sure only existing logs are searched
86        datacenter = DataCenter()
87        open(self.logpath + '.1', 'wb').write('Msg 1\n')
88        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
89        # works, although there is no myapp.log, only myapp.log.1
90        self.assertEqual(result, ['Msg 1\n'])
91        return
92
93    def test_query_logfiles_invalid_regexp(self):
94        # Invalid regular expressions give a ValueError
95        datacenter = DataCenter()
96        open(self.logpath, 'wb').write('Msg 1\n')
97        result = datacenter.queryLogfiles('myapp.log', '(a')
98        self.assertRaises(ValueError, list, result)
99        return
100
101    def test_query_logfiles_batching_limit(self):
102        # we can use `limit` for batching
103        datacenter = DataCenter()
104        self.fill_logfile()
105        result = list(datacenter.queryLogfiles(
106            'myapp.log', 'Msg', limit=10))
107        self.assertEqual(len(result), 10)
108        self.assertEqual(result[-1], 'Msg 10\n')
109        return
110
111    def test_query_logfiles_batching_start(self):
112        # `start` is respected when batching
113        datacenter = DataCenter()
114        self.fill_logfile()
115        result = list(datacenter.queryLogfiles(
116            'myapp.log', 'Msg', start=25))
117        self.assertEqual(len(result), 25)
118        self.assertEqual(result[0], 'Msg 26\n')
119        return
120
121    def test_query_logfiles_batching_limit_and_start(self):
122        # we can use `start` and `limit` simultanously
123        datacenter = DataCenter()
124        self.fill_logfile()
125        result = list(datacenter.queryLogfiles(
126            'myapp.log', 'Msg', start=25, limit=10))
127        self.assertEqual(len(result), 10)
128        self.assertEqual(result[0], 'Msg 26\n')
129        self.assertEqual(result[-1], 'Msg 35\n')
130        return
131
132    def test_query_logfiles_batching_edge_cases(self):
133        # we can find last matches if found num < limit.
134        datacenter = DataCenter()
135        self.fill_logfile()
136        result = list(datacenter.queryLogfiles(
137            'myapp.log', 'Msg', start=45, limit=10))
138        self.assertEqual(len(result), 5)
139        self.assertEqual(result[0], 'Msg 46\n')
140        self.assertEqual(result[-1], 'Msg 50\n')
141        return
142
143    def test_query_logfiles_batching_multiple_files(self):
144        # batching works also with multiple log files
145        datacenter = DataCenter()
146        self.fill_logfile(num=2)
147        result = list(datacenter.queryLogfiles(
148            'myapp.log', 'Msg', start=45, limit=10))
149        self.assertEqual(len(result), 10)
150        self.assertEqual(result[0], 'Msg 46\n')
151        self.assertEqual(result[-1], 'Msg 55\n')
152        return
153
154    def test_query_logfiles_regex_match_inner(self):
155        # we also find lines that match at some inner part
156        datacenter = DataCenter()
157        self.fill_logfile()
158        result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n'))
159        self.assertEqual(len(result), 1)
160        self.assertEqual(result[0], 'Msg 1\n')
161        return
162
163class DataCenterTests(unittest.TestCase):
164    # General datacenter tests.
165
166    def setUp(self):
167        # create a temporary place to store files
168        self.workdir = tempfile.mkdtemp()
169        self.storage = os.path.join(self.workdir, 'storage')
170        os.mkdir(self.storage)
171        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
172        # register a datacenter config that provides the set up location
173        self.config = {'path': self.storage,}
174        self.gsm = getGlobalSiteManager()
175        self.gsm.registerUtility(self.config, IDataCenterConfig)
176        pass
177
178    def tearDown(self):
179        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
180        shutil.rmtree(self.workdir)
181        return
182
183    def test_ifaces(self):
184        # we comply with interfaces
185        obj = DataCenter()
186        verifyClass(IDataCenter, DataCenter)
187        verifyClass(IExportJobContainer, DataCenter)
188        verifyObject(IDataCenter, obj)
189        verifyObject(IExportJobContainer, obj)
190        return
191
192    def test_get_log_files(self):
193        # We can get lists of logfiles available.
194        # By default, backups are skipped.
195        datacenter = DataCenter()
196        logpath2 = self.logpath + '.1'
197        logpath3 = self.logpath + '.2'
198        for path in self.logpath, logpath2, logpath3:
199            open(path, 'wb').write('some contents')
200        result = datacenter.getLogFiles()
201        self.assertEqual(len(result), 1)
202        self.assertEqual(result[0].name, os.path.basename(self.logpath))
203        return
204
205    def test_get_log_files_incl_backups(self):
206        # We can get lists of logfiles including backup logs.
207        datacenter = DataCenter()
208        logpath2 = self.logpath + '.1'
209        logpath3 = self.logpath + '.2'
210        for path in self.logpath, logpath2, logpath3:
211            open(path, 'wb').write('some contents')
212        result = datacenter.getLogFiles(exclude_backups=False)
213        self.assertEqual(len(result), 3)
214        names = [x.name for x in result]
215        expected = [os.path.basename(x) for x in [
216            self.logpath, logpath2, logpath3]]
217        self.assertEqual(names, expected)
218        return
219
220    def test_append_csv_file(self):
221        # we can append CSV files to others
222        datacenter = DataCenter()
223        csv_file1 = os.path.join(self.workdir, 'foo.csv')
224        csv_file2 = os.path.join(self.workdir, 'bar.csv')
225        open(csv_file1, 'wb').write('name,age\nBarney,28')
226        open(csv_file2, 'wb').write('name,age\nManfred,28')
227        datacenter._appendCSVFile(csv_file2, csv_file1)
228        result = open(csv_file1, 'rb').read()
229        self.assertEqual(result, 'age,name\r\n28,Barney\r\n28,Manfred\r\n')
230        # The source is deleted afterwards
231        self.assertEqual(os.path.exists(csv_file2), False)
232        return
233
234    def test_append_csv_file_no_dest(self):
235        # a non-existing dest CSV file will result in a simple move
236        datacenter = DataCenter()
237        csv_file1 = os.path.join(self.workdir, 'foo.csv')
238        csv_file2 = os.path.join(self.workdir, 'bar.csv')
239        # csv_file1 does not exist
240        open(csv_file2, 'wb').write('name,age\nManfred,28\n')
241        datacenter._appendCSVFile(csv_file2, csv_file1)
242        result = open(csv_file1, 'rb').read()
243        # raw input, no CSV mangling
244        self.assertEqual(result, 'name,age\nManfred,28\n')
245        # The source is deleted afterwards
246        self.assertEqual(os.path.exists(csv_file2), False)
247        return
248
249    def test_append_csv_file_no_source(self):
250        # a non existing source file will mean no changes at all
251        datacenter = DataCenter()
252        csv_file1 = os.path.join(self.workdir, 'foo.csv')
253        csv_file2 = os.path.join(self.workdir, 'bar.csv')
254        open(csv_file1, 'wb').write('name,age\nManfred,28\n')
255        # csv_file2 does not exist
256        datacenter._appendCSVFile(csv_file2, csv_file1)
257        result = open(csv_file1, 'rb').read()
258        # csv_file1 is the same as before
259        self.assertEqual(result, 'name,age\nManfred,28\n')
260        return
261
262    def test_append_csv_file_same_src_and_dest(self):
263        # if both csv files are the same file, nothing will be changed
264        datacenter = DataCenter()
265        csv_file1 = os.path.join(self.workdir, 'foo.csv')
266        csv_file2 = csv_file1
267        open(csv_file1, 'wb').write('name,age\nManfred,28\n')
268        # csv_file2 does not exist
269        datacenter._appendCSVFile(csv_file2, csv_file1)
270        result = open(csv_file1, 'rb').read()
271        # csv_file1 is the same as before
272        self.assertEqual(result, 'name,age\nManfred,28\n')
273        self.assertEqual(os.path.exists(csv_file2), True)
274        return
Note: See TracBrowser for help on using the repository browser.