Ignore:
Timestamp:
6 Aug 2011, 11:11:41 (13 years ago)
Author:
uli
Message:

Implement a new logging technique and use it.
Details are explained in waeup.sirp.utils.logger.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.sirp/trunk/src/waeup/sirp/utils/tests/test_logger.py

    r6481 r6578  
    1 ##
    2 ## test_logger.py
    3 ## Login : <uli@pu.smp.net>
    4 ## Started on  Mon Jun 13 14:40:04 2011 Uli Fouquet
    5 ## $Id$
    6 ##
    7 ## Copyright (C) 2011 Uli Fouquet
    8 ## This program is free software; you can redistribute it and/or modify
    9 ## it under the terms of the GNU General Public License as published by
    10 ## the Free Software Foundation; either version 2 of the License, or
    11 ## (at your option) any later version.
    12 ##
    13 ## This program is distributed in the hope that it will be useful,
    14 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
    15 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    16 ## GNU General Public License for more details.
    17 ##
    18 ## You should have received a copy of the GNU General Public License
    19 ## along with this program; if not, write to the Free Software
    20 ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
    21 ##
     1# Tests for waeup.sirp.utils.logger
    222import logging
    233import os
    24 import re
    254import shutil
    265import tempfile
    27 from zope.app.testing.functional import FunctionalTestCase
    286from zope.component import queryUtility
    297from zope.component.hooks import setSite, clearSite
    308from zope.interface.verify import verifyClass, verifyObject
    319from waeup.sirp.app import University
    32 from waeup.sirp.interfaces import ISiteLoggers, ILogger
    33 from waeup.sirp.testing import FunctionalLayer, doctestsuite_for_module
     10from waeup.sirp.testing import (
     11    FunctionalLayer, FunctionalTestCase, get_all_loggers,
     12    remove_new_loggers, remove_logger)
     13
    3414from waeup.sirp.utils.logger import (
    35     SiteLoggers, WAeUPLogger, SiteLoggersRepository)
    36 
    37 class SiteLoggersTests(FunctionalTestCase):
     15    Logger, MAX_BYTES, BACKUP_COUNT, ILoggerCollector, LoggerCollector,
     16    ILogger)
     17
     18class FakeComponent(object):
     19    # A component that can use the Logger mixin-class
     20    def do_something(self):
     21        self.logger.info('msg from do_something')
     22
     23class LoggingComponent(FakeComponent, Logger):
     24    # A component that supports logging by mixing in Logger
     25    logger_name = 'sample.${sitename}'
     26    logger_filename = 'sample.log'
     27
     28class LoggersTests(FunctionalTestCase):
    3829
    3930    layer = FunctionalLayer
    4031
    4132    def setUp(self):
    42         super(SiteLoggersTests, self).setUp()
     33        super(LoggersTests, self).setUp()
     34        self.old_loggers = get_all_loggers()
     35        # Setup a sample site for each test
     36        app = University()
     37        self.dc_root = tempfile.mkdtemp()
     38        self.dc_root2 = None
     39        app['datacenter'].setStoragePath(self.dc_root)
     40
     41        # Prepopulate the ZODB...
     42        self.getRootFolder()['app'] = app
     43        self.app = self.getRootFolder()['app']
     44        self.workdir = tempfile.mkdtemp()
     45        return
     46
     47    def tearDown(self):
     48        super(LoggersTests, self).tearDown()
     49        remove_new_loggers(self.old_loggers)
     50        shutil.rmtree(self.workdir)
     51        shutil.rmtree(self.dc_root)
     52        clearSite()
     53        return
     54
     55    def clear_logger(self, logger):
     56        del_list = [x for x in logger.handlers]
     57        for handler in del_list:
     58            logger.removeHandler(handler)
     59        return
     60
     61    def test_iface(self):
     62        setSite(self.app)
     63        logger = LoggingComponent()
     64        verifyClass(ILogger, Logger)
     65        verifyObject(ILogger, logger)
     66        return
     67
     68    def test_logger_prop_no_site(self):
     69        # If not in a site, we will get a logger without handlers
     70        logger = Logger().logger
     71        self.assertTrue(isinstance(logger, logging.Logger))
     72        self.assertEqual(len(logger.handlers), 0)
     73        return
     74
     75    def test_site_no_site_name(self):
     76        # If a site has not name we will get a logger anyway
     77        setSite(self.app)
     78        del self.app.__name__
     79        logger = Logger().logger
     80        self.assertTrue(logger is not None)
     81        return
     82
     83    def test_get_logdir(self):
     84        # We can get a logdir
     85        setSite(self.app)
     86        logger = Logger()
     87        logdir = logger.logger_get_logdir()
     88        expected_logdir = os.path.join(
     89            self.app['datacenter'].storage, 'logs')
     90        self.assertEqual(logdir, expected_logdir)
     91        return
     92
     93    def test_get_logdir_no_site(self):
     94        # We cannot get a logdir without a site
     95        logger = Logger()
     96        logdir = logger.logger_get_logdir()
     97        self.assertTrue(logdir is None)
     98        return
     99
     100    def test_get_logdir_no_datacenter(self):
     101        # Having a site, is not enough. We also need a datacenter in it.
     102        setSite(self.app)
     103        del self.app['datacenter']
     104        logger = Logger()
     105        logdir = logger.logger_get_logdir()
     106        self.assertTrue(logdir is None)
     107        return
     108
     109    def test_get_logdir_create_dir(self):
     110        # If the logdirectory does not exist, we will create it
     111        setSite(self.app)
     112        logger = Logger()
     113        expected_logdir = os.path.join(
     114            self.app['datacenter'].storage, 'logs')
     115        if os.path.exists(expected_logdir):
     116            shutil.rmtree(expected_logdir)
     117        logdir = logger.logger_get_logdir()
     118        exists_after = os.path.exists(expected_logdir)
     119        self.assertEqual(logdir, expected_logdir)
     120        self.assertTrue(exists_after is True)
     121        return
     122
     123    def test_get_logfile(self):
     124        # We can get a logfile
     125        setSite(self.app)
     126        logger = Logger()
     127        expected_filepath = os.path.join(
     128            self.app['datacenter'].storage, 'logs', 'main.log')
     129        path = logger.logger_get_logfile_path()
     130        self.assertEqual(expected_filepath, path)
     131        return
     132
     133    def test_get_logfile_no_dir(self):
     134        # We cannot get a logfilepath if there is no dir for it
     135        setSite(self.app)
     136        logger = Logger()
     137        del self.app['datacenter']
     138        path = logger.logger_get_logfile_path()
     139        self.assertTrue(path is None)
     140        return
     141
     142    def test_setup(self):
     143        # We can setup a logger.
     144        mylogger = logging.getLogger('test.sample')
     145        setSite(self.app)
     146        logger = Logger()
     147        result = logger.logger_setup(mylogger)
     148        self.assertEqual(len(result.handlers), 1)
     149        handler = result.handlers[0]
     150        self.assertEqual(handler.maxBytes, MAX_BYTES)
     151        self.assertEqual(handler.backupCount, BACKUP_COUNT)
     152        self.assertTrue(result.propagate is False)
     153        return
     154
     155    def test_setup_no_site(self):
     156        # Without a site we get no logger from setup
     157        mylogger = logging.getLogger('test.sample')
     158        logger = Logger()
     159        result = logger.logger_setup(mylogger)
     160        self.assertTrue(result is None)
     161        return
     162
     163    def test_logfile_change(self):
     164        # When the logfile location changes, logger_logfile_changed can react
     165        setSite(self.app)
     166        logger = Logger()
     167        logger.logger.warn('Warning 1')  # Log something to old logfile
     168        filename1 = logger.logger.handlers[0].baseFilename
     169        content1 = open(filename1, 'r').read()
     170        # Now move the logfile
     171        self.dc_root2 = tempfile.mkdtemp()
     172        self.app['datacenter'].setStoragePath(self.dc_root2)
     173        logger.logger_logfile_changed()
     174        logger.logger.warn('Warning 2')  # Log something to new logfile
     175        filename2 = logger.logger.handlers[0].baseFilename
     176        content2 = open(filename2, 'r').read()
     177        self.assertTrue('Warning 1' in content1)
     178        self.assertTrue('Warning 2' in content2)
     179        self.assertTrue('Warning 1' not in content2)
     180        self.assertTrue('Warning 2' not in content1)
     181        self.assertTrue(filename1 != filename2)
     182        return
     183
     184
     185class LoggersFunctionalTests(FunctionalTestCase):
     186    # Check loggers with real components using them
     187
     188    layer = FunctionalLayer
     189
     190    def setUp(self):
     191        super(LoggersFunctionalTests, self).setUp()
    43192
    44193        # Setup a sample site for each test
     
    50199        self.getRootFolder()['app'] = app
    51200        self.app = self.getRootFolder()['app']
    52 
    53201        self.workdir = tempfile.mkdtemp()
    54         SiteLoggersRepository.clear()
     202
     203        self.app['mycomponent'] = LoggingComponent()
     204        self.component = self.app['mycomponent']
     205        setSite(self.app)
     206        self.old_loggers = get_all_loggers()
    55207        return
    56208
    57209    def tearDown(self):
    58         super(SiteLoggersTests, self).tearDown()
     210        super(LoggersFunctionalTests, self).tearDown()
     211        remove_new_loggers(self.old_loggers)
    59212        shutil.rmtree(self.workdir)
    60213        shutil.rmtree(self.dc_root)
     
    62215        return
    63216
    64     def test_iface(self):
    65         loggers = SiteLoggers()
    66         verifyClass(ISiteLoggers, SiteLoggers)
    67         verifyObject(ISiteLoggers, loggers)
    68         return
    69 
    70     def test_loggers(self):
    71         loggers = SiteLoggers()
    72         loggers.register('sample1')
    73         loggers.register('sample2')
    74         result = loggers.loggers
    75         assert len(result) == 2
    76 
    77     def test_register(self):
    78         loggers = SiteLoggers()
    79         loggers.register('sample')
    80         assert 'sample' in loggers._loggers.keys()
    81 
    82     def test_register_duplicate(self):
    83         loggers = SiteLoggers()
    84         loggers.register('sample')
    85         self.assertRaises(
    86             KeyError,
    87             loggers.register, 'sample')
    88 
    89     def test_unregister(self):
    90         loggers = SiteLoggers()
    91         loggers.register('sample')
    92         loggers.unregister('sample')
    93         self.assertEqual(loggers._loggers, {})
    94 
    95     def test_utility_no_site(self):
    96         # make sure we get loggers even if no site is defined/set
    97         util = queryUtility(ISiteLoggers)
    98         loggers = util.getLoggers()
    99         assert util is not None
    100         assert loggers is not None
    101         assert None in SiteLoggersRepository.keys()
    102 
    103     def test_utility(self):
    104         # make sute we get loggers for a certain site
    105         setSite(self.app)
    106         util = queryUtility(ISiteLoggers)
    107         loggers = util.getLoggers()
    108         assert util is not None
    109         assert loggers is not None
    110         assert self.app in SiteLoggersRepository.keys()
    111 
    112     def test_utility_multiple(self):
    113         # make sure we get the same loggers for same site
    114         setSite(self.app)
    115         self.assertEqual(SiteLoggersRepository.keys(), [])
    116         util = queryUtility(ISiteLoggers)
    117         loggers1 = util.getLoggers()
    118         loggers2 = util.getLoggers()
    119         assert loggers1 is loggers2
    120 
    121     def test_utility_given_site(self):
    122         # make sure a passed site is respected
    123         util = queryUtility(ISiteLoggers)
    124         loggers = util.getLoggers(site=self.app)
    125         assert loggers is not None
    126         assert self.app in SiteLoggersRepository.keys()
    127 
    128 class LoggerTests(FunctionalTestCase):
     217    def test_component_can_get_logger(self):
     218        # Components can get a logger
     219        logger = self.component.logger
     220        self.assertTrue(logger is not None)
     221        self.assertEqual(len(logger.handlers), 1)
     222        self.assertEqual(logger.name, 'sample.app')
     223        return
     224
     225    def test_component_handler_setup(self):
     226        # handlers of components are setup
     227        handler = self.component.logger.handlers[0]
     228        self.assertTrue(isinstance(
     229                handler, logging.handlers.RotatingFileHandler))
     230        self.assertEqual(handler.maxBytes, MAX_BYTES)
     231        self.assertEqual(handler.backupCount, BACKUP_COUNT)
     232        self.assertTrue(handler.baseFilename.endswith('logs/sample.log'))
     233        return
     234
     235    def test_component_can_log(self):
     236        # a method of some 'loggerized' instance can log like this.
     237        self.component.logger.setLevel(logging.INFO)
     238        self.component.do_something()
     239        handler = logging.getLogger('sample.app').handlers[0]
     240        logfile = handler.baseFilename
     241        content = open(logfile, 'rb').read()
     242        self.assertTrue(content.endswith(' - INFO - msg from do_something\n'))
     243        return
     244
     245class LoggerCollectorTests(FunctionalTestCase):
    129246
    130247    layer = FunctionalLayer
    131248
    132249    def setUp(self):
    133         super(LoggerTests, self).setUp()
    134 
     250        super(LoggerCollectorTests, self).setUp()
     251        self.old_loggers = get_all_loggers()
    135252        # Setup a sample site for each test
    136253        app = University()
     
    141258        self.getRootFolder()['app'] = app
    142259        self.app = self.getRootFolder()['app']
    143 
    144260        self.workdir = tempfile.mkdtemp()
     261
     262        self.app['mycomponent'] = LoggingComponent()
     263        self.component = self.app['mycomponent']
     264        setSite(self.app)
     265
     266        self.component = LoggingComponent()
     267
     268
    145269        return
    146270
    147271    def tearDown(self):
    148         super(LoggerTests, self).tearDown()
     272        super(LoggerCollectorTests, self).tearDown()
     273        remove_new_loggers(self.old_loggers)
    149274        shutil.rmtree(self.workdir)
    150275        shutil.rmtree(self.dc_root)
     
    152277        return
    153278
    154     def test_iface(self):
    155         setSite(self.app)
    156         logger = WAeUPLogger('sample')
    157         verifyClass(ILogger, WAeUPLogger)
    158         verifyObject(ILogger, logger)
    159         return
    160 
    161     def test_logger_attr(self):
    162         setSite(self.app)
    163         logger = WAeUPLogger('sample')
    164         raw_logger = logger.logger
    165         assert isinstance(raw_logger, logging.Logger)
    166 
    167     def test_site_name(self):
    168         setSite(self.app)
    169         logger = WAeUPLogger('waeup.sirp.%s.sample', site=self.app)
    170         self.assertEqual(logger.logger.name, 'waeup.sirp.app.sample')
    171 
    172     def test_site_no_name(self):
    173         # While a site has no name, we get __app as placeholder
    174         setSite(self.app)
    175         self.app.__name__ = None
    176         logger = WAeUPLogger('waeup.sirp.%s.sample', site=self.app)
    177         self.assertEqual(logger.logger.name, 'waeup.sirp.__app.sample')
    178 
    179     def test_site_changing_name(self):
    180         # Changing the sitename (from None to a value) will be reflected
    181         setSite(self.app)
    182         self.app.__name__ = None
    183         logger = WAeUPLogger('waeup.sirp.%s.sample', site=self.app)
    184         self.assertEqual(logger.logger.name, 'waeup.sirp.__app.sample')
    185         self.app.__name__ = 'myapp'
    186         self.assertEqual(logger.logger.name, 'waeup.sirp.myapp.sample')
    187         assert len(logger.logger.handlers) > 0
    188 
    189     def test_loggername_expansion(self):
    190         setSite(self.app)
    191         logger = WAeUPLogger('sample', site=self.app)
    192         logger.logger # Trigger setup
    193         self.assertEqual(logger._loggername, 'waeup.sirp.app.sample')
    194 
    195     def test_filename(self):
    196         setSite(self.app)
    197         logger = WAeUPLogger('sample')
    198         logger.logger # Trigger setup
    199         self.assertEqual(logger.filename, 'sample.log')
    200 
    201     def test_filename_from_dotted_name(self):
    202         setSite(self.app)
    203         logger = WAeUPLogger('waeup.sirp.%s.sample')
    204         logger.logger # Trigger setup
    205         logfile = os.path.join(self.dc_root, 'logs', 'sample.log')
    206         self.assertEqual(logger.filename, 'sample.log')
    207         assert os.path.isfile(logfile)
    208 
    209     def test_option_filename(self):
    210         setSite(self.app)
    211         logger = WAeUPLogger('sample', filename='my.log')
    212         logger.logger # Trigger setup
    213         logfile = os.path.join(self.dc_root, 'logs', 'my.log')
    214         self.assertEqual(logger.filename, 'my.log')
    215         assert os.path.isfile(logfile)
    216 
    217     def test_option_propagate(self):
    218         logger = WAeUPLogger('sample', site=self.app)
    219         assert logger.logger.propagate is False  # default
    220         logger = WAeUPLogger('sample', site=self.app, propagate=True)
    221         assert logger.logger.propagate is True
    222         logger = WAeUPLogger('sample', site=self.app, propagate=False)
    223         assert logger.logger.propagate is False
    224 
    225     def test_logging(self):
    226         logger = WAeUPLogger('sample', site=self.app)
    227         logger.logger.info('Hi there!')
    228         logfile = os.path.join(self.dc_root, 'logs', 'sample.log')
    229         assert os.path.isfile(logfile)
    230         contents = open(logfile, 'rb').read()
    231         self.assertTrue(re.match(
    232                 '^....-..-.. ..:..:..,... - INFO - Hi there!\n$', contents))
     279    def test_ifaces(self):
     280        collector = LoggerCollector()
     281        verifyClass(ILoggerCollector, LoggerCollector)
     282        verifyObject(ILoggerCollector, collector)
     283        return
     284
     285    def test_get_utility(self):
     286        # We can get a logger collector via utility lookup
     287        util1 = queryUtility(ILoggerCollector, default=None)
     288        util2 = queryUtility(ILoggerCollector, default=None)
     289        self.assertTrue(util1 is util2)
     290        self.assertTrue(isinstance(util1, LoggerCollector))
     291        return
     292
     293    def test_get_loggers(self):
     294        # We can get loggers from a logger collector
     295        collector = LoggerCollector()
     296        result0 = collector.getLoggers(None)
     297        result1 = collector.getLoggers('not-a-site')
     298        result2 = collector.getLoggers(self.app)
     299        collector.registerLogger(self.app, self.component)
     300        result3 = collector.getLoggers(self.app)
     301        self.assertEqual(result0, [])
     302        self.assertEqual(result1, [])
     303        self.assertEqual(result2, [])
     304        self.assertEqual(result3, [self.component])
     305        self.assertTrue(result3[0] is self.component)
     306        return
     307
     308    def test_register_logger(self):
     309        # We can register loggers by name
     310        collector = LoggerCollector()
     311        collector.registerLogger(None, self.component) # Should have no effect
     312        collector.registerLogger(object(), self.component) # Same here
     313        collector.registerLogger(self.app, self.component)
     314        self.assertEqual(['app'], collector.keys())
     315        return
     316
     317    def test_register_logger_double(self):
     318        # Registering the same component twice gives one entry, not two
     319        collector = LoggerCollector()
     320        collector.registerLogger(self.app, self.component)
     321        collector.registerLogger(self.app, self.component)
     322        self.assertEqual(len(collector['app']), 1)
     323        return
     324
     325    def test_unregister_logger(self):
     326        # We can also unregister loggers
     327        collector = LoggerCollector()
     328        collector.registerLogger(self.app, self.component)
     329        collector.unregisterLogger(self.app, self.component)
     330        result = collector.getLoggers(self.app)
     331        self.assertEqual(result, [])
     332        return
     333
     334    def test_unregister_logger_invalid(self):
     335        # We can cope with unregistration of non-existing entries.
     336        collector = LoggerCollector()
     337        collector.registerLogger(self.app, self.component)
     338        collector.unregisterLogger(None, self.component)
     339        collector.unregisterLogger(self.app, 'nonsense')
     340        self.assertEqual(len(collector), 1)
     341        return
     342
     343class LogfileChangeTests(FunctionalTestCase):
     344
     345    layer = FunctionalLayer
     346
     347    def setUp(self):
     348        super(LogfileChangeTests, self).setUp()
     349        remove_logger('waeup.sirp.app') # In case it lingers around
     350        self.old_loggers = get_all_loggers()
     351        # Setup a sample site for each test
     352        app = University()
     353        self.dc_root = tempfile.mkdtemp()
     354        self.dc_root_new = None
     355        app['datacenter'].setStoragePath(self.dc_root)
     356
     357        # Prepopulate the ZODB...
     358        self.getRootFolder()['app'] = app
     359        self.app = self.getRootFolder()['app']
     360        self.workdir = tempfile.mkdtemp()
     361
     362        self.app['mycomponent'] = LoggingComponent()
     363        self.component = self.app['mycomponent']
     364        setSite(self.app)
     365
     366        self.component = LoggingComponent()
     367        return
     368
     369    def tearDown(self):
     370        super(LogfileChangeTests, self).tearDown()
     371        remove_new_loggers(self.old_loggers)
     372        shutil.rmtree(self.workdir)
     373        shutil.rmtree(self.dc_root)
     374        if self.dc_root_new is not None:
     375            shutil.rmtree(self.dc_root_new)
     376        clearSite()
     377        return
     378
     379    def test_storage_move(self):
     380        # When a datacenter storage (its path) changes, we have to
     381        # move the logfiles as well.
     382        #return
     383        filename1 = self.component.logger.handlers[0].baseFilename
     384        self.dc_root_new = tempfile.mkdtemp()
     385        self.app['datacenter'].setStoragePath(self.dc_root_new)
     386        filename2 = self.component.logger.handlers[0].baseFilename
     387        self.assertTrue(filename1 != filename2)
     388        self.assertTrue(filename2.startswith(self.dc_root_new))
     389        return
Note: See TracChangeset for help on using the changeset viewer.