Ignore:
Timestamp:
6 Aug 2012, 17:10:53 (12 years ago)
Author:
uli
Message:

Bring test coverage to maximum value we can get (as asynchronous tasks
running in a different thread cannot be seen by the coverage
detector).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/tests/test_async.py

    r9083 r9084  
    11# Tests for the async module of waeup.kofa
     2import os
    23import re
     4import shutil
     5import tempfile
     6import transaction
    37import zc.async.configure
    48import zc.async.ftesting
    59import zc.async.queue
    610from zc.async.interfaces import IJob
     11from ZODB import DB, FileStorage
    712from zope.component import getUtility, provideAdapter
     13from zope.component.hooks import setSite
    814from zope.container.interfaces import IContainer
    915from zope.interface import verify
     
    1319from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
    1420from waeup.kofa.async import (
    15     get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer
     21    get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer,
     22    JobManagerTraverser, JobContainerInstaller,
    1623    )
    1724
     
    6774        job = AsyncJob(fake_async_func1)
    6875        self.getRootFolder()['myjob'] = job
    69         import transaction
    7076        transaction.commit()
    7177        oid = job.__repr__().split(' ')[2][:-1]
     
    176182        self.assertEqual(len(container), 0)
    177183        return
     184
     185    def test_start_test_job(self):
     186        # we can start a test job
     187        manager = getUtility(IJobManager)
     188        job_id = manager.start_test_job(duration=1, site=self.root_folder)
     189        container = self.root[WAEUP_KEY]
     190        self.assertEqual(len(container), 1)
     191        self.assertTrue(job_id is not None)
     192        return
     193
     194class JobManagerTraverserTests(FunctionalAsyncTestCase):
     195
     196    layer = FunctionalLayer
     197
     198    def setUp(self):
     199        super(JobManagerTraverserTests, self).setUp()
     200        self.root_folder = self.getRootFolder()
     201        self.root = self.root_folder._p_jar.root()
     202        self.manager = getUtility(IJobManager)
     203        setSite(self.getRootFolder())
     204
     205    def tearDown(self):
     206        setSite(None)
     207        super(JobManagerTraverserTests, self).tearDown()
     208
     209    def test_no_jobs(self):
     210        # the traverser returns None if no job is available
     211        traverser = JobManagerTraverser(self.manager, None)
     212        result = traverser.traverse('123')
     213        self.assertTrue(result is None)
     214        return
     215
     216    def test_valid_job(self):
     217        # the traverser returns the appropriate job if available
     218        myjob = AsyncJob(fake_async_func1)
     219        job_id = self.manager.put(myjob, site=self.root_folder)
     220        traverser = JobManagerTraverser(self.manager, None)
     221        result = traverser.traverse(job_id)
     222        self.assertEqual(myjob, result)
     223        return
     224
     225class FakeEvent(object):
     226    # A faked DatabaseOpenedEvent as triggered at instance startup.
     227    # The only attribute we need here is `database` which should
     228    # return a valid ZODB.
     229
     230    _storage = None
     231    _db = None
     232
     233    def __init__(self, workdir, db_name=None):
     234        # workdir is the place to create a ZODB in, db_name its name.
     235        self.workdir = workdir
     236        self.db_name = db_name
     237
     238    @property
     239    def database(self):
     240        if self._storage is None:
     241            # storage not created yet. Do it now.
     242            path = os.path.join(self.workdir, 'testdb.fs')
     243            self._storage = FileStorage.FileStorage(path)
     244            self._db = DB(self._storage, database_name=self.db_name)
     245        return self._db
     246
     247class FakeDB(object):
     248
     249    closed = False
     250
     251    def open(self, transaction_manager=None):
     252        return self
     253
     254    def close(self):
     255        self.closed = True
     256        return
     257
     258class JobContainerInstallerTests(FunctionalTestCase):
     259
     260    layer = FunctionalLayer
     261
     262    def setUp(self):
     263        super(JobContainerInstallerTests, self).setUp()
     264        # setup two ZODBs, one unnamed, one named...
     265        self.workdir1 = tempfile.mkdtemp()
     266        self.workdir2 = tempfile.mkdtemp()
     267        self.fake_event1 = FakeEvent(self.workdir1)
     268        self.fake_event2 = FakeEvent(self.workdir2, 'mytestdb')
     269        self.tm1 = transaction.TransactionManager()
     270        self.tm2 = transaction.TransactionManager()
     271        self.db1 = self.fake_event1.database
     272        self.db2 = self.fake_event2.database
     273        self.conn1 = self.db1.open(self.tm1)
     274        self.conn2 = self.db2.open(self.tm2)
     275
     276    def tearDown(self):
     277        self.tm1.abort()
     278        self.tm2.abort()
     279        shutil.rmtree(self.workdir1)
     280        shutil.rmtree(self.workdir2)
     281        super(JobContainerInstallerTests, self).tearDown()
     282
     283    def reopen(self):
     284        # reopen connections to the ZODBs (for retrieving fresh data)...
     285        self.conn1.close()
     286        self.conn2.close()
     287        self.conn1 = self.db1.open(self.tm1)
     288        self.conn2 = self.db2.open(self.tm2)
     289        return
     290
     291    @property
     292    def root1(self):
     293        # get root of first (unnamed) database
     294        self.reopen()
     295        return self.conn1.root()
     296
     297    @property
     298    def root2(self):
     299        # get root of second (named 'mytestdb') database
     300        self.reopen()
     301        return self.conn2.root()
     302
     303    def test_new_container(self):
     304        # we can install a job container if none is available yet
     305        installer = JobContainerInstaller()
     306        self.assertTrue(WAEUP_KEY not in self.root1)
     307        installer(self.fake_event1)
     308        self.assertTrue(WAEUP_KEY in self.root1)
     309        self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY]))
     310        return
     311
     312    def test_existing_container(self):
     313        # we won't try to install a new container if one is already present
     314        installer = JobContainerInstaller()
     315        self.assertTrue(WAEUP_KEY not in self.root1)
     316        installer(self.fake_event1)
     317        self.assertTrue(WAEUP_KEY in self.root1)
     318        self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY]))
     319        container = self.root1[WAEUP_KEY]
     320        installer(self.fake_event1)
     321        self.assertTrue(WAEUP_KEY in self.root1)
     322        self.assertTrue(self.root1[WAEUP_KEY] is container)
     323        return
     324
     325    def test_named_db(self):
     326        # we can also install containers in named dbs
     327        installer = JobContainerInstaller(db_name='mytestdb')
     328        self.assertTrue(WAEUP_KEY not in self.root1)
     329        self.assertTrue(WAEUP_KEY not in self.root2)
     330        installer(self.fake_event2)
     331        # now the job container is in database2 (but not database1)
     332        self.assertTrue(WAEUP_KEY not in self.root1)
     333        self.assertTrue(WAEUP_KEY in self.root2)
     334        self.assertTrue(IJobContainer.providedBy(self.root2[WAEUP_KEY]))
     335        return
     336
     337    def test_broken_db(self):
     338        # if writing to db fails, we close the connection anyway...
     339        installer = JobContainerInstaller()
     340        self.conn1.close()
     341        # create a fake db
     342        db = FakeDB()
     343        self.fake_event1._storage = 'dummy'
     344        self.fake_event1._db = db
     345        self.assertEqual(db.closed, False)
     346        self.assertRaises(AttributeError, installer, self.fake_event1)
     347        self.assertEqual(db.closed, True)
     348        return
Note: See TracChangeset for help on using the changeset viewer.