source: main/waeup.kofa/trunk/src/waeup/kofa/testing.py @ 17550

Last change on this file since 17550 was 12598, checked in by uli, 10 years ago

flake8.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 15.6 KB
Line 
1## $Id: testing.py 12598 2015-02-11 16:22:44Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Testing support for :mod:`waeup.kofa`.
19"""
20import grok
21import doctest
22import logging
23import os.path
24import re
25import tempfile
26import shutil
27import unittest
28import warnings
29import zope.component
30import waeup.kofa
31from contextlib import contextmanager
32from zc.async.interfaces import COMPLETED
33from zope.app.testing.functional import (
34    ZCMLLayer, FunctionalTestSetup, getRootFolder, sync, FunctionalTestCase)
35from zope.component import getGlobalSiteManager, queryUtility, getUtility
36from zope.security.testing import addCheckerPublic
37from zope.testing import renormalizing
38from zope.testing.cleanup import cleanUp
39from waeup.kofa.interfaces import IKofaUtils
40
# Path of the ``ftesting.zcml`` file located in the directory of the
# imported :mod:`waeup.kofa` package.
ftesting_zcml = os.path.join(
    os.path.dirname(waeup.kofa.__file__), 'ftesting.zcml')
# ZCML-based test layer built from that file; used as ``layer`` for the
# doctest suites created in this module.
FunctionalLayer = ZCMLLayer(ftesting_zcml, __name__, 'FunctionalLayer',
                            allow_teardown=True)
45
46
def get_all_loggers():
    """Get the keys of all loggers defined globally.

    Returns a list holding a snapshot of the names registered with the
    logging manager at call time. Loggers created later do not show up
    in an already returned list, so the result can be handed to
    :func:`remove_new_loggers` afterwards.

    As a debugging aid, all registered logger names containing
    ``waeup.kofa`` are printed to stdout.
    """
    # Take a real copy: a live ``keys()`` view would also reflect
    # loggers registered after this call.
    result = list(logging.root.manager.loggerDict)
    ws_loggers = [x for x in result if 'waeup.kofa' in x]
    if ws_loggers:
        # For debugging: show any remaining loggers from w.k. namespace.
        # A single pre-formatted string gives the same output, no matter
        # whether ``print`` is a statement or a function.
        print("\nLOGGERS:  %s" % (ws_loggers,))
    return result
56
57
def remove_new_loggers(old_loggers):
    """Remove all loggers except those named in `old_loggers`.

    `old_loggers` is an iterable of logger keys as returned by
    :func:`get_all_loggers`. Each globally registered logger whose
    name is not in `old_loggers` gets its handlers closed and is then
    removed from the registry of the logging manager.
    """
    logger_dict = logging.root.manager.loggerDict
    # A set gives constant-time membership tests for the filter below.
    known = set(old_loggers)
    new_loggers = [key for key in logger_dict if key not in known]
    # Reverse-sorted, so a dotted name like 'a.b' is handled before
    # its prefix 'a'.
    for key in sorted(new_loggers, reverse=True):
        for handler in logging.getLogger(key).handlers:
            handler.close()
        del logger_dict[key]
74
75
def remove_logger(name):
    """Remove logger with name `name`.

    Use is safe. If the logger does not exist nothing happens.

    Only the entry in the registry of the logging manager is dropped;
    handlers of the logger are not touched.
    """
    # ``pop`` with a default never raises for unknown names.
    logging.root.manager.loggerDict.pop(name, None)
84
85
def setUpZope(test=None):
    """Initialize a Zope-compatible environment.

    Currently, we only initialize the event machinery.

    `test` is passed on unchanged to
    :func:`zope.component.eventtesting.setUp`.
    """
    # Import the submodule explicitly instead of relying on some other
    # module having imported it before: a plain ``import zope.component``
    # does not guarantee that ``eventtesting`` is available as attribute.
    import zope.component.eventtesting
    zope.component.eventtesting.setUp(test)
92
93
def cleanUpZope(test=None):
    """Tear down Zope-related registrations.

    Simply delegates to :func:`zope.testing.cleanup.cleanUp`. The
    `test` argument is accepted but not used.
    """
    cleanUp()
    return None
100
101
def maybe_grok():
    """Try to grok the :mod:`waeup.kofa` package.

    For many tests, even simple ones, we want the components defined
    somewhere in the :mod:`waeup.kofa` package being registered. While
    grokking the complete package can become expensive when done many
    times, we only want to grok if it did not happen
    before. Furthermore regrokking the whole package makes no sense if
    done already.

    :func:`maybe_grok` checks whether any eventhandlers are already
    registered and does nothing in that case.

    The grokking of :mod:`waeup.kofa` is done with warnings disabled.
    The warning filters active before the call are restored
    afterwards, also if grokking fails.

    Returns ``True`` if grokking was done, ``False`` else.

    .. The following samples should go into Sphinx docs directly....

    Sample
    ******

    Usage with plain Python testrunners
    -----------------------------------

    Together with the :func:`setUpZope` and :func:`cleanUpZope`
    functions we then can do unittests with all components registered
    and the event dispatcher in place like this::

      import unittest2 as unittest # Want Python 2.7 features
      from waeup.kofa.testing import (
        maybe_grok, setUpZope, cleanUpZope,
        )
      from waeup.kofa.app import University

      class MyTestCase(unittest.TestCase):

          @classmethod
          def setUpClass(cls):
              grokked = maybe_grok()
              if grokked:
                  setUpZope(None)
              return

          @classmethod
          def tearDownClass(cls):
              cleanUpZope(None)

          def setUp(self):
              pass

          def tearDown(self):
              pass

          def test_jambdata_in_site(self):
              u = University()
              self.assertTrue('jambdata' in u.keys())
              return

    Here the component registration is done only once for the whole
    :class:`unittest.TestCase` and no ZODB is needed. That means
    inside the tests you can expect to have all :mod:`waeup.kofa`
    components (utilities, adapter, events) registered but as objects
    here still have no place inside a ZODB things like 'browsing'
    won't work out of the box. The benefit is the faster test
    setup/teardown.

    .. note:: This works only with the default Python testrunners.

         If you use the Zope testrunner (from :mod:`zope.testing`)
         then you have to use appropriate layers like the
         :class:`waeup.kofa.testing.KofaUnitTestLayer`.

    Usage with :mod:`zope.testing` testrunners
    ------------------------------------------

    If you use the standard Zope testrunner, classmethods like
    `setUpClass` are not executed. Instead you have to use a layer
    like the one defined in this module.

    .. seealso:: :class:`waeup.kofa.testing.KofaUnitTestLayer`

    """
    gsm = getGlobalSiteManager()
    # If there are any event handlers registered already, we assume
    # that waeup.kofa was grokked already. There might be a better
    # check, though.
    if list(gsm.registeredHandlers()):
        return False
    # Register the zope.Public permission, normally done via ZCML setup.
    addCheckerPublic()
    # Import the submodule explicitly instead of relying on
    # ``import grok`` making ``grok.testing`` available.
    import grok.testing
    # Disable (erroneous) warnings only while grokking. Leaving the
    # ``with`` block restores the previously active filters, no matter
    # whether grokking succeeded or raised.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        grok.testing.grok('waeup.kofa')
    return True
197
198
def setup_datacenter_conf():
    """Provide a datacenter config utility for non-functional tests.

    If no `IDataCenterConfig` utility can be looked up, a dict with a
    freshly created temporary directory as ``path`` is registered
    with the global site manager. An already registered utility is
    left alone.
    """
    from waeup.kofa.interfaces import IDataCenterConfig
    if queryUtility(IDataCenterConfig) is None:
        # Nothing registered yet: create a scratch dir and register it.
        tmp_conf = {'path': tempfile.mkdtemp()}
        getGlobalSiteManager().registerUtility(tmp_conf, IDataCenterConfig)
211
212
def teardown_datacenter_conf():
    """Remove the datacenter config utility for non-functional tests.

    If an `IDataCenterConfig` utility can be looked up, the directory
    named by its ``path`` entry is deleted recursively and the
    utility is unregistered from the global site manager. Without a
    registered utility nothing happens.
    """
    from waeup.kofa.interfaces import IDataCenterConfig
    registered = queryUtility(IDataCenterConfig)
    if registered is not None:
        # Delete the directory first, then drop the registration.
        shutil.rmtree(registered['path'])
        getGlobalSiteManager().unregisterUtility(
            registered, IDataCenterConfig)
225
226
class KofaUnitTestLayer(object):
    """A layer for tests that groks `waeup.kofa`.

    A Zope test layer that registers all :mod:`waeup.kofa` components
    before attached tests are run and cleans these registrations up
    afterwards. Also basic (non-waeup.kofa) components like the event
    dispatcher machinery are registered, set up and cleaned up.

    This layer does not provide a complete ZODB setup (and is
    therefore much faster than complete functional setups) but does
    only the registrations (which also takes some time, so running
    this layer is slower than test cases that need none or only a
    few registrations).

    The registrations are done for all tests the layer is attached to
    once before all these tests are run (and torn down once
    afterwards).

    To make use of this layer, you have to write a
    :mod:`unittest.TestCase` class that provides an attribute called
    ``layer`` with this class as value like this::

      import unittest
      from waeup.kofa.testing import KofaUnitTestLayer

      class MyTestCase(unittest.TestCase):

          layer = KofaUnitTestLayer

          # per-test setups and real tests go here...
          def test_foo(self):
              self.assertEqual(1, 1)
              return

    """
    @classmethod
    def setUp(cls):
        """Grok :mod:`waeup.kofa` (if needed) and register a datacenter
        config.
        """
        # NOTE(review): the explicit event machinery setup is disabled
        # here -- confirm that this is still intended.
        # setUpZope(None)
        maybe_grok()
        setup_datacenter_conf()

    @classmethod
    def tearDown(cls):
        """Remove the datacenter config and clean up all registrations.
        """
        teardown_datacenter_conf()
        cleanUpZope(None)
274
275
#: This extended :class:`doctest.OutputChecker` allows the following
#: additional matches when looking for output diffs:
#:
#: `N.NNN seconds`
#:    matches strings like ``12.123 seconds``
#:
#: `HTTPError:`
#:    matches ``httperror_seek_wrapper:``. This string is output by some
#:    virtual browsers you might use in functional browser tests to signal
#:    HTTP error state.
#:
#: `1034h`
#:    is ignored. This sequence of control chars is output by some
#:    (buggy) testrunners at beginning of output.
#:
#: `<10-DIGITS>`
#:    matches a sequence of 10 digits. Useful when checking accesscode
#:    numbers if you don't know the exact (random) code.
#:
#: `<6-DIGITS>`
#:    matches a sequence of 6 digits. Useful when checking accesscode
#:    numbers if you don't know the exact (random) code.
#:
#: `<YYYY-MM-DD hh:mm:ss TZ>`
#:    matches a date and time followed by a numerical UTC offset and a
#:    further word, like `2011-05-01 12:01:32 +0000 UTC`.
#:
#: `<YYYY-MM-DD hh:mm:ss>`
#:    matches any date and time like `2011-05-01 12:01:32`.
#:
#: `<DATETIME>` or `<DATE-AND-TIME>`
#:    same like ``<YYYY-MM-DD hh:mm:ss>`` but shorter.
#:
#: The normalizers are applied one after another to both, expected and
#: real output. Therefore their order matters.
checker = renormalizing.RENormalizing([
    # Relevant normalizers from zope.testing.testrunner.tests:
    (re.compile(r'\d+[.]\d\d\d seconds'), 'N.NNN seconds'),
    # Our own one to work around
    # http://reinout.vanrees.org/weblog/2009/07/16/invisible-test-diff.html:
    (re.compile(r'.*1034h'), ''),
    (re.compile(r'httperror_seek_wrapper:'), 'HTTPError:'),
    # The 10-digits rule must come before the 6-digits rule. Otherwise
    # the first six digits of each longer number are replaced already
    # and the 10-digits rule can never match.
    (re.compile(r'[\d]{10}'), '<10-DIGITS>'),
    (re.compile(r'[\d]{6}'), '<6-DIGITS>'),
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d [\+\-]\d\d\d\d [^ ]+'),
     '<YYYY-MM-DD hh:mm:ss TZ>'),
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d'),
     '<YYYY-MM-DD hh:mm:ss>'),
    # Short aliases: rewrite the literal markers to the canonical
    # marker generated by the rule above. A second rule with the same
    # date regex would never match, as all dates are replaced already.
    (re.compile(r'<(?:DATETIME|DATE-AND-TIME)>'), '<YYYY-MM-DD hh:mm:ss>'),
    ])
318
319
# Logger names registered before a doctest run; set by the module-level
# ``setUp`` and read by the module-level ``tearDown``.
old_loggers = []
321
322
def setUp(test):
    """Doctest set-up hook.

    Stores the currently registered logger names in the module-level
    `old_loggers` and runs the functional test setup. `test` is not
    used.
    """
    global old_loggers
    snapshot = get_all_loggers()
    old_loggers = snapshot
    functional_setup = FunctionalTestSetup()
    functional_setup.setUp()
327
328
def tearDown(test):
    """Doctest tear-down hook.

    Runs the functional test teardown and afterwards removes all
    loggers not listed in the module-level `old_loggers`. `test` is
    not used.
    """
    functional_setup = FunctionalTestSetup()
    functional_setup.tearDown()
    # Only read here, so no ``global`` declaration is needed.
    remove_new_loggers(old_loggers)
333
334
def doctestsuite_for_module(dotted_path):
    """Build a doctest suite from the docstrings of a module.

    `dotted_path` names the module. The suite uses the `setUp`,
    `tearDown` and `checker` defined in this module, provides
    ``getRootFolder`` and ``sync`` as extra globals and is attached to
    the `FunctionalLayer`.
    """
    flags = (doctest.ELLIPSIS |
             doctest.NORMALIZE_WHITESPACE |
             doctest.REPORT_NDIFF)
    extra_names = {'getRootFolder': getRootFolder, 'sync': sync}
    suite = doctest.DocTestSuite(
        dotted_path,
        setUp=setUp,
        tearDown=tearDown,
        checker=checker,
        extraglobs=extra_names,
        optionflags=flags,
        )
    suite.layer = FunctionalLayer
    return suite
352
353
#: Default doctest option flags used by
#: :meth:`FunctionalTestCase.assertMatches`.
optionflags = (
    doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
356
357
def clear_logger_collector():
    """Delete all entries from the registered logger collector.

    Looks up the `ILoggerCollector` utility and removes each of its
    keys. Does nothing if no such utility is registered.
    """
    from zope.component import queryUtility
    from waeup.kofa.interfaces import ILoggerCollector
    collector = queryUtility(ILoggerCollector)
    if collector is not None:
        # Iterate over a copy of the keys, as entries are deleted
        # while looping.
        for key in list(collector.keys()):
            del collector[key]
368
369
class FunctionalTestCase(FunctionalTestCase):
    """A test case that supports checking output diffs in doctest style.

    Additionally, loggers created during a test are removed on
    teardown and the logger collector is cleared.
    """

    def setUp(self):
        """Run the base setup, then remember the loggers known so far.
        """
        super(FunctionalTestCase, self).setUp()
        self.functional_old_loggers = get_all_loggers()

    def tearDown(self):
        """Run the base teardown, then remove loggers added meanwhile
        and clear the logger collector.
        """
        super(FunctionalTestCase, self).tearDown()
        remove_new_loggers(self.functional_old_loggers)
        clear_logger_collector()

    def assertMatches(self, want, got, checker=checker,
                      optionflags=optionflags):
        """Assert that the multiline string `want` matches `got`.

        In `want` you can use shortcuts like ``...`` as in regular doctests.

        If no special `checker` is passed, we use an extended
        :class:`doctest.OutputChecker` as defined in
        :mod:`waeup.kofa.testing`.

        If optional `optionflags` are not given, use ``REPORT_NDIFF``,
        ``ELLIPSIS``, and ``NORMALIZE_WHITESPACE``.

        On mismatch the test fails with the diff computed by `checker`
        as message.

        .. seealso:: :data:`waeup.kofa.testing.optionflags`

        .. seealso:: :data:`waeup.kofa.testing.checker`
        """
        matches = checker.check_output(want, got, optionflags)
        if not matches:
            expected = doctest.Example('', want)
            self.fail(
                checker.output_difference(expected, got, optionflags))
397
398        .. seealso:: :data:`waeup.kofa.testing.optionflags`
399
400        .. seealso:: :data:`waeup.kofa.testing.checker`
401        """
402        if checker.check_output(want, got, optionflags):
403            return
404        diff = checker.output_difference(
405            doctest.Example('', want), got, optionflags)
406        self.fail(diff)
407
408
409class FunctionalTestSetup(FunctionalTestSetup):
410    """A replacement for the zope.app.testing class.
411
412    Removes also loggers.
413    """
414
415    def setUp(self):
416        self.old_loggers = get_all_loggers()
417        super(FunctionalTestSetup, self).setUp()
418        return
419
420    def tearDown(self):
421        super(FunctionalTestSetup, self).tearDown()
422        remove_new_loggers(self.old_loggers)
423        return
424
425
def get_doctest_suite(filename_list=()):
    """Helper function to create doctest suites for doctests.

    The `filename_list` is a list of filenames relative to the
    w.k. dir.  So, to get a doctest suite for ``browser.txt`` and
    ``blah.txt`` in the ``browser/`` subpackage you have to pass
    ``filename_list=['browser/browser.txt','browser/blah.txt']`` and
    so on.

    Returns a :class:`unittest.TestSuite` containing one doctest file
    suite per given filename, each attached to the `FunctionalLayer`.
    Without any filenames the returned suite is empty.

    The returned test suite must be registered somewhere locally for
    instance by something like::

      from waeup.kofa.testing import get_doctest_suite
      def test_suite():
        suite = get_doctest_suite(['mypkg/foo.txt', 'mypkg/bar.txt'])
        return suite

    and that's it.
    """
    # Filenames are resolved relative to the directory of this module.
    base_dir = os.path.dirname(__file__)
    suite = unittest.TestSuite()
    for filename in filename_list:
        path = os.path.join(base_dir, filename)
        test = doctest.DocFileSuite(
            path,
            module_relative=False,
            setUp=setUp, tearDown=tearDown,
            globs=dict(getRootFolder=getRootFolder),
            optionflags=doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE,
            checker=checker,
            )
        test.layer = FunctionalLayer
        suite.addTest(test)
    return suite
460
461
class FakeJob(object):
    """A job usable for simple async tests.

    Positional and keyword arguments passed to the constructor are
    stored as `args` and `kwargs`.
    """
    # Fake jobs report ``COMPLETED`` status and carry no result.
    status = COMPLETED
    result = None
    # Set to True to mimic system high load.
    acts_under_heavy_load = False

    def __init__(self, *args, **kw):
        self.args, self.kwargs = args, kw
471
472
class FakeJobManager(object):
    """A fake job manager for testing async functionality.

    Jobs are kept in a per-instance dict under consecutive numbers
    (as strings), starting with ``'1'``.
    """

    def __init__(self):
        # Per-instance state: each manager counts and stores on its own.
        self._curr_num = 1
        self._jobs = {}

    def get(self, job_id):
        """Return the job stored under `job_id` or ``None``.

        For the id ``'3'`` always a new :class:`FakeJob` is returned.
        """
        if job_id != '3':
            return self._jobs.get(job_id)
        return FakeJob()

    def put(self, job):
        """Store `job` and return its id.

        Jobs with a true `acts_under_heavy_load` attribute are not
        stored; ``None`` is returned for them.
        """
        heavy_load = getattr(job, 'acts_under_heavy_load', False)
        if heavy_load:
            return None
        job_id = str(self._curr_num)
        self._curr_num += 1
        self._jobs[job_id] = job
        return job_id

    def remove(self, job_id, site):
        """Forget the job stored under `job_id`, if any.

        `site` is not used.
        """
        self._jobs.pop(job_id, None)
498
499
@contextmanager
def expensive_actions():
    """A contextmanager to work with a modified `expensive_actions_allowed()`

    Yields the registered `IKofaUtils` utility. Inside the ``with``
    block its `expensive_actions_allowed` attribute may be replaced;
    the value found on entry is put back on exit, also if the block
    raises.
    """
    utils = getUtility(IKofaUtils)
    saved_func = utils.expensive_actions_allowed
    try:
        yield utils
    finally:
        # Put back whatever was in place when the block was entered.
        utils.expensive_actions_allowed = saved_func
Note: See TracBrowser for help on using the repository browser.