source: main/waeup.kofa/trunk/src/waeup/kofa/testing.py @ 16727

Last change on this file since 16727 was 12598, checked in by uli, 10 years ago

flake8.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 15.6 KB
RevLine 
[7193]1## $Id: testing.py 12598 2015-02-11 16:22:44Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7811]18"""Testing support for :mod:`waeup.kofa`.
[5780]19"""
[6238]20import grok
21import doctest
[6577]22import logging
[3521]23import os.path
[6238]24import re
[7581]25import tempfile
26import shutil
[6735]27import unittest
[5796]28import warnings
[5780]29import zope.component
[7811]30import waeup.kofa
[12595]31from contextlib import contextmanager
[9342]32from zc.async.interfaces import COMPLETED
[6238]33from zope.app.testing.functional import (
[6463]34    ZCMLLayer, FunctionalTestSetup, getRootFolder, sync, FunctionalTestCase)
[12595]35from zope.component import getGlobalSiteManager, queryUtility, getUtility
[5865]36from zope.security.testing import addCheckerPublic
[6238]37from zope.testing import renormalizing
[5796]38from zope.testing.cleanup import cleanUp
[12595]39from waeup.kofa.interfaces import IKofaUtils
[3521]40
# Path of the ZCML file configuring the functional test layer. It is
# expected to live in the directory of the :mod:`waeup.kofa` package.
ftesting_zcml = os.path.join(
    os.path.dirname(waeup.kofa.__file__), 'ftesting.zcml')

# Test layer built from the ZCML file above; teardown is allowed.
FunctionalLayer = ZCMLLayer(
    ftesting_zcml, __name__, 'FunctionalLayer', allow_teardown=True)
[5139]45
[12598]46
def get_all_loggers():
    """Return a snapshot of the names of all globally defined loggers.

    The names are the keys of ``logging.root.manager.loggerDict``. They
    are copied into a new list, so the result does not change when
    loggers are registered later on (on Python 3 ``dict.keys()`` would
    be a live view of the registry).

    As a debugging aid, all names containing ``'waeup.kofa'`` are
    printed to stdout if there are any.
    """
    result = list(logging.root.manager.loggerDict)
    ws_loggers = [x for x in result if 'waeup.kofa' in x]
    if ws_loggers:
        # For debugging: show any remaining loggers from w.k. namespace.
        # A single pre-formatted argument gives the same output with the
        # Python 2 print statement and the Python 3 print function.
        print("\nLOGGERS:  %s" % (ws_loggers,))
    return result
[6577]56
[12598]57
def remove_new_loggers(old_loggers):
    """Remove all loggers except those named in `old_loggers`.

    `old_loggers` is a collection of logger keys as returned by
    :func:`get_all_loggers`. Every globally registered logger whose
    name is not in `old_loggers` is removed from the registry. Before
    that, each of its handlers is closed and detached, so code that
    still holds a reference to the logger does not write to closed
    handlers.
    """
    known = set(old_loggers)
    new_loggers = [key for key in logging.root.manager.loggerDict
                   if key not in known]
    # Reverse sorting puts dotted child names before their parents.
    for key in sorted(new_loggers, reverse=True):
        logger = logging.getLogger(key)
        # Loop over a copy: removeHandler() changes `logger.handlers`.
        for handler in list(logger.handlers):
            handler.close()
            logger.removeHandler(handler)
        del logging.root.manager.loggerDict[key]
    return
74
[12598]75
def remove_logger(name):
    """Drop the logger called `name` from the global logger registry.

    Calling this for an unknown name is safe: nothing happens then.
    """
    logging.root.manager.loggerDict.pop(name, None)
84
85
def setUpZope(test=None):
    """Initialize a Zope-compatible environment.

    Currently, we only initialize the event machinery.

    `test` is handed on unchanged to
    :func:`zope.component.eventtesting.setUp`.
    """
    # ``import zope.component`` alone does not guarantee that the
    # ``eventtesting`` submodule was loaded. Import it explicitly instead
    # of relying on some other module having done so before.
    from zope.component import eventtesting
    eventtesting.setUp(test)
92
[12598]93
def cleanUpZope(test=None):
    """Tear down Zope-related registrations.

    Simply calls :func:`zope.testing.cleanup.cleanUp`. The `test`
    argument is accepted but not used.
    """
    cleanUp()
100
[12598]101
def maybe_grok():
    """Try to grok the :mod:`waeup.kofa` package.

    For many tests, even simple ones, we want the components defined
    somewhere in the :mod:`waeup.kofa` package being registered. While
    grokking the complete package can become expensive when done many
    times, we only want to grok if it did not happen
    before. Furthermore regrokking the whole package makes no sense if
    done already.

    :func:`maybe_grok` checks whether any eventhandlers are already
    registered and does nothing in that case.

    The grokking of :mod:`waeup.kofa` is done with warnings disabled.
    The warning filters that were active before are restored afterwards,
    also if grokking raises an exception.

    Returns ``True`` if grokking was done, ``False`` else.

    .. The following samples should go into Sphinx docs directly....

    Sample
    ******

    Usage with plain Python testrunners
    -----------------------------------

    Together with the :func:`setUpZope` and :func:`cleanUpZope`
    functions we then can do unittests with all components registered
    and the event dispatcher in place like this::

      import unittest2 as unittest # Want Python 2.7 features
      from waeup.kofa.testing import (
        maybe_grok, setUpZope, cleanUpZope,
        )
      from waeup.kofa.app import University

      class MyTestCase(unittest.TestCase):

          @classmethod
          def setUpClass(cls):
              grokked = maybe_grok()
              if grokked:
                  setUpZope(None)
              return

          @classmethod
          def tearDownClass(cls):
              cleanUpZope(None)

          def setUp(self):
              pass

          def tearDown(self):
              pass

          def test_jambdata_in_site(self):
              u = University()
              self.assertTrue('jambdata' in u.keys())
              return

    Here the component registration is done only once for the whole
    :class:`unittest.TestCase` and no ZODB is needed. That means
    inside the tests you can expect to have all :mod:`waeup.kofa`
    components (utilities, adapter, events) registered but as objects
    still have no place inside a ZODB here, things like 'browsing'
    won't work out of the box. The benefit is the faster test
    setup/teardown.

    .. note:: This works only with the default Python testrunners.

         If you use the Zope testrunner (from :mod:`zope.testing`)
         then you have to use appropriate layers like the
         :class:`waeup.kofa.testing.KofaUnitTestLayer`.

    Usage with :mod:`zope.testing` testrunners
    ------------------------------------------

    If you use the standard Zope testrunner, classmethods like
    `setUpClass` are not executed. Instead you have to use a layer
    like the one defined in this module.

    .. seealso:: :class:`waeup.kofa.testing.KofaUnitTestLayer`

    """
    gsm = getGlobalSiteManager()
    # If there are any event handlers registered already, we assume
    # that waeup.kofa was grokked already. There might be a better
    # check, though.
    if any(True for _ in gsm.registeredHandlers()):
        return False
    # Register the zope.Public permission, normally done via ZCML setup.
    addCheckerPublic()
    # Disable (erroneous) warnings only while grokking. The context
    # manager puts the previous filters back on exit instead of forcing
    # the 'default' filter, and it does so even when grokking fails.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        grok.testing.grok('waeup.kofa')
    return True
[5796]197
[12598]198
def setup_datacenter_conf():
    """Make sure an `IDataCenterConfig` utility is registered.

    Meant for non-functional tests. If no such utility can be looked
    up, a dict holding a freshly created temporary directory under the
    key ``path`` is registered with the global site manager. If a
    utility is found, nothing happens.
    """
    from waeup.kofa.interfaces import IDataCenterConfig
    if queryUtility(IDataCenterConfig) is None:
        config = {'path': tempfile.mkdtemp()}
        getGlobalSiteManager().registerUtility(config, IDataCenterConfig)
    return
[5796]211
[12598]212
def teardown_datacenter_conf():
    """Unregister a datacenter config utility for non-functional tests.

    Looks up the `IDataCenterConfig` utility. If there is one, the
    directory stored under its ``path`` key is removed recursively and
    the utility is unregistered from the global site manager. If there
    is none, nothing happens.

    Errors raised while removing the directory are propagated, but the
    utility is unregistered nevertheless. Otherwise a stale config
    pointing to a broken path would stay registered.
    """
    from waeup.kofa.interfaces import IDataCenterConfig
    conf = queryUtility(IDataCenterConfig)
    if conf is None:
        return
    try:
        shutil.rmtree(conf['path'])
    finally:
        gsm = getGlobalSiteManager()
        gsm.unregisterUtility(conf, IDataCenterConfig)
    return
225
[12598]226
class KofaUnitTestLayer(object):
    """A test layer that groks `waeup.kofa`.

    This Zope test layer registers all :mod:`waeup.kofa` components
    before the tests attached to it are run, and cleans the
    registrations up afterwards. In addition a datacenter config
    utility is registered during setup and removed on teardown.

    No ZODB is set up by this layer, which makes it much faster than
    complete functional setups. The registrations still take some
    time, so tests using this layer are slower than test cases that
    need none or only a few registrations.

    Setup and teardown happen once for all tests the layer is attached
    to: setup before the first of them, teardown after the last.

    To use the layer, write a :mod:`unittest.TestCase` class with an
    attribute called ``layer`` that has this class as value::

      import unittest
      from waeup.kofa.testing import KofaUnitTestLayer

      class MyTestCase(unittest.TestCase):

          layer = KofaUnitTestLayer

          # per-test setups and real tests go here...
          def test_foo(self):
              self.assertEqual(1, 1)
              return

    """

    @classmethod
    def setUp(cls):
        """Grok `waeup.kofa` (if not done yet) and register a datacenter."""
        # NOTE: setUpZope() is not called here.
        maybe_grok()
        setup_datacenter_conf()

    @classmethod
    def tearDown(cls):
        """Remove the datacenter config, then clean up registrations."""
        teardown_datacenter_conf()
        cleanUpZope(None)
[6238]274
[7581]275
#: This extended :class:`doctest.OutputChecker` allows the following
#: additional matches when looking for output diffs:
#:
#: `N.NNN seconds`
#:    matches strings like ``12.123 seconds``
#:
#: `HTTPError:`
#:    matches ``httperror_seek_wrapper:``. This string is output by some
#:    virtual browsers you might use in functional browser tests to signal
#:    HTTP error state.
#:
#: `1034h`
#:    is ignored. This sequence of control chars is output by some
#:    (buggy) testrunners at beginning of output.
#:
#: `<10-DIGITS>`
#:    matches a sequence of 10 digits. Useful when checking accesscode
#:    numbers if you don't know the exact (random) code.
#:
#: `<6-DIGITS>`
#:    matches a sequence of 6 digits. Useful when checking accesscode
#:    numbers if you don't know the exact (random) code.
#:
#: `<YYYY-MM-DD hh:mm:ss TZ>`
#:    matches a date and time followed by a numeric UTC offset and one
#:    more word, like `2011-05-01 12:01:32 +0100 CET`.
#:
#: `<YYYY-MM-DD hh:mm:ss>`
#:    matches any date and time like `2011-05-01 12:01:32`.
#:
#: `<DATE-AND-TIME>`, `<DATETIME>`
#:    same like ``<YYYY-MM-DD hh:mm:ss>`` but shorter.
checker = renormalizing.RENormalizing([
    # Relevant normalizers from zope.testing.testrunner.tests:
    (re.compile(r'\d+[.]\d\d\d seconds'), 'N.NNN seconds'),
    # Our own one to work around
    # http://reinout.vanrees.org/weblog/2009/07/16/invisible-test-diff.html:
    (re.compile(r'.*1034h'), ''),
    (re.compile(r'httperror_seek_wrapper:'), 'HTTPError:'),
    # The longer digit run must come first: with the 6-digit normalizer
    # in front, a 10-digit number became '<6-DIGITS>' plus four digits
    # and '<10-DIGITS>' could never match.
    (re.compile(r'[\d]{10}'), '<10-DIGITS>'),
    (re.compile(r'[\d]{6}'), '<6-DIGITS>'),
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d [\+\-]\d\d\d\d [^ ]+'),
     '<YYYY-MM-DD hh:mm:ss TZ>'),
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d'),
     '<YYYY-MM-DD hh:mm:ss>'),
    # Map the short aliases onto the canonical placeholder. A second
    # normalizer with the date pattern itself is never reached, as the
    # entry above has replaced all dates already.
    (re.compile(r'<(?:DATE-AND-TIME|DATETIME)>'), '<YYYY-MM-DD hh:mm:ss>'),
    ])
318
[12598]319
#: Names of the loggers that existed when :func:`setUp` ran last.
old_loggers = []


def setUp(test):
    """Doctest ``setUp`` hook.

    Remembers the currently registered loggers in the module-level
    `old_loggers` and then runs the functional test setup. The `test`
    argument is not used.
    """
    global old_loggers
    old_loggers = get_all_loggers()
    FunctionalTestSetup().setUp()
327
[12598]328
def tearDown(test):
    """Doctest ``tearDown`` hook.

    Runs the functional test teardown and then removes all loggers not
    listed in the module-level `old_loggers`. The `test` argument is
    not used.
    """
    FunctionalTestSetup().tearDown()
    # `old_loggers` is only read here, so no ``global`` statement needed.
    remove_new_loggers(old_loggers)
[6238]333
[12598]334
def doctestsuite_for_module(dotted_path):
    """Build a doctest suite from the docstrings of a module.

    `dotted_path` names the module. The suite uses this module's
    :func:`setUp`, :func:`tearDown` and `checker`, gets `getRootFolder`
    and `sync` as extra globals and is attached to `FunctionalLayer`.
    """
    flags = (
        doctest.ELLIPSIS |
        doctest.NORMALIZE_WHITESPACE |
        doctest.REPORT_NDIFF)
    suite = doctest.DocTestSuite(
        dotted_path,
        setUp=setUp,
        tearDown=tearDown,
        checker=checker,
        extraglobs={'getRootFolder': getRootFolder, 'sync': sync},
        optionflags=flags,
        )
    suite.layer = FunctionalLayer
    return suite
[6463]352
[12598]353
#: Doctest option flags: ``REPORT_NDIFF``, ``ELLIPSIS`` and
#: ``NORMALIZE_WHITESPACE`` combined.
optionflags = (
    doctest.REPORT_NDIFF | doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
356
[12598]357
def clear_logger_collector():
    """Remove all entries from the `ILoggerCollector` utility.

    Does nothing if no such utility is registered.
    """
    from waeup.kofa.interfaces import ILoggerCollector
    collector = queryUtility(ILoggerCollector)
    if collector is not None:
        # Take a copy of the keys, as entries are deleted while looping.
        for key in list(collector.keys()):
            del collector[key]
    return
368
[12598]369
class FunctionalTestCase(FunctionalTestCase):
    """A functional test case with doctest-style output matching.

    On top of the base class, loggers created during a test are removed
    again on teardown and the logger collector is emptied.
    """

    def setUp(self):
        """Run the base setup and remember the loggers existing then."""
        super(FunctionalTestCase, self).setUp()
        self.functional_old_loggers = get_all_loggers()

    def tearDown(self):
        """Run the base teardown, then drop loggers added since setUp."""
        super(FunctionalTestCase, self).tearDown()
        remove_new_loggers(self.functional_old_loggers)
        clear_logger_collector()

    def assertMatches(self, want, got, checker=checker,
                      optionflags=optionflags):
        """Fail unless the multiline string `got` matches `want`.

        `want` may contain shortcuts like ``...`` known from regular
        doctests.

        Without an explicit `checker` the extended
        :class:`doctest.OutputChecker` defined in
        :mod:`waeup.kofa.testing` is used.

        Without explicit `optionflags` the flags ``REPORT_NDIFF``,
        ``ELLIPSIS``, and ``NORMALIZE_WHITESPACE`` are used.

        On mismatch the test fails with the diff computed by `checker`
        as message.

        .. seealso:: :data:`waeup.kofa.testing.optionflags`

        .. seealso:: :data:`waeup.kofa.testing.checker`
        """
        if not checker.check_output(want, got, optionflags):
            example = doctest.Example('', want)
            self.fail(checker.output_difference(example, got, optionflags))
[6727]407
[12598]408
class FunctionalTestSetup(FunctionalTestSetup):
    """Drop-in replacement for the class from :mod:`zope.app.testing`.

    Additionally removes the loggers that were created between setup
    and teardown.
    """

    def setUp(self):
        """Remember the existing loggers, then run the base setup."""
        self.old_loggers = get_all_loggers()
        super(FunctionalTestSetup, self).setUp()

    def tearDown(self):
        """Run the base teardown, then remove loggers added since setUp."""
        super(FunctionalTestSetup, self).tearDown()
        remove_new_loggers(self.old_loggers)
[6735]424
[12598]425
def get_doctest_suite(filename_list=None):
    """Helper function to create doctest suites for doctests.

    The `filename_list` is a list of filenames relative to the
    w.k. dir.  So, to get a doctest suite for ``browser.txt`` and
    ``blah.txt`` in the ``browser/`` subpackage you have to pass
    ``filename_list=['browser/browser.txt','browser/blah.txt']`` and
    so on. If `filename_list` is omitted or empty, an empty suite is
    returned.

    The returned test suite must be registered somewhere locally for
    instance by something like::

      from waeup.kofa.testing import get_doctest_suite
      def test_suite():
        suite = get_doctest_suite(['mypkg/foo.txt', 'mypkg/bar.txt'])
        return suite

    and that's it.

    Returns a :class:`unittest.TestSuite` containing one doctest file
    suite per filename, each attached to `FunctionalLayer`.
    """
    suite = unittest.TestSuite()
    # The base directory is the same for all files: compute it once.
    base_dir = os.path.dirname(__file__)
    # ``None`` replaces the former mutable ``[]`` default argument.
    for filename in filename_list or ():
        path = os.path.join(base_dir, filename)
        test = doctest.DocFileSuite(
            path,
            module_relative=False,
            setUp=setUp, tearDown=tearDown,
            globs=dict(getRootFolder=getRootFolder),
            optionflags=doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE,
            checker=checker,
            )
        test.layer = FunctionalLayer
        suite.addTest(test)
    return suite
[9342]460
[12598]461
class FakeJob(object):
    # A job usable for simple async tests. By default it is completed,
    # has no result and does not mimic high system load.
    status = COMPLETED
    result = None
    acts_under_heavy_load = False  # set to True to mimic system high load

    def __init__(self, *args, **kw):
        # Just record the call arguments; nothing is executed.
        self.args, self.kwargs = args, kw
471
[12598]472
class FakeJobManager(object):
    # A fake job manager for testing async functionality

    def __init__(self):
        # State is kept per instance, so that managers never share
        # jobs or job numbers.
        self._curr_num = 1
        self._jobs = {}

    def get(self, job_id):
        # The id '3' always yields a new FakeJob; any other id is looked
        # up in the stored jobs, with None as result for unknown ids.
        if job_id == '3':
            return FakeJob()
        return self._jobs.get(job_id)

    def put(self, job):
        # Jobs mimicking high system load are rejected (None is
        # returned). Other jobs are stored under the next number, which
        # is returned as string.
        if getattr(job, 'acts_under_heavy_load', False):
            return None
        job_id = str(self._curr_num)
        self._curr_num += 1
        self._jobs[job_id] = job
        return job_id

    def remove(self, job_id, site):
        # Forget the job stored under `job_id`, if any. `site` is unused.
        self._jobs.pop(job_id, None)
[12595]498
499
@contextmanager
def expensive_actions():
    """A contextmanager to work with a modified `expensive_actions_allowed()`

    Yields the `IKofaUtils` utility found by :func:`getUtility`. Inside
    the ``with`` block its ``expensive_actions_allowed`` attribute may
    be replaced. On exit the attribute is set back to the value it had
    on entry, also if the block raised an exception.
    """
    utils = getUtility(IKofaUtils)
    saved = utils.expensive_actions_allowed
    try:
        yield utils
    finally:
        # put the original function back after use
        utils.expensive_actions_allowed = saved
Note: See TracBrowser for help on using the repository browser.