source: main/waeup.kofa/trunk/src/waeup/kofa/testing.py @ 12596

Last change on this file since 12596 was 12595, checked in by uli, 10 years ago

Move test helper.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
File size: 15.6 KB
Line 
1## $Id: testing.py 12595 2015-02-11 15:28:49Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Testing support for :mod:`waeup.kofa`.
19"""
20import grok
21import doctest
22import logging
23import os.path
24import re
25import tempfile
26import shutil
27import unittest
28import warnings
29import zope.component
30import waeup.kofa
31from contextlib import contextmanager
32from zc.async.interfaces import COMPLETED
33from zope.app.testing.functional import (
34    ZCMLLayer, FunctionalTestSetup, getRootFolder, sync, FunctionalTestCase)
35from zope.component import getGlobalSiteManager, queryUtility, getUtility
36from zope.security.testing import addCheckerPublic
37from zope.testing import renormalizing
38from zope.testing.cleanup import cleanUp
39from waeup.kofa.interfaces import IKofaUtils
40
# Path of the ZCML file used to configure functional tests; it is
# looked up next to the ``waeup.kofa`` package's ``__init__`` module.
ftesting_zcml = os.path.join(
    os.path.dirname(waeup.kofa.__file__),
    'ftesting.zcml',
    )

# Test layer built from ``ftesting_zcml``; teardown is allowed.
FunctionalLayer = ZCMLLayer(
    ftesting_zcml, __name__, 'FunctionalLayer', allow_teardown=True)
45
def get_all_loggers():
    """Get the names of all globally registered loggers.

    Returns a new list holding the keys of the :mod:`logging` manager's
    logger dictionary. The list is a snapshot: loggers registered after
    the call do not show up in it, which makes the result usable as
    `old_loggers` argument of :func:`remove_new_loggers`.

    As a debugging aid, logger names containing ``waeup.kofa`` are
    printed to stdout if there are any.
    """
    # Copy the keys so callers always get a stable snapshot and never a
    # live view onto the registry.
    result = list(logging.root.manager.loggerDict.keys())
    ws_loggers = [x for x in result if 'waeup.kofa' in x]
    if ws_loggers:
        # For debugging: show any remaining loggers from w.k. namespace.
        # The single-argument form emits the same text no matter whether
        # ``print`` is a statement or a function.
        print("\nLOGGERS:  %s" % (ws_loggers,))
    return result
55
def remove_new_loggers(old_loggers):
    """Drop every globally registered logger not listed in `old_loggers`.

    `old_loggers` is a collection of logger names as returned by
    :func:`get_all_loggers`. All globally registered loggers whose
    names are not in `old_loggers` get their handlers closed and are
    removed from the logging manager's registry afterwards.
    """
    registry = logging.root.manager.loggerDict
    # Collect the names up front; the registry is modified below.
    added = sorted(
        (name for name in registry if name not in old_loggers),
        reverse=True)
    for name in added:
        for handler in logging.getLogger(name).handlers:
            handler.close()
        del registry[name]
72
def remove_logger(name):
    """Unregister the logger called `name`.

    Safe to call with unknown names: if no such logger is registered,
    nothing happens.
    """
    logging.root.manager.loggerDict.pop(name, None)
81
82
def setUpZope(test=None):
    """Initialize a Zope-compatible environment.

    Currently, we only initialize the event machinery by calling
    ``zope.component.eventtesting.setUp``.

    `test` is handed over unchanged and may be ``None``.
    """
    # ``import zope.component`` alone does not guarantee that the
    # ``eventtesting`` submodule is loaded. Import it explicitly instead
    # of relying on some other module having imported it before.
    from zope.component import eventtesting
    eventtesting.setUp(test)
89
def cleanUpZope(test=None):
    """Tear down Zope-related registrations.

    Delegates to :func:`zope.testing.cleanup.cleanUp`. The `test`
    argument is accepted but not used.
    """
    cleanUp()
    return
96
def maybe_grok():
    """Try to grok the :mod:`waeup.kofa` package.

    For many tests, even simple ones, we want the components defined
    somewhere in the :mod:`waeup.kofa` package being registered. As
    grokking the complete package can become expensive when done many
    times, we only want to grok if it did not happen
    before. Furthermore regrokking the whole package makes no sense if
    done already.

    :func:`maybe_grok` checks whether any event handlers are already
    registered with the global site manager and does nothing in that
    case.

    The grokking of :mod:`waeup.kofa` is done with warnings disabled.
    The warning filters active before the call are restored
    afterwards, also if grokking fails.

    Returns ``True`` if grokking was done, ``False`` else.

    .. The following samples should go into Sphinx docs directly....

    Sample
    ******

    Usage with plain Python testrunners
    -----------------------------------

    Together with the :func:`setUpZope` and :func:`cleanUpZope`
    functions we then can do unittests with all components registered
    and the event dispatcher in place like this::

      import unittest2 as unittest # Want Python 2.7 features
      from waeup.kofa.testing import (
        maybe_grok, setUpZope, cleanUpZope,
        )
      from waeup.kofa.app import University

      class MyTestCase(unittest.TestCase):

          @classmethod
          def setUpClass(cls):
              grokked = maybe_grok()
              if grokked:
                  setUpZope(None)
              return

          @classmethod
          def tearDownClass(cls):
              cleanUpZope(None)

          def setUp(self):
              pass

          def tearDown(self):
              pass

          def test_jambdata_in_site(self):
              u = University()
              self.assertTrue('jambdata' in u.keys())
              return

    Here the component registration is done only once for the whole
    :class:`unittest.TestCase` and no ZODB is needed. That means
    inside the tests you can expect to have all :mod:`waeup.kofa`
    components (utilities, adapters, events) registered, but as
    objects still have no place inside a ZODB here, things like
    'browsing' won't work out of the box. The benefit is the faster
    test setup/teardown.

    .. note:: This works only with the default Python testrunners.

         If you use the Zope testrunner (from :mod:`zope.testing`)
         then you have to use appropriate layers like the
         :class:`waeup.kofa.testing.KofaUnitTestLayer`.

    Usage with :mod:`zope.testing` testrunners
    ------------------------------------------

    If you use the standard Zope testrunner, classmethods like
    `setUpClass` are not executed. Instead you have to use a layer
    like the one defined in this module.

    .. seealso:: :class:`waeup.kofa.testing.KofaUnitTestLayer`

    """
    gsm = getGlobalSiteManager()
    # If there are any event handlers registered already, we assume
    # that waeup.kofa was grokked already. There might be a better
    # check, though.
    if any(True for _ in gsm.registeredHandlers()):
        return False
    # Register the zope.Public permission, normally done via ZCML setup.
    addCheckerPublic()
    # Silence (erroneous) warnings only while grokking. The context
    # manager restores the caller's warning filters on exit, even if
    # grokking raises, instead of forcing the 'default' filter.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        grok.testing.grok('waeup.kofa')
    return True
192
def setup_datacenter_conf():
    """Provide an `IDataCenterConfig` utility for non-functional tests.

    Does nothing if such a utility can be looked up already. Otherwise
    a fresh temporary directory is created and a dict holding its path
    under the key ``'path'`` is registered with the global site
    manager.
    """
    from waeup.kofa.interfaces import IDataCenterConfig
    if queryUtility(IDataCenterConfig) is not None:
        # Some config is in place already; leave it alone.
        return
    conf = dict(path=tempfile.mkdtemp())
    getGlobalSiteManager().registerUtility(conf, IDataCenterConfig)
205
def teardown_datacenter_conf():
    """Unregister a datacenter config utility for non-functional tests.

    Does nothing if no `IDataCenterConfig` utility can be looked up.
    Otherwise the directory stored under the utility's ``'path'`` key
    is removed recursively and the utility is unregistered from the
    global site manager.

    Errors raised while removing the directory are propagated, but the
    utility is unregistered nevertheless.
    """
    from waeup.kofa.interfaces import IDataCenterConfig
    conf = queryUtility(IDataCenterConfig)
    if conf is None:
        return
    path = conf['path']
    try:
        shutil.rmtree(path)
    finally:
        # Unregister in any case. A config left registered after a
        # failed removal would make `setup_datacenter_conf` skip its
        # work and keep pointing tests at a stale directory.
        gsm = getGlobalSiteManager()
        gsm.unregisterUtility(conf, IDataCenterConfig)
    return
218
class KofaUnitTestLayer(object):
    """A layer for tests that groks `waeup.kofa`.

    A Zope test layer that registers all :mod:`waeup.kofa` components
    before attached tests are run and cleans these registrations up
    afterwards. In addition a datacenter configuration is set up
    before and torn down after the tests.

    This layer does not provide a complete ZODB setup (and is
    therefore much faster than complete functional setups) but does
    only the registrations (which also takes some time, so running
    this layer is slower than test cases that need none or only a
    few registrations).

    The registrations are done once for all tests the layer is
    attached to, before these tests are run, and torn down once
    afterwards.

    To make use of this layer, write a :class:`unittest.TestCase`
    class that provides an attribute called ``layer`` with this class
    as value like this::

      import unittest
      from waeup.kofa.testing import KofaUnitTestLayer

      class MyTestCase(unittest.TestCase):

          layer = KofaUnitTestLayer

          # per-test setups and real tests go here...
          def test_foo(self):
              self.assertEqual(1, 1)
              return

    """

    @classmethod
    def setUp(cls):
        """Grok `waeup.kofa` (if needed) and set up a datacenter config."""
        # setUpZope() is not invoked by this layer.
        maybe_grok()
        setup_datacenter_conf()

    @classmethod
    def tearDown(cls):
        """Remove the datacenter config and clean up registrations."""
        teardown_datacenter_conf()
        cleanUpZope(None)
266
267
#: An output checker built with :class:`zope.testing.renormalizing.RENormalizing`.
#: Text matching one of the patterns below is replaced by the given
#: placeholder, so the placeholder can be used in expected output:
#:
#: `N.NNN seconds`
#:    stands for strings like ``12.123 seconds``
#:
#: `HTTPError:`
#:    stands for ``httperror_seek_wrapper:``. This string is output by
#:    some virtual browsers you might use in functional browser tests
#:    to signal HTTP error state.
#:
#: `1034h`
#:    is dropped together with everything preceding it on the line.
#:    This sequence of control chars is output by some (buggy)
#:    testrunners at beginning of output.
#:
#: `<6-DIGITS>`
#:    stands for a sequence of 6 digits. Useful when checking
#:    accesscode numbers if you don't know the exact (random) code.
#:
#: `<10-DIGITS>`
#:    stands for a sequence of 10 digits. Useful when checking
#:    accesscode numbers if you don't know the exact (random) code.
#:
#: `<YYYY-MM-DD hh:mm:ss TZ>`
#:    stands for a date and time followed by a numeric offset and one
#:    more token, like `2011-05-01 12:01:32 +0100 CET`.
#:
#: `<YYYY-MM-DD hh:mm:ss>`
#:    stands for any date and time like `2011-05-01 12:01:32`.
#:
#: `<DATETIME>`
#:    declared for the same pattern as ``<YYYY-MM-DD hh:mm:ss>`` but
#:    shorter.
checker = renormalizing.RENormalizing([
    # Relevant normalizers from zope.testing.testrunner.tests:
    (re.compile(r'\d+[.]\d\d\d seconds'), 'N.NNN seconds'),
    # Our own one to work around
    # http://reinout.vanrees.org/weblog/2009/07/16/invisible-test-diff.html:
    (re.compile(r'.*1034h'), ''),
    (re.compile(r'httperror_seek_wrapper:'), 'HTTPError:'),
    (re.compile(r'[\d]{6}'), '<6-DIGITS>'),
    # NOTE(review): if normalizers are applied in list order, the
    # 6-digit rule above consumes the first six digits of any 10-digit
    # run, so this rule presumably never fires -- confirm before
    # relying on ``<10-DIGITS>``.
    (re.compile(r'[\d]{10}'), '<10-DIGITS>'),
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d [\+\-]\d\d\d\d [^ ]+'),
     '<YYYY-MM-DD hh:mm:ss TZ>'),
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d'), '<YYYY-MM-DD hh:mm:ss>'),
    # NOTE(review): same pattern as the entry before; presumably
    # shadowed by it if normalizers run in list order -- confirm.
    (re.compile(r'\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d'), '<DATETIME>'),
    ])
310
#: Names of the loggers that were registered when :func:`setUp` ran
#: last. Used by :func:`tearDown` to tell old loggers from new ones.
old_loggers = []


def setUp(test):
    """Doctest ``setUp`` hook for functional doctests.

    Remembers the currently registered loggers and sets up the
    functional test environment. `test` is not used.
    """
    # Without the ``global`` declaration the snapshot would end up in a
    # local variable and `tearDown` would always see an empty list,
    # i.e. it would remove *all* registered loggers.
    global old_loggers
    old_loggers = get_all_loggers()
    FunctionalTestSetup().setUp()


def tearDown(test):
    """Doctest ``tearDown`` hook for functional doctests.

    Tears down the functional test environment and removes all loggers
    registered since :func:`setUp` was called. `test` is not used.
    """
    FunctionalTestSetup().tearDown()
    remove_new_loggers(old_loggers)
319
def doctestsuite_for_module(dotted_path):
    """Build a functional doctest suite for the module at `dotted_path`.

    The suite uses this module's `setUp`, `tearDown` and `checker`,
    makes ``getRootFolder`` and ``sync`` available in the doctest
    namespace and runs in the `FunctionalLayer`.
    """
    flags = (doctest.ELLIPSIS +
             doctest.NORMALIZE_WHITESPACE +
             doctest.REPORT_NDIFF)
    extra_names = {'getRootFolder': getRootFolder, 'sync': sync}
    suite = doctest.DocTestSuite(
        dotted_path,
        setUp=setUp,
        tearDown=tearDown,
        checker=checker,
        extraglobs=extra_names,
        optionflags=flags,
        )
    suite.layer = FunctionalLayer
    return suite
337
#: Default doctest option flags of
#: :meth:`FunctionalTestCase.assertMatches`.
optionflags = (
    doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
340
def clear_logger_collector():
    """Empty the registered `ILoggerCollector` utility.

    Deletes every entry of the collector. Does nothing if no such
    utility can be looked up.
    """
    from waeup.kofa.interfaces import ILoggerCollector
    collector = queryUtility(ILoggerCollector)
    if collector is not None:
        # Copy the keys first: entries are deleted while looping.
        for key in list(collector.keys()):
            del collector[key]
    return
351
class FunctionalTestCase(FunctionalTestCase):
    """A test case that supports checking output diffs in doctest style.

    Extends the :mod:`zope.app.testing` class of the same name:
    loggers registered while a test runs are removed on teardown and
    the logger collector is emptied.
    """

    def setUp(self):
        """Set up the base class, then remember the registered loggers."""
        super(FunctionalTestCase, self).setUp()
        self.functional_old_loggers = get_all_loggers()

    def tearDown(self):
        """Tear down the base class, then drop loggers added by the test."""
        super(FunctionalTestCase, self).tearDown()
        remove_new_loggers(self.functional_old_loggers)
        clear_logger_collector()

    def assertMatches(self, want, got, checker=checker,
                      optionflags=optionflags):
        """Assert that the multiline string `want` matches `got`.

        In `want` you can use shortcuts like ``...`` as in regular
        doctests.

        If no special `checker` is passed, the extended output checker
        defined in :mod:`waeup.kofa.testing` is used.

        If `optionflags` are not given, ``REPORT_NDIFF``, ``ELLIPSIS``,
        and ``NORMALIZE_WHITESPACE`` are used.

        On mismatch the test fails with the diff computed by `checker`.

        .. seealso:: :data:`waeup.kofa.testing.optionflags`

        .. seealso:: :data:`waeup.kofa.testing.checker`
        """
        matched = checker.check_output(want, got, optionflags)
        if not matched:
            example = doctest.Example('', want)
            self.fail(checker.output_difference(example, got, optionflags))
389
class FunctionalTestSetup(FunctionalTestSetup):
    """A replacement for the zope.app.testing class.

    Additionally removes loggers that were registered between `setUp`
    and `tearDown`.
    """

    def setUp(self):
        """Remember the registered loggers, then set up the base class."""
        self.old_loggers = get_all_loggers()
        super(FunctionalTestSetup, self).setUp()

    def tearDown(self):
        """Tear down the base class, then drop loggers added since setUp."""
        super(FunctionalTestSetup, self).tearDown()
        remove_new_loggers(self.old_loggers)
405
def get_doctest_suite(filename_list=()):
    """Helper function to create doctest suites for doctests.

    The `filename_list` is a sequence of filenames relative to the
    w.k. dir.  So, to get a doctest suite for ``browser.txt`` and
    ``blah.txt`` in the ``browser/`` subpackage you have to pass
    ``filename_list=['browser/browser.txt','browser/blah.txt']`` and
    so on. If it is empty (the default), an empty suite is returned.

    Each file becomes a :func:`doctest.DocFileSuite` running in the
    `FunctionalLayer`, with ``getRootFolder`` available in its
    namespace.

    The returned test suite must be registered somewhere locally for
    instance by something like::

      from waeup.kofa.testing import get_doctest_suite
      def test_suite():
        suite = get_doctest_suite(['mypkg/foo.txt', 'mypkg/bar.txt'])
        return suite

    and that's it.
    """
    # An immutable default avoids sharing one list between calls.
    base_dir = os.path.dirname(__file__)
    suite = unittest.TestSuite()
    for filename in filename_list:
        path = os.path.join(base_dir, filename)
        test = doctest.DocFileSuite(
            path,
            module_relative=False,
            setUp=setUp, tearDown=tearDown,
            globs=dict(getRootFolder=getRootFolder),
            optionflags=doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE,
            checker=checker,
            )
        test.layer = FunctionalLayer
        suite.addTest(test)
    return suite
440
class FakeJob(object):
    """A job usable for simple async tests."""

    # Fake jobs report the ``COMPLETED`` status and no result.
    status = COMPLETED
    result = None
    # Set to True to mimic system high load.
    acts_under_heavy_load = False

    def __init__(self, *args, **kw):
        # Keep whatever was passed in, positional and keyword arguments.
        self.args, self.kwargs = args, kw
450
class FakeJobManager(object):
    """A fake job manager for testing async functionality."""

    def __init__(self):
        # Each instance maintains its own jobs and its own counter.
        self._jobs = {}
        self._curr_num = 1

    def get(self, job_id):
        """Return the job stored under `job_id` or ``None``.

        The id ``'3'`` is special: it always yields a new `FakeJob`.
        """
        if job_id != '3':
            return self._jobs.get(job_id, None)
        return FakeJob()

    def put(self, job):
        """Store `job` and return its id, a string counting up from '1'.

        Jobs whose ``acts_under_heavy_load`` attribute is true are not
        stored; ``None`` is returned for them.
        """
        if getattr(job, 'acts_under_heavy_load', False):
            return None
        job_id = str(self._curr_num)
        self._curr_num += 1
        self._jobs[job_id] = job
        return job_id

    def remove(self, job_id, site):
        """Forget the job with id `job_id`, if any. `site` is ignored."""
        self._jobs.pop(job_id, None)
476
477
@contextmanager
def expensive_actions():
    """A contextmanager to work with a modified `expensive_actions_allowed()`

    Yields the registered `IKofaUtils` utility. Code inside the
    ``with`` block may replace the utility's
    ``expensive_actions_allowed`` attribute; on exit the attribute is
    set back to the value it had on entry, also if the block raised.
    """
    utils = getUtility(IKofaUtils)
    original = utils.expensive_actions_allowed
    try:
        yield utils
    finally:
        # Put the original callable back in place.
        utils.expensive_actions_allowed = original
Note: See TracBrowser for help on using the repository browser.