source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_datacenter.py @ 8685

Last change on this file since 8685 was 8634, checked in by uli, 13 years ago

Let datacenter use the new CSV file merger from helpers.

CAUTION: this is only lightly tested and not yet in real use. Try
some local imports before using it in a production environment!

File size: 10.3 KB
Line 
1## Tests for datacenter
2##
3## XXX: Most tests for datacenter are still in doctest datacenter.txt
4##
5import os
6import shutil
7import tempfile
8import unittest
9from zope.component import getUtility, getGlobalSiteManager
10from zope.interface.verify import verifyObject, verifyClass
11from waeup.kofa.datacenter import DataCenter
12from waeup.kofa.interfaces import IDataCenter, IDataCenterConfig
13
14
class DataCenterLogQueryTests(unittest.TestCase):
    """Tests for querying logfiles via datacenter."""

    def setUp(self):
        # Create a temporary place to store files.
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # Register a datacenter config that provides the location set up
        # above; DataCenter() looks this utility up on creation.
        self.config = {'path': self.storage}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)
        return

    def tearDown(self):
        # Undo the utility registration and remove all files from setUp.
        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        shutil.rmtree(self.workdir)
        return

    def write_log(self, path, content):
        # Helper: write `content` to `path`, closing the file reliably.
        with open(path, 'wb') as fp:
            fp.write(content)

    def fill_logfile(self, num=1):
        # Fill `num` logfiles with 100 lines each, 50 of them containing
        # 'Msg <n>'. The newest messages go into the plain logfile, older
        # ones into numbered backup files.
        # NOTE(review): for num > 1 the backup suffixes start at '.2'
        # (never '.1'); the tests below only rely on numeric ordering of
        # backups, so they are unaffected -- confirm before reusing.
        path = self.logpath
        for m in range(num - 1, -1, -1):
            with open(path, 'wb') as fp:
                for n in range(50 * m, 50 * m + 50):
                    fp.write('Msg %d\n' % (n + 1))
                    fp.write('Other Line %d\n' % (n + 1))
            path = self.logpath + '.%d' % (m + 1)
        return

    def test_util_available(self):
        # A self-test: the config utility registered in setUp is available.
        config = getUtility(IDataCenterConfig)
        self.assertTrue(config is not None)
        return

    def test_query_logfiles(self):
        # We can find entries in logfiles.
        datacenter = DataCenter()
        self.write_log(self.logpath, 'Message 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Message'))
        self.assertEqual(result, ['Message 1\n'])
        return

    def test_query_logfiles_multi_logs(self):
        # We can find entries in multiple logfiles (backups).
        datacenter = DataCenter()
        self.write_log(self.logpath, 'Msg 3\n')
        self.write_log(self.logpath + '.2', 'Msg 2\n')
        self.write_log(self.logpath + '.10', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # The entry of logfile .10 comes before the entry of logfile .2:
        # backups are ordered numerically (oldest first), not lexically.
        self.assertEqual(result, ['Msg 1\n', 'Msg 2\n', 'Msg 3\n'])
        return

    def test_query_logfiles_ignores_other_logs(self):
        # We look only for the basename specified.
        datacenter = DataCenter()
        self.write_log(self.logpath, 'Msg 1\n')
        self.write_log(self.logpath + '-not-a-real-log', 'Msg 2\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Msg 2 won't show up in results.
        self.assertEqual(result, ['Msg 1\n'])
        return

    def test_query_logfiles_not_existant(self):
        # We make sure only existing logs are searched.
        # (The typo in the test name is kept so the test id stays stable.)
        datacenter = DataCenter()
        self.write_log(self.logpath + '.1', 'Msg 1\n')
        result = list(datacenter.queryLogfiles('myapp.log', 'Msg'))
        # Works, although there is no myapp.log, only myapp.log.1.
        self.assertEqual(result, ['Msg 1\n'])
        return

    def test_query_logfiles_invalid_regexp(self):
        # Invalid regular expressions give a ValueError; as queryLogfiles
        # returns a generator, the error surfaces on iteration.
        datacenter = DataCenter()
        self.write_log(self.logpath, 'Msg 1\n')
        result = datacenter.queryLogfiles('myapp.log', '(a')
        self.assertRaises(ValueError, list, result)
        return

    def test_query_logfiles_batching_limit(self):
        # We can use `limit` for batching.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[-1], 'Msg 10\n')
        return

    def test_query_logfiles_batching_start(self):
        # `start` is respected when batching.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25))
        self.assertEqual(len(result), 25)
        self.assertEqual(result[0], 'Msg 26\n')
        return

    def test_query_logfiles_batching_limit_and_start(self):
        # We can use `start` and `limit` simultaneously.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=25, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], 'Msg 26\n')
        self.assertEqual(result[-1], 'Msg 35\n')
        return

    def test_query_logfiles_batching_edge_cases(self):
        # We can find the last matches if the number found is < limit.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        self.assertEqual(len(result), 5)
        self.assertEqual(result[0], 'Msg 46\n')
        self.assertEqual(result[-1], 'Msg 50\n')
        return

    def test_query_logfiles_batching_multiple_files(self):
        # Batching works also across multiple log files.
        datacenter = DataCenter()
        self.fill_logfile(num=2)
        result = list(datacenter.queryLogfiles(
            'myapp.log', 'Msg', start=45, limit=10))
        self.assertEqual(len(result), 10)
        self.assertEqual(result[0], 'Msg 46\n')
        self.assertEqual(result[-1], 'Msg 55\n')
        return

    def test_query_logfiles_regex_match_inner(self):
        # We also find lines that match at some inner part.
        datacenter = DataCenter()
        self.fill_logfile()
        result = list(datacenter.queryLogfiles('myapp.log', 'sg 1\n'))
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], 'Msg 1\n')
        return
160
class DataCenterTests(unittest.TestCase):
    """General datacenter tests."""

    def setUp(self):
        # Create a temporary place to store files.
        self.workdir = tempfile.mkdtemp()
        self.storage = os.path.join(self.workdir, 'storage')
        os.mkdir(self.storage)
        self.logpath = os.path.join(self.storage, 'logs', 'myapp.log')
        # Register a datacenter config that provides the location set up
        # above; DataCenter() looks this utility up on creation.
        self.config = {'path': self.storage}
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(self.config, IDataCenterConfig)
        return

    def tearDown(self):
        # Undo the utility registration and remove all files from setUp.
        self.gsm.unregisterUtility(self.config, IDataCenterConfig)
        shutil.rmtree(self.workdir)
        return

    def _write_file(self, path, content):
        # Helper: write `content` to `path`, closing the file reliably.
        with open(path, 'wb') as fp:
            fp.write(content)

    def _read_file(self, path):
        # Helper: return the complete contents of `path`.
        with open(path, 'rb') as fp:
            return fp.read()

    def test_iface(self):
        # We comply with the promised interfaces.
        obj = DataCenter()
        verifyClass(IDataCenter, DataCenter)
        verifyObject(IDataCenter, obj)
        return

    def test_get_log_files(self):
        # We can get lists of logfiles available.
        # By default, backups are skipped.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in (self.logpath, logpath2, logpath3):
            self._write_file(path, 'some contents')
        result = datacenter.getLogFiles()
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0].name, os.path.basename(self.logpath))
        return

    def test_get_log_files_incl_backups(self):
        # We can get lists of logfiles including backup logs.
        datacenter = DataCenter()
        logpath2 = self.logpath + '.1'
        logpath3 = self.logpath + '.2'
        for path in (self.logpath, logpath2, logpath3):
            self._write_file(path, 'some contents')
        result = datacenter.getLogFiles(exclude_backups=False)
        self.assertEqual(len(result), 3)
        names = [x.name for x in result]
        expected = [os.path.basename(x) for x in [
            self.logpath, logpath2, logpath3]]
        self.assertEqual(names, expected)
        return

    def test_append_csv_file(self):
        # We can append CSV files to others.
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = os.path.join(self.workdir, 'bar.csv')
        self._write_file(csv_file1, 'name,age\nBarney,28')
        self._write_file(csv_file2, 'name,age\nManfred,28')
        datacenter._appendCSVFile(csv_file2, csv_file1)
        # The merger normalizes columns (sorted) and line ends (CRLF).
        result = self._read_file(csv_file1)
        self.assertEqual(result, 'age,name\r\n28,Barney\r\n28,Manfred\r\n')
        # The source is deleted afterwards.
        self.assertEqual(os.path.exists(csv_file2), False)

    def test_append_csv_file_no_dest(self):
        # A non-existing dest CSV file will result in a simple move.
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = os.path.join(self.workdir, 'bar.csv')
        # csv_file1 does not exist.
        self._write_file(csv_file2, 'name,age\nManfred,28\n')
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # Raw input, no CSV mangling.
        self.assertEqual(result, 'name,age\nManfred,28\n')
        # The source is deleted afterwards.
        self.assertEqual(os.path.exists(csv_file2), False)

    def test_append_csv_file_no_source(self):
        # A non existing source file will mean no changes at all.
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = os.path.join(self.workdir, 'bar.csv')
        self._write_file(csv_file1, 'name,age\nManfred,28\n')
        # csv_file2 does not exist.
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # csv_file1 is the same as before.
        self.assertEqual(result, 'name,age\nManfred,28\n')

    def test_append_csv_file_same_src_and_dest(self):
        # If both csv files are the same file, nothing will be changed.
        datacenter = DataCenter()
        csv_file1 = os.path.join(self.workdir, 'foo.csv')
        csv_file2 = csv_file1
        self._write_file(csv_file1, 'name,age\nManfred,28\n')
        # csv_file2 is the very same (existing) file as csv_file1.
        datacenter._appendCSVFile(csv_file2, csv_file1)
        result = self._read_file(csv_file1)
        # csv_file1 is the same as before and is not deleted.
        self.assertEqual(result, 'name,age\nManfred,28\n')
        self.assertEqual(os.path.exists(csv_file2), True)
Note: See TracBrowser for help on using the repository browser.