source: main/waeup.kofa/branches/uli-fix-zc-async/src/waeup/kofa/utils/tests/test_logger.py @ 17949

Last change on this file since 17949 was 13776, checked in by Henrik Bettermann, 9 years ago

Do not import deprecated DatedRotatingFileHandler?.

  • Property svn:keywords set to Id
File size: 14.6 KB
Line 
1## $Id: test_logger.py 13776 2016-03-11 11:45:01Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18
19# Tests for waeup.kofa.utils.logger
20import logging
21import os
22import shutil
23import tempfile
24from zope.component import queryUtility
25from zope.component.hooks import setSite, clearSite
26from zope.interface.verify import verifyClass, verifyObject
27from waeup.kofa.app import University
28from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
29
30from waeup.kofa.utils.logger import (
31    Logger, ILoggerCollector, LoggerCollector,
32    ILogger)
33
34class FakeComponent(object):
35    # A component that can use the Logger mixin-class
36    def do_something(self):
37        self.logger.info('msg from do_something')
38
39class LoggingComponent(FakeComponent, Logger):
40    # A component that supports logging by mixing in Logger
41    logger_name = 'sample.${sitename}'
42    logger_filename = 'sample.log'
43
44class LoggersTests(FunctionalTestCase):
45
46    layer = FunctionalLayer
47
48    def setUp(self):
49        super(LoggersTests, self).setUp()
50        # Setup a sample site for each test
51        app = University()
52        self.dc_root = tempfile.mkdtemp()
53        self.dc_root2 = None
54        app['datacenter'].setStoragePath(self.dc_root)
55
56        # Prepopulate the ZODB...
57        self.getRootFolder()['app'] = app
58        self.app = self.getRootFolder()['app']
59        self.workdir = tempfile.mkdtemp()
60        return
61
62    def tearDown(self):
63        super(LoggersTests, self).tearDown()
64        shutil.rmtree(self.workdir)
65        shutil.rmtree(self.dc_root)
66        clearSite()
67        return
68
69    def clear_logger(self, logger):
70        del_list = [x for x in logger.handlers]
71        for handler in del_list:
72            logger.removeHandler(handler)
73        return
74
75    def test_iface(self):
76        setSite(self.app)
77        logger = LoggingComponent()
78        verifyClass(ILogger, Logger)
79        verifyObject(ILogger, logger)
80        return
81
82    def test_logger_prop_no_site(self):
83        # If not in a site, we will get a logger without handlers
84        logger = Logger().logger
85        self.assertTrue(isinstance(logger, logging.Logger))
86        self.assertEqual(len(logger.handlers), 0)
87        return
88
89    def test_site_no_site_name(self):
90        # If a site has not name we will get a logger anyway
91        setSite(self.app)
92        del self.app.__name__
93        logger = Logger().logger
94        self.assertTrue(logger is not None)
95        return
96
97    def test_get_logdir(self):
98        # We can get a logdir
99        setSite(self.app)
100        logger = Logger()
101        logdir = logger.logger_get_logdir()
102        expected_logdir = os.path.join(
103            self.app['datacenter'].storage, 'logs')
104        self.assertEqual(logdir, expected_logdir)
105        return
106
107    def test_get_logdir_no_site(self):
108        # We cannot get a logdir without a site
109        logger = Logger()
110        logdir = logger.logger_get_logdir()
111        self.assertTrue(logdir is None)
112        return
113
114    def test_get_logdir_no_datacenter(self):
115        # Having a site, is not enough. We also need a datacenter in it.
116        setSite(self.app)
117        del self.app['datacenter']
118        logger = Logger()
119        logdir = logger.logger_get_logdir()
120        self.assertTrue(logdir is None)
121        return
122
123    def test_get_logdir_create_dir(self):
124        # If the logdirectory does not exist, we will create it
125        setSite(self.app)
126        logger = Logger()
127        expected_logdir = os.path.join(
128            self.app['datacenter'].storage, 'logs')
129        if os.path.exists(expected_logdir):
130            shutil.rmtree(expected_logdir)
131        logdir = logger.logger_get_logdir()
132        exists_after = os.path.exists(expected_logdir)
133        self.assertEqual(logdir, expected_logdir)
134        self.assertTrue(exists_after is True)
135        return
136
137    def test_get_logfile(self):
138        # We can get a logfile
139        setSite(self.app)
140        logger = Logger()
141        expected_filepath = os.path.join(
142            self.app['datacenter'].storage, 'logs', 'main.log')
143        path = logger.logger_get_logfile_path()
144        self.assertEqual(expected_filepath, path)
145        return
146
147    def test_get_logfile_no_dir(self):
148        # We cannot get a logfilepath if there is no dir for it
149        setSite(self.app)
150        logger = Logger()
151        del self.app['datacenter']
152        path = logger.logger_get_logfile_path()
153        self.assertTrue(path is None)
154        return
155
156    def test_setup(self):
157        # We can setup a logger.
158        mylogger = logging.getLogger('test.sample')
159        setSite(self.app)
160        logger = Logger()
161        result = logger.logger_setup(mylogger)
162        self.assertEqual(len(result.handlers), 1)
163        handler = result.handlers[0]
164        self.assertTrue(result.propagate is False)
165        return
166
167    def test_setup_no_site(self):
168        # Without a site we get no logger from setup
169        mylogger = logging.getLogger('test.sample')
170        logger = Logger()
171        result = logger.logger_setup(mylogger)
172        self.assertTrue(result is None)
173        return
174
175    def test_logfile_change(self):
176        # When the logfile location changes, logger_logfile_changed can react
177        setSite(self.app)
178        logger = Logger()
179        logger.logger.warn('Warning 1')  # Log something to old logfile
180        filename1 = logger.logger.handlers[0].baseFilename
181        content1 = open(filename1, 'r').read()
182        # Now move the logfile
183        self.dc_root2 = tempfile.mkdtemp()
184        self.app['datacenter'].setStoragePath(self.dc_root2)
185        logger.logger_logfile_changed()
186        logger.logger.warn('Warning 2')  # Log something to new logfile
187        filename2 = logger.logger.handlers[0].baseFilename
188        content2 = open(filename2, 'r').read()
189        self.assertTrue('Warning 1' in content1)
190        self.assertTrue('Warning 2' in content2)
191        self.assertTrue('Warning 1' not in content2)
192        self.assertTrue('Warning 2' not in content1)
193        self.assertTrue(filename1 != filename2)
194        shutil.rmtree(self.dc_root2)
195        return
196
197    def test_logger_additional_handlers(self):
198        # When we detect additional handlers in a logger, setup anew.
199        setSite(self.app)
200        logger = Logger()
201        old_py_logger = logger.logger
202        handler = logging.StreamHandler()
203        logger.logger.addHandler(handler)
204        old_handler_num = len(old_py_logger.handlers)
205        new_py_logger = logger.logger  # This should detect new handler
206        new_handler_num = len(new_py_logger.handlers)
207        self.assertEqual(new_handler_num, 1)
208        self.assertTrue(isinstance(
209            new_py_logger.handlers[0],
210            logging.handlers.WatchedFileHandler))
211
212class LoggersFunctionalTests(FunctionalTestCase):
213    # Check loggers with real components using them
214
215    layer = FunctionalLayer
216
217    def setUp(self):
218        super(LoggersFunctionalTests, self).setUp()
219
220        # Setup a sample site for each test
221        app = University()
222        self.dc_root = tempfile.mkdtemp()
223        app['datacenter'].setStoragePath(self.dc_root)
224
225        # Prepopulate the ZODB...
226        self.getRootFolder()['app'] = app
227        self.app = self.getRootFolder()['app']
228        self.workdir = tempfile.mkdtemp()
229
230        self.app['mycomponent'] = LoggingComponent()
231        self.component = self.app['mycomponent']
232        setSite(self.app)
233        return
234
235    def tearDown(self):
236        super(LoggersFunctionalTests, self).tearDown()
237        shutil.rmtree(self.workdir)
238        shutil.rmtree(self.dc_root)
239        clearSite()
240        return
241
242    def test_component_can_get_logger(self):
243        # Components can get a logger
244        logger = self.component.logger
245        self.assertTrue(logger is not None)
246        self.assertEqual(len(logger.handlers), 1)
247        self.assertEqual(logger.name, 'sample.app')
248        return
249
250    def test_component_handler_setup(self):
251        # handlers of components are setup
252        handler = self.component.logger.handlers[0]
253        self.assertTrue(isinstance(
254                handler, logging.handlers.WatchedFileHandler))
255        if hasattr(handler, 'maxBytes'):
256            self.assertEqual(handler.maxBytes, MAX_BYTES)
257        if hasattr(handler, 'backupCount'):
258            self.assertEqual(handler.backupCount, BACKUP_COUNT)
259        self.assertTrue(handler.baseFilename.endswith('logs/sample.log'))
260        return
261
262    def test_component_can_log(self):
263        # a method of some 'loggerized' instance can log like this.
264        self.component.logger.setLevel(logging.INFO)
265        self.component.do_something()
266        handler = logging.getLogger('sample.app').handlers[0]
267        logfile = handler.baseFilename
268        content = open(logfile, 'rb').read()
269        self.assertTrue(content.endswith(' - INFO - system - msg from do_something\n'))
270        return
271
272class LoggerCollectorTests(FunctionalTestCase):
273
274    layer = FunctionalLayer
275
276    def setUp(self):
277        super(LoggerCollectorTests, self).setUp()
278        # Setup a sample site for each test
279        app = University()
280        self.dc_root = tempfile.mkdtemp()
281        app['datacenter'].setStoragePath(self.dc_root)
282
283        # Prepopulate the ZODB...
284        self.getRootFolder()['app'] = app
285        self.app = self.getRootFolder()['app']
286        self.workdir = tempfile.mkdtemp()
287
288        self.app['mycomponent'] = LoggingComponent()
289        self.component = self.app['mycomponent']
290        setSite(self.app)
291
292        self.component = LoggingComponent()
293
294
295        return
296
297    def tearDown(self):
298        super(LoggerCollectorTests, self).tearDown()
299        shutil.rmtree(self.workdir)
300        shutil.rmtree(self.dc_root)
301        clearSite()
302        return
303
304    def test_ifaces(self):
305        collector = LoggerCollector()
306        verifyClass(ILoggerCollector, LoggerCollector)
307        verifyObject(ILoggerCollector, collector)
308        return
309
310    def test_get_utility(self):
311        # We can get a logger collector via utility lookup
312        util1 = queryUtility(ILoggerCollector, default=None)
313        util2 = queryUtility(ILoggerCollector, default=None)
314        self.assertTrue(util1 is util2)
315        self.assertTrue(isinstance(util1, LoggerCollector))
316        return
317
318    def test_get_loggers(self):
319        # We can get loggers from a logger collector
320        collector = LoggerCollector()
321        result0 = collector.getLoggers(None)
322        result1 = collector.getLoggers('not-a-site')
323        result2 = collector.getLoggers(self.app)
324        collector.registerLogger(self.app, self.component)
325        result3 = collector.getLoggers(self.app)
326        self.assertEqual(result0, [])
327        self.assertEqual(result1, [])
328        self.assertEqual(result2, [])
329        self.assertEqual(result3, [self.component])
330        self.assertTrue(result3[0] is self.component)
331        return
332
333    def test_register_logger(self):
334        # We can register loggers by name
335        collector = LoggerCollector()
336        collector.registerLogger(None, self.component) # Should have no effect
337        collector.registerLogger(object(), self.component) # Same here
338        collector.registerLogger(self.app, self.component)
339        self.assertEqual(['app'], collector.keys())
340        return
341
342    def test_register_logger_double(self):
343        # Registering the same component twice gives one entry, not two
344        collector = LoggerCollector()
345        collector.registerLogger(self.app, self.component)
346        collector.registerLogger(self.app, self.component)
347        self.assertEqual(len(collector['app']), 1)
348        return
349
350    def test_unregister_logger(self):
351        # We can also unregister loggers
352        collector = LoggerCollector()
353        collector.registerLogger(self.app, self.component)
354        collector.unregisterLogger(self.app, self.component)
355        result = collector.getLoggers(self.app)
356        self.assertEqual(result, [])
357        return
358
359    def test_unregister_logger_invalid(self):
360        # We can cope with unregistration of non-existing entries.
361        collector = LoggerCollector()
362        collector.registerLogger(self.app, self.component)
363        collector.unregisterLogger(None, self.component)
364        collector.unregisterLogger(self.app, 'nonsense')
365        self.assertEqual(len(collector), 1)
366        return
367
368class LogfileChangeTests(FunctionalTestCase):
369
370    layer = FunctionalLayer
371
372    def setUp(self):
373        super(LogfileChangeTests, self).setUp()
374        # Setup a sample site for each test
375        app = University()
376        self.dc_root = tempfile.mkdtemp()
377        self.dc_root_new = None
378        app['datacenter'].setStoragePath(self.dc_root)
379
380        # Prepopulate the ZODB...
381        self.getRootFolder()['app'] = app
382        self.app = self.getRootFolder()['app']
383        self.workdir = tempfile.mkdtemp()
384
385        self.app['mycomponent'] = LoggingComponent()
386        self.component = self.app['mycomponent']
387        setSite(self.app)
388        self.component = LoggingComponent()
389        return
390
391    def tearDown(self):
392        super(LogfileChangeTests, self).tearDown()
393        shutil.rmtree(self.workdir)
394        shutil.rmtree(self.dc_root)
395        if self.dc_root_new is not None:
396            shutil.rmtree(self.dc_root_new)
397        clearSite()
398        return
399
400    def test_storage_move(self):
401        # When a datacenter storage (its path) changes, we have to
402        # move the logfiles as well.
403        filename1 = self.component.logger.handlers[0].baseFilename
404        self.dc_root_new = tempfile.mkdtemp()
405        self.app['datacenter'].setStoragePath(self.dc_root_new)
406        filename2 = self.component.logger.handlers[0].baseFilename
407        self.assertTrue(filename1 != filename2)
408        self.assertTrue(filename2.startswith(self.dc_root_new))
409        return
410
411    def test_storage_deletion(self):
412        # When a site is deleted, the loggers should go away as well
413        self.dc_root_new = tempfile.mkdtemp()
414        logger1 = self.component.logger.handlers # create entry in collector
415        collector = queryUtility(ILoggerCollector)
416        loggers1 = collector.keys()
417        root = self.app.__parent__
418        del root['app']
419        loggers2 = collector.keys()
420        self.assertEqual(loggers1, ['app'])
421        self.assertEqual(loggers2, [])
422        return
Note: See TracBrowser for help on using the repository browser.