source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_logger.py @ 12657

Last change on this file since 12657 was 9988, checked in by uli, 12 years ago

Use WatchedFileHandler instead of DatedRotatingFileHandler for logging.

  • Property svn:keywords set to Id
File size: 14.9 KB
Line 
1## $Id: test_logger.py 9988 2013-02-24 13:42:04Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for waeup.kofa.utils.logger
20import logging
21import os
22import shutil
23import tempfile
24from zope.component import queryUtility
25from zope.component.hooks import setSite, clearSite
26from zope.interface.verify import verifyClass, verifyObject
27from waeup.kofa.app import University
28from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
29
30from waeup.kofa.utils.logger import (
31    Logger, MAX_BYTES, BACKUP_COUNT, ILoggerCollector, LoggerCollector,
32    ILogger)
33
class FakeComponent(object):
    # Minimal sample component. It expects a ``logger`` attribute to be
    # provided from elsewhere (for instance by mixing in Logger).
    def do_something(self):
        # Send a single info message through the component's logger.
        message = 'msg from do_something'
        self.logger.info(message)
38
class LoggingComponent(FakeComponent, Logger):
    # The sample component combined with the Logger mixin, which is
    # what makes it support logging.
    #
    # `logger_name` contains a ``${sitename}`` placeholder;
    # `logger_filename` is the name of the logfile to use.
    logger_filename = 'sample.log'
    logger_name = 'sample.${sitename}'
43
class LoggersTests(FunctionalTestCase):
    # Tests for the Logger mixin when used directly, with and without
    # a site being set.

    layer = FunctionalLayer

    def setUp(self):
        super(LoggersTests, self).setUp()
        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        # Second storage root. Only created by tests that move the
        # datacenter storage; removed in tearDown() if set.
        self.dc_root2 = None
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        self.workdir = tempfile.mkdtemp()
        return

    def tearDown(self):
        super(LoggersTests, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        # Remove the second storage root here (and not in the test
        # itself), so that it is also removed when a test fails.
        if self.dc_root2 is not None:
            shutil.rmtree(self.dc_root2)
        clearSite()
        return

    def clear_logger(self, logger):
        # Remove all handlers from `logger`. We iterate over a copy,
        # because removeHandler() modifies `logger.handlers`.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
        return

    def test_iface(self):
        # Logger class and instances provide the promised interface
        setSite(self.app)
        logger = LoggingComponent()
        verifyClass(ILogger, Logger)
        verifyObject(ILogger, logger)
        return

    def test_logger_prop_no_site(self):
        # If not in a site, we will get a logger without handlers
        logger = Logger().logger
        self.assertTrue(isinstance(logger, logging.Logger))
        self.assertEqual(len(logger.handlers), 0)
        return

    def test_site_no_site_name(self):
        # If a site has no name we will get a logger anyway
        setSite(self.app)
        del self.app.__name__
        logger = Logger().logger
        self.assertTrue(logger is not None)
        return

    def test_get_logdir(self):
        # We can get a logdir
        setSite(self.app)
        logger = Logger()
        logdir = logger.logger_get_logdir()
        expected_logdir = os.path.join(
            self.app['datacenter'].storage, 'logs')
        self.assertEqual(logdir, expected_logdir)
        return

    def test_get_logdir_no_site(self):
        # We cannot get a logdir without a site
        logger = Logger()
        logdir = logger.logger_get_logdir()
        self.assertTrue(logdir is None)
        return

    def test_get_logdir_no_datacenter(self):
        # Having a site is not enough. We also need a datacenter in it.
        setSite(self.app)
        del self.app['datacenter']
        logger = Logger()
        logdir = logger.logger_get_logdir()
        self.assertTrue(logdir is None)
        return

    def test_get_logdir_create_dir(self):
        # If the logdirectory does not exist, we will create it
        setSite(self.app)
        logger = Logger()
        expected_logdir = os.path.join(
            self.app['datacenter'].storage, 'logs')
        # Make sure we start without the directory
        if os.path.exists(expected_logdir):
            shutil.rmtree(expected_logdir)
        logdir = logger.logger_get_logdir()
        exists_after = os.path.exists(expected_logdir)
        self.assertEqual(logdir, expected_logdir)
        self.assertTrue(exists_after is True)
        return

    def test_get_logfile(self):
        # We can get a logfile
        setSite(self.app)
        logger = Logger()
        expected_filepath = os.path.join(
            self.app['datacenter'].storage, 'logs', 'main.log')
        path = logger.logger_get_logfile_path()
        self.assertEqual(expected_filepath, path)
        return

    def test_get_logfile_no_dir(self):
        # We cannot get a logfilepath if there is no dir for it
        setSite(self.app)
        logger = Logger()
        del self.app['datacenter']
        path = logger.logger_get_logfile_path()
        self.assertTrue(path is None)
        return

    def test_setup(self):
        # We can setup a logger.
        mylogger = logging.getLogger('test.sample')
        setSite(self.app)
        logger = Logger()
        result = logger.logger_setup(mylogger)
        self.assertEqual(len(result.handlers), 1)
        handler = result.handlers[0]
        # Rotation settings are only checked for handlers that have them
        if hasattr(handler, 'maxBytes'):
            self.assertEqual(handler.maxBytes, MAX_BYTES)
        if hasattr(handler, 'backupCount'):
            self.assertEqual(handler.backupCount, BACKUP_COUNT)
        self.assertTrue(result.propagate is False)
        return

    def test_setup_no_site(self):
        # Without a site we get no logger from setup
        mylogger = logging.getLogger('test.sample')
        logger = Logger()
        result = logger.logger_setup(mylogger)
        self.assertTrue(result is None)
        return

    def test_logfile_change(self):
        # When the logfile location changes, logger_logfile_changed can react
        setSite(self.app)
        logger = Logger()
        logger.logger.warning('Warning 1')  # Log something to old logfile
        filename1 = logger.logger.handlers[0].baseFilename
        with open(filename1, 'r') as fd:
            content1 = fd.read()
        # Now move the logfile. The new storage root is removed in
        # tearDown(), also if one of the assertions below fails.
        self.dc_root2 = tempfile.mkdtemp()
        self.app['datacenter'].setStoragePath(self.dc_root2)
        logger.logger_logfile_changed()
        logger.logger.warning('Warning 2')  # Log something to new logfile
        filename2 = logger.logger.handlers[0].baseFilename
        with open(filename2, 'r') as fd:
            content2 = fd.read()
        self.assertTrue('Warning 1' in content1)
        self.assertTrue('Warning 2' in content2)
        self.assertTrue('Warning 1' not in content2)
        self.assertTrue('Warning 2' not in content1)
        self.assertTrue(filename1 != filename2)
        return

    def test_logger_additional_handlers(self):
        # When we detect additional handlers in a logger, setup anew.
        # Import the submodule explicitly: ``import logging`` alone does
        # not guarantee that ``logging.handlers`` is available.
        from logging.handlers import WatchedFileHandler
        setSite(self.app)
        logger = Logger()
        handler = logging.StreamHandler()
        logger.logger.addHandler(handler)
        new_py_logger = logger.logger  # This should detect new handler
        self.assertEqual(len(new_py_logger.handlers), 1)
        self.assertTrue(isinstance(
            new_py_logger.handlers[0], WatchedFileHandler))
        return
215
class LoggersFunctionalTests(FunctionalTestCase):
    # Check loggers with real components using them

    layer = FunctionalLayer

    def setUp(self):
        super(LoggersFunctionalTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        self.workdir = tempfile.mkdtemp()

        # Store a logging component inside the site and make the site
        # the current one.
        self.app['mycomponent'] = LoggingComponent()
        self.component = self.app['mycomponent']
        setSite(self.app)
        return

    def tearDown(self):
        super(LoggersFunctionalTests, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        return

    def test_component_can_get_logger(self):
        # Components can get a logger
        logger = self.component.logger
        self.assertTrue(logger is not None)
        self.assertEqual(len(logger.handlers), 1)
        self.assertEqual(logger.name, 'sample.app')
        return

    def test_component_handler_setup(self):
        # handlers of components are setup
        # Import the submodule explicitly: ``import logging`` alone does
        # not guarantee that ``logging.handlers`` is available.
        from logging.handlers import WatchedFileHandler
        handler = self.component.logger.handlers[0]
        self.assertTrue(isinstance(handler, WatchedFileHandler))
        # Rotation settings are only checked for handlers that have them
        if hasattr(handler, 'maxBytes'):
            self.assertEqual(handler.maxBytes, MAX_BYTES)
        if hasattr(handler, 'backupCount'):
            self.assertEqual(handler.backupCount, BACKUP_COUNT)
        # Build the expected suffix with the local path separator
        expected_suffix = os.path.join('logs', 'sample.log')
        self.assertTrue(handler.baseFilename.endswith(expected_suffix))
        return

    def test_component_can_log(self):
        # a method of some 'loggerized' instance can log like this.
        self.component.logger.setLevel(logging.INFO)
        self.component.do_something()
        handler = logging.getLogger('sample.app').handlers[0]
        logfile = handler.baseFilename
        # Read as text, as we compare against a text suffix below, and
        # make sure the file is closed again.
        with open(logfile, 'r') as fd:
            content = fd.read()
        self.assertTrue(
            content.endswith(' - INFO - system - msg from do_something\n'))
        return
275
class LoggerCollectorTests(FunctionalTestCase):
    # Tests for LoggerCollector: registering, unregistering and
    # looking up logging components per site.

    layer = FunctionalLayer

    def setUp(self):
        super(LoggerCollectorTests, self).setUp()
        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        self.workdir = tempfile.mkdtemp()

        self.app['mycomponent'] = LoggingComponent()
        setSite(self.app)

        # The tests work with a fresh component that is not stored
        # in the site.
        self.component = LoggingComponent()
        return

    def tearDown(self):
        super(LoggerCollectorTests, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        return

    def test_ifaces(self):
        # Collector class and instances provide the promised interface
        collector = LoggerCollector()
        verifyClass(ILoggerCollector, LoggerCollector)
        verifyObject(ILoggerCollector, collector)
        return

    def test_get_utility(self):
        # We can get a logger collector via utility lookup
        util1 = queryUtility(ILoggerCollector, default=None)
        util2 = queryUtility(ILoggerCollector, default=None)
        self.assertTrue(util1 is util2)
        self.assertTrue(isinstance(util1, LoggerCollector))
        return

    def test_get_loggers(self):
        # We can get loggers from a logger collector
        collector = LoggerCollector()
        result0 = collector.getLoggers(None)
        result1 = collector.getLoggers('not-a-site')
        result2 = collector.getLoggers(self.app)
        collector.registerLogger(self.app, self.component)
        result3 = collector.getLoggers(self.app)
        self.assertEqual(result0, [])
        self.assertEqual(result1, [])
        self.assertEqual(result2, [])
        self.assertEqual(result3, [self.component])
        self.assertTrue(result3[0] is self.component)
        return

    def test_register_logger(self):
        # We can register loggers by name
        collector = LoggerCollector()
        collector.registerLogger(None, self.component)  # Should have no effect
        collector.registerLogger(object(), self.component)  # Same here
        collector.registerLogger(self.app, self.component)
        # list() makes the comparison independent of whether keys()
        # returns a list or some other iterable.
        self.assertEqual(['app'], list(collector.keys()))
        return

    def test_register_logger_double(self):
        # Registering the same component twice gives one entry, not two
        collector = LoggerCollector()
        collector.registerLogger(self.app, self.component)
        collector.registerLogger(self.app, self.component)
        self.assertEqual(len(collector['app']), 1)
        return

    def test_unregister_logger(self):
        # We can also unregister loggers
        collector = LoggerCollector()
        collector.registerLogger(self.app, self.component)
        collector.unregisterLogger(self.app, self.component)
        result = collector.getLoggers(self.app)
        self.assertEqual(result, [])
        return

    def test_unregister_logger_invalid(self):
        # We can cope with unregistration of non-existing entries.
        collector = LoggerCollector()
        collector.registerLogger(self.app, self.component)
        collector.unregisterLogger(None, self.component)
        collector.unregisterLogger(self.app, 'nonsense')
        self.assertEqual(len(collector), 1)
        return
371
class LogfileChangeTests(FunctionalTestCase):
    # Tests for what happens to loggers when the datacenter storage
    # is moved or the site is deleted.

    layer = FunctionalLayer

    def setUp(self):
        super(LogfileChangeTests, self).setUp()
        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        # New storage root. Only created by tests that move the
        # datacenter storage; removed in tearDown() if set.
        self.dc_root_new = None
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        self.workdir = tempfile.mkdtemp()

        self.app['mycomponent'] = LoggingComponent()
        setSite(self.app)
        # The tests work with a fresh component that is not stored
        # in the site.
        self.component = LoggingComponent()
        return

    def tearDown(self):
        super(LogfileChangeTests, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        if self.dc_root_new is not None:
            shutil.rmtree(self.dc_root_new)
        clearSite()
        return

    def test_storage_move(self):
        # When a datacenter storage (its path) changes, we have to
        # move the logfiles as well.
        filename1 = self.component.logger.handlers[0].baseFilename
        self.dc_root_new = tempfile.mkdtemp()
        self.app['datacenter'].setStoragePath(self.dc_root_new)
        filename2 = self.component.logger.handlers[0].baseFilename
        self.assertTrue(filename1 != filename2)
        self.assertTrue(filename2.startswith(self.dc_root_new))
        return

    def test_storage_deletion(self):
        # When a site is deleted, the loggers should go away as well
        self.component.logger  # create entry in collector
        collector = queryUtility(ILoggerCollector)
        # Take snapshots with list(), so that the 'before' value can
        # not change when the site is deleted.
        loggers1 = list(collector.keys())
        root = self.app.__parent__
        del root['app']
        loggers2 = list(collector.keys())
        self.assertEqual(loggers1, ['app'])
        self.assertEqual(loggers2, [])
        return
Note: See TracBrowser for help on using the repository browser.