source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_async.py @ 9721

Last change on this file since 9721 was 9217, checked in by uli, 12 years ago

Merge changes from uli-async-update back into trunk.

File size: 11.7 KB
RevLine 
[9217]1# Tests for the async module of waeup.kofa
2import os
3import re
4import shutil
5import tempfile
6import transaction
7import zc.async.configure
8import zc.async.ftesting
9import zc.async.queue
10from zc.async.interfaces import IJob
11from ZODB import DB, FileStorage
12from zope.component import getUtility, provideAdapter
13from zope.component.hooks import setSite
14from zope.container.interfaces import IContainer
15from zope.interface import verify
16from waeup.kofa.interfaces import (
17    WAEUP_KEY, IProgressable, IJobManager, IJobContainer
18    )
19from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
20from waeup.kofa.async import (
21    get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer,
22    JobManagerTraverser, JobContainerInstaller,
23    )
24
25def fake_async_func1():
26    return 23
27
28def fake_async_func2():
29    return 42
30
31class FunctionalAsyncTestCase(FunctionalTestCase):
32    """A functional test case that additionally sets up asynchronous
33    machinery.
34
35    These type of test case takes _much_ more time than regular
36    functional tests! So, only use it if really neccessary.
37    """
38
39    def setUp(self):
40        super(FunctionalAsyncTestCase, self).setUp()
41        zc.async.configure.base()
42        provideAdapter(zc.async.queue.getDefaultQueue)
43        connection = self.getRootFolder()._p_jar
44        zc.async.ftesting.setUp(connection=connection)
45        return
46
47    def tearDown(self):
48        zc.async.ftesting.tearDown()
49        super(FunctionalAsyncTestCase, self).tearDown()
50        return
51
52class CommonAsyncFunctionalTests(FunctionalTestCase):
53
54    layer = FunctionalLayer
55
56    def test_iface_async_job(self):
57        # AsyncJob implements the promised interfaces
58        job = AsyncJob(fake_async_func1)
59        verify.verifyClass(IJob, AsyncJob)
60        verify.verifyObject(IJob, job)
61        return
62
63    def test_iface_progressable_job(self):
64        # ProgressableJob implements the promised interfaces
65        job = ProgressableJob(fake_async_func1)
66        verify.verifyClass(IJob, ProgressableJob)
67        verify.verifyClass(IProgressable, ProgressableJob)
68        verify.verifyObject(IJob, job)
69        verify.verifyObject(IProgressable, job)
70        return
71
72    def test_get_job_id(self):
73        # for persisted jobs we can get an oid as job id
74        job = AsyncJob(fake_async_func1)
75        self.getRootFolder()['myjob'] = job
76        transaction.commit()
77        oid = job.__repr__().split(' ')[2][:-1]
78        result = get_job_id(job)
79        self.assertEqual(result, oid)
80        return
81
82class JobContainerTests(FunctionalTestCase):
83
84    layer = FunctionalLayer
85
86    def test_iface_job_container(self):
87        # JobContainers implement the promised interfaces
88        container = JobContainer()
89        verify.verifyClass(IContainer, JobContainer)
90        verify.verifyObject(IContainer, container)
91        verify.verifyClass(IJobContainer, JobContainer)
92        verify.verifyObject(IJobContainer, container)
93        return
94
95class JobManagerTests(FunctionalAsyncTestCase):
96
97    layer = FunctionalLayer
98
99    def setUp(self):
100        super(JobManagerTests, self).setUp()
101        self.root_folder = self.getRootFolder()
102        self.root = self.root_folder._p_jar.root()
103
104    def test_iface_job_manager(self):
105        # JobManager implements the promised interfaces
106        manager = JobManager()
107        verify.verifyClass(IJobManager, JobManager)
108        verify.verifyObject(IJobManager, manager)
109        return
110
111    def test_get_job_manager_as_util(self):
112        # we can get a job manager as global, unnamed utility
113        manager = getUtility(IJobManager)
114        self.assertTrue(isinstance(manager, JobManager))
115        return
116
117    def test_put(self):
118        # we can put jobs into the manager
119        myjob = AsyncJob(fake_async_func1)
120        manager = getUtility(IJobManager)
121        job_id = manager.put(myjob, site=self.root_folder)
122        # the returned job_id should be a string with a bunch of numbers
123        self.assertTrue(re.match('^\d+$', job_id))
124        # the job is also in the (new) jobs container
125        self.assertTrue(job_id in self.root[WAEUP_KEY].keys())
126        # the job is really stored
127        stored_job =self.root[WAEUP_KEY][job_id]
128        self.assertEqual(myjob, stored_job)
129        return
130
131    def test_put_no_site(self):
132        # if no site is around we cannot store jobs
133        myjob = AsyncJob(fake_async_func1)
134        manager = getUtility(IJobManager)
135        self.assertRaises(LookupError, manager.put, myjob)
136        return
137
138    def test_get(self):
139        # we can get a job back with a valid job_id
140        manager = getUtility(IJobManager)
141        myjob = AsyncJob(fake_async_func1)
142        job_id = manager.put(myjob, site=self.root_folder)
143        stored_job = manager.get(job_id, site=self.root_folder)
144        self.assertEqual(myjob, stored_job)
145        return
146
147    def test_get_no_site(self):
148        # if no site is around we cannot retrieve jobs
149        manager = getUtility(IJobManager)
150        self.assertRaises(LookupError, manager.get, 'some_id')
151        return
152
153    def test_get_invalid_jobid(self):
154        # invalid job_ids will result in `None` (if site is valid)
155        manager = getUtility(IJobManager)
156        result = manager.get('not-a-valid-job-id', site=self.root_folder)
157        self.assertEqual(result, None)
158        return
159
160    def test_jobs(self):
161        # we can get all jobs contained
162        manager = getUtility(IJobManager)
163        myjob1 = AsyncJob(fake_async_func1)
164        job_id1 = manager.put(myjob1, site=self.root_folder)
165        myjob2 = AsyncJob(fake_async_func2)
166        job_id2 = manager.put(myjob2, site=self.root_folder)
167        result = sorted(list(manager.jobs(site=self.root_folder)))
168        self.assertEqual(
169            result,
170            [(job_id1, myjob1), (job_id2, myjob2)]
171            )
172        return
173
174    def test_remove(self):
175        # we can remove jobs from the job container
176        manager = getUtility(IJobManager)
177        myjob = AsyncJob(fake_async_func1)
178        job_id = manager.put(myjob, site=self.root_folder)
179        container = self.root[WAEUP_KEY]
180        self.assertEqual(len(container), 1)
181        manager.remove(job_id, site=self.root_folder)
182        self.assertEqual(len(container), 0)
183        return
184
185    def test_start_test_job(self):
186        # we can start a test job
187        manager = getUtility(IJobManager)
188        job_id = manager.start_test_job(duration=1, site=self.root_folder)
189        container = self.root[WAEUP_KEY]
190        self.assertEqual(len(container), 1)
191        self.assertTrue(job_id is not None)
192        return
193
194class JobManagerTraverserTests(FunctionalAsyncTestCase):
195
196    layer = FunctionalLayer
197
198    def setUp(self):
199        super(JobManagerTraverserTests, self).setUp()
200        self.root_folder = self.getRootFolder()
201        self.root = self.root_folder._p_jar.root()
202        self.manager = getUtility(IJobManager)
203        setSite(self.getRootFolder())
204
205    def tearDown(self):
206        setSite(None)
207        super(JobManagerTraverserTests, self).tearDown()
208
209    def test_no_jobs(self):
210        # the traverser returns None if no job is available
211        traverser = JobManagerTraverser(self.manager, None)
212        result = traverser.traverse('123')
213        self.assertTrue(result is None)
214        return
215
216    def test_valid_job(self):
217        # the traverser returns the appropriate job if available
218        myjob = AsyncJob(fake_async_func1)
219        job_id = self.manager.put(myjob, site=self.root_folder)
220        traverser = JobManagerTraverser(self.manager, None)
221        result = traverser.traverse(job_id)
222        self.assertEqual(myjob, result)
223        return
224
225class FakeEvent(object):
226    # A faked DatabaseOpenedEvent as triggered at instance startup.
227    # The only attribute we need here is `database` which should
228    # return a valid ZODB.
229
230    _storage = None
231    _db = None
232
233    def __init__(self, workdir, db_name=None):
234        # workdir is the place to create a ZODB in, db_name its name.
235        self.workdir = workdir
236        self.db_name = db_name
237
238    @property
239    def database(self):
240        if self._storage is None:
241            # storage not created yet. Do it now.
242            path = os.path.join(self.workdir, 'testdb.fs')
243            self._storage = FileStorage.FileStorage(path)
244            self._db = DB(self._storage, database_name=self.db_name)
245        return self._db
246
247class FakeDB(object):
248
249    closed = False
250
251    def open(self, transaction_manager=None):
252        return self
253
254    def close(self):
255        self.closed = True
256        return
257
258class JobContainerInstallerTests(FunctionalTestCase):
259
260    layer = FunctionalLayer
261
262    def setUp(self):
263        super(JobContainerInstallerTests, self).setUp()
264        # setup two ZODBs, one unnamed, one named...
265        self.workdir1 = tempfile.mkdtemp()
266        self.workdir2 = tempfile.mkdtemp()
267        self.fake_event1 = FakeEvent(self.workdir1)
268        self.fake_event2 = FakeEvent(self.workdir2, 'mytestdb')
269        self.tm1 = transaction.TransactionManager()
270        self.tm2 = transaction.TransactionManager()
271        self.db1 = self.fake_event1.database
272        self.db2 = self.fake_event2.database
273        self.conn1 = self.db1.open(self.tm1)
274        self.conn2 = self.db2.open(self.tm2)
275
276    def tearDown(self):
277        self.tm1.abort()
278        self.tm2.abort()
279        shutil.rmtree(self.workdir1)
280        shutil.rmtree(self.workdir2)
281        super(JobContainerInstallerTests, self).tearDown()
282
283    def reopen(self):
284        # reopen connections to the ZODBs (for retrieving fresh data)...
285        self.conn1.close()
286        self.conn2.close()
287        self.conn1 = self.db1.open(self.tm1)
288        self.conn2 = self.db2.open(self.tm2)
289        return
290
291    @property
292    def root1(self):
293        # get root of first (unnamed) database
294        self.reopen()
295        return self.conn1.root()
296
297    @property
298    def root2(self):
299        # get root of second (named 'mytestdb') database
300        self.reopen()
301        return self.conn2.root()
302
303    def test_new_container(self):
304        # we can install a job container if none is available yet
305        installer = JobContainerInstaller()
306        self.assertTrue(WAEUP_KEY not in self.root1)
307        installer(self.fake_event1)
308        self.assertTrue(WAEUP_KEY in self.root1)
309        self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY]))
310        return
311
312    def test_existing_container(self):
313        # we won't try to install a new container if one is already present
314        installer = JobContainerInstaller()
315        self.assertTrue(WAEUP_KEY not in self.root1)
316        installer(self.fake_event1)
317        self.assertTrue(WAEUP_KEY in self.root1)
318        self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY]))
319        container = self.root1[WAEUP_KEY]
320        installer(self.fake_event1)
321        self.assertTrue(WAEUP_KEY in self.root1)
322        self.assertTrue(self.root1[WAEUP_KEY] is container)
323        return
324
325    def test_named_db(self):
326        # we can also install containers in named dbs
327        installer = JobContainerInstaller(db_name='mytestdb')
328        self.assertTrue(WAEUP_KEY not in self.root1)
329        self.assertTrue(WAEUP_KEY not in self.root2)
330        installer(self.fake_event2)
331        # now the job container is in database2 (but not database1)
332        self.assertTrue(WAEUP_KEY not in self.root1)
333        self.assertTrue(WAEUP_KEY in self.root2)
334        self.assertTrue(IJobContainer.providedBy(self.root2[WAEUP_KEY]))
335        return
336
337    def test_broken_db(self):
338        # if writing to db fails, we close the connection anyway...
339        installer = JobContainerInstaller()
340        self.conn1.close()
341        # create a fake db
342        db = FakeDB()
343        self.fake_event1._storage = 'dummy'
344        self.fake_event1._db = db
345        self.assertEqual(db.closed, False)
346        self.assertRaises(AttributeError, installer, self.fake_event1)
347        self.assertEqual(db.closed, True)
348        return
Note: See TracBrowser for help on using the repository browser.