source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_async.py @ 17937

Last change on this file since 17937 was 12110, checked in by Henrik Bettermann, 10 years ago

svn propset svn:keywords "Id"

  • Property svn:keywords set to Id
File size: 14.0 KB
RevLine 
[9217]1# Tests for the async module of waeup.kofa
2import os
3import re
4import shutil
5import tempfile
6import transaction
7import zc.async.configure
8import zc.async.ftesting
9import zc.async.queue
10from zc.async.interfaces import IJob
11from ZODB import DB, FileStorage
12from zope.component import getUtility, provideAdapter
13from zope.component.hooks import setSite
14from zope.container.interfaces import IContainer
15from zope.interface import verify
[9793]16from zope.site.folder import rootFolder
17from zope.site.testing import createSiteManager
[9217]18from waeup.kofa.interfaces import (
19    WAEUP_KEY, IProgressable, IJobManager, IJobContainer
20    )
21from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
22from waeup.kofa.async import (
23    get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer,
24    JobManagerTraverser, JobContainerInstaller,
25    )
26
27def fake_async_func1():
28    return 23
29
30def fake_async_func2():
31    return 42
32
33class FunctionalAsyncTestCase(FunctionalTestCase):
34    """A functional test case that additionally sets up asynchronous
35    machinery.
36
37    These type of test case takes _much_ more time than regular
38    functional tests! So, only use it if really neccessary.
39    """
40
41    def setUp(self):
42        super(FunctionalAsyncTestCase, self).setUp()
43        zc.async.configure.base()
44        provideAdapter(zc.async.queue.getDefaultQueue)
45        connection = self.getRootFolder()._p_jar
46        zc.async.ftesting.setUp(connection=connection)
47        return
48
49    def tearDown(self):
50        zc.async.ftesting.tearDown()
51        super(FunctionalAsyncTestCase, self).tearDown()
52        return
53
54class CommonAsyncFunctionalTests(FunctionalTestCase):
55
56    layer = FunctionalLayer
57
58    def test_iface_async_job(self):
59        # AsyncJob implements the promised interfaces
60        job = AsyncJob(fake_async_func1)
61        verify.verifyClass(IJob, AsyncJob)
62        verify.verifyObject(IJob, job)
63        return
64
65    def test_iface_progressable_job(self):
66        # ProgressableJob implements the promised interfaces
67        job = ProgressableJob(fake_async_func1)
68        verify.verifyClass(IJob, ProgressableJob)
69        verify.verifyClass(IProgressable, ProgressableJob)
70        verify.verifyObject(IJob, job)
71        verify.verifyObject(IProgressable, job)
72        return
73
74    def test_get_job_id(self):
75        # for persisted jobs we can get an oid as job id
76        job = AsyncJob(fake_async_func1)
77        self.getRootFolder()['myjob'] = job
78        transaction.commit()
79        oid = job.__repr__().split(' ')[2][:-1]
80        result = get_job_id(job)
81        self.assertEqual(result, oid)
82        return
83
84class JobContainerTests(FunctionalTestCase):
85
86    layer = FunctionalLayer
87
88    def test_iface_job_container(self):
89        # JobContainers implement the promised interfaces
90        container = JobContainer()
91        verify.verifyClass(IContainer, JobContainer)
92        verify.verifyObject(IContainer, container)
93        verify.verifyClass(IJobContainer, JobContainer)
94        verify.verifyObject(IJobContainer, container)
95        return
96
97class JobManagerTests(FunctionalAsyncTestCase):
98
99    layer = FunctionalLayer
100
101    def setUp(self):
102        super(JobManagerTests, self).setUp()
103        self.root_folder = self.getRootFolder()
104        self.root = self.root_folder._p_jar.root()
105
[9794]106    def set_site(self):
107        # helper to set a site 'externally'.
108        # This is a bit complicated under the given limited setup conditions.
109        # Returns a persisted site.
110        site = rootFolder()
111        self.root_folder['application'] = site
112        createSiteManager(site, setsite=True)
113        transaction.commit()
114        return site
115
[9217]116    def test_iface_job_manager(self):
117        # JobManager implements the promised interfaces
118        manager = JobManager()
119        verify.verifyClass(IJobManager, JobManager)
120        verify.verifyObject(IJobManager, manager)
121        return
122
123    def test_get_job_manager_as_util(self):
124        # we can get a job manager as global, unnamed utility
125        manager = getUtility(IJobManager)
126        self.assertTrue(isinstance(manager, JobManager))
127        return
128
[9793]129    def test_get_site_no_site(self):
130        # if no site is around, we get a lookup error
131        manager = getUtility(IJobManager)
132        self.assertRaises(LookupError, manager._get_site, None)
133        return
134
135    def test_get_site_invalid_site(self):
136        # if we pass an invalid site, we get a lookup error
137        manager = getUtility(IJobManager)
138        self.assertRaises(LookupError, manager._get_site, object())
139        return
140
141    def test_get_site_valid_site(self):
142        # if we pass a valid site, that will be respected
143        manager = getUtility(IJobManager)
144        result = manager._get_site(self.root_folder)
145        self.assertEqual(result, self.root_folder)
146        return
147
[9794]148    def test_get_site_ext_set_none_given(self):
149        # with an externally set site we don't have to pass in a site
[9793]150        manager = getUtility(IJobManager)
[9794]151        site = self.set_site()
152        result = manager._get_site(None)
153        self.assertEqual(result, site)
[9793]154        return
155
[9794]156    def test_get_site_ext_set_valid_site(self):
157        # with an externally set site we can pass in this site
158        manager = getUtility(IJobManager)
159        site = self.set_site()
160        result = manager._get_site(site)
161        self.assertEqual(result, site)
162        return
163
164    def test_get_site_ext_set_invalid_site(self):
165        # with an externally set site we can pass in invalid sites
166        manager = getUtility(IJobManager)
167        site = self.set_site()
168        result = manager._get_site(object())
169        self.assertEqual(result, site)
170        return
171
172    def test_get_site_ext_set_valid_obj(self):
173        # valid persisted objects override externally set sites
174        manager = getUtility(IJobManager)
175        site = self.set_site()
176        result = manager._get_site(self.root_folder)
177        self.assertEqual(result, self.root_folder)
178        return
179
[9217]180    def test_put(self):
181        # we can put jobs into the manager
182        myjob = AsyncJob(fake_async_func1)
183        manager = getUtility(IJobManager)
184        job_id = manager.put(myjob, site=self.root_folder)
185        # the returned job_id should be a string with a bunch of numbers
186        self.assertTrue(re.match('^\d+$', job_id))
187        # the job is also in the (new) jobs container
188        self.assertTrue(job_id in self.root[WAEUP_KEY].keys())
189        # the job is really stored
190        stored_job =self.root[WAEUP_KEY][job_id]
191        self.assertEqual(myjob, stored_job)
192        return
193
194    def test_put_no_site(self):
195        # if no site is around we cannot store jobs
196        myjob = AsyncJob(fake_async_func1)
197        manager = getUtility(IJobManager)
198        self.assertRaises(LookupError, manager.put, myjob)
199        return
200
201    def test_get(self):
202        # we can get a job back with a valid job_id
203        manager = getUtility(IJobManager)
204        myjob = AsyncJob(fake_async_func1)
205        job_id = manager.put(myjob, site=self.root_folder)
206        stored_job = manager.get(job_id, site=self.root_folder)
207        self.assertEqual(myjob, stored_job)
208        return
209
210    def test_get_no_site(self):
211        # if no site is around we cannot retrieve jobs
212        manager = getUtility(IJobManager)
213        self.assertRaises(LookupError, manager.get, 'some_id')
214        return
215
216    def test_get_invalid_jobid(self):
217        # invalid job_ids will result in `None` (if site is valid)
218        manager = getUtility(IJobManager)
219        result = manager.get('not-a-valid-job-id', site=self.root_folder)
220        self.assertEqual(result, None)
221        return
222
223    def test_jobs(self):
224        # we can get all jobs contained
225        manager = getUtility(IJobManager)
226        myjob1 = AsyncJob(fake_async_func1)
227        job_id1 = manager.put(myjob1, site=self.root_folder)
228        myjob2 = AsyncJob(fake_async_func2)
229        job_id2 = manager.put(myjob2, site=self.root_folder)
230        result = sorted(list(manager.jobs(site=self.root_folder)))
231        self.assertEqual(
232            result,
233            [(job_id1, myjob1), (job_id2, myjob2)]
234            )
235        return
236
237    def test_remove(self):
238        # we can remove jobs from the job container
239        manager = getUtility(IJobManager)
240        myjob = AsyncJob(fake_async_func1)
241        job_id = manager.put(myjob, site=self.root_folder)
242        container = self.root[WAEUP_KEY]
243        self.assertEqual(len(container), 1)
244        manager.remove(job_id, site=self.root_folder)
245        self.assertEqual(len(container), 0)
246        return
247
248    def test_start_test_job(self):
249        # we can start a test job
250        manager = getUtility(IJobManager)
251        job_id = manager.start_test_job(duration=1, site=self.root_folder)
252        container = self.root[WAEUP_KEY]
253        self.assertEqual(len(container), 1)
254        self.assertTrue(job_id is not None)
255        return
256
257class JobManagerTraverserTests(FunctionalAsyncTestCase):
258
259    layer = FunctionalLayer
260
261    def setUp(self):
262        super(JobManagerTraverserTests, self).setUp()
263        self.root_folder = self.getRootFolder()
264        self.root = self.root_folder._p_jar.root()
265        self.manager = getUtility(IJobManager)
266        setSite(self.getRootFolder())
267
268    def tearDown(self):
269        setSite(None)
270        super(JobManagerTraverserTests, self).tearDown()
271
272    def test_no_jobs(self):
273        # the traverser returns None if no job is available
274        traverser = JobManagerTraverser(self.manager, None)
275        result = traverser.traverse('123')
276        self.assertTrue(result is None)
277        return
278
279    def test_valid_job(self):
280        # the traverser returns the appropriate job if available
281        myjob = AsyncJob(fake_async_func1)
282        job_id = self.manager.put(myjob, site=self.root_folder)
283        traverser = JobManagerTraverser(self.manager, None)
284        result = traverser.traverse(job_id)
285        self.assertEqual(myjob, result)
286        return
287
288class FakeEvent(object):
289    # A faked DatabaseOpenedEvent as triggered at instance startup.
290    # The only attribute we need here is `database` which should
291    # return a valid ZODB.
292
293    _storage = None
294    _db = None
295
296    def __init__(self, workdir, db_name=None):
297        # workdir is the place to create a ZODB in, db_name its name.
298        self.workdir = workdir
299        self.db_name = db_name
300
301    @property
302    def database(self):
303        if self._storage is None:
304            # storage not created yet. Do it now.
305            path = os.path.join(self.workdir, 'testdb.fs')
306            self._storage = FileStorage.FileStorage(path)
307            self._db = DB(self._storage, database_name=self.db_name)
308        return self._db
309
310class FakeDB(object):
311
312    closed = False
313
314    def open(self, transaction_manager=None):
315        return self
316
317    def close(self):
318        self.closed = True
319        return
320
321class JobContainerInstallerTests(FunctionalTestCase):
322
323    layer = FunctionalLayer
324
325    def setUp(self):
326        super(JobContainerInstallerTests, self).setUp()
327        # setup two ZODBs, one unnamed, one named...
328        self.workdir1 = tempfile.mkdtemp()
329        self.workdir2 = tempfile.mkdtemp()
330        self.fake_event1 = FakeEvent(self.workdir1)
331        self.fake_event2 = FakeEvent(self.workdir2, 'mytestdb')
332        self.tm1 = transaction.TransactionManager()
333        self.tm2 = transaction.TransactionManager()
334        self.db1 = self.fake_event1.database
335        self.db2 = self.fake_event2.database
336        self.conn1 = self.db1.open(self.tm1)
337        self.conn2 = self.db2.open(self.tm2)
338
339    def tearDown(self):
340        self.tm1.abort()
341        self.tm2.abort()
342        shutil.rmtree(self.workdir1)
343        shutil.rmtree(self.workdir2)
344        super(JobContainerInstallerTests, self).tearDown()
345
346    def reopen(self):
347        # reopen connections to the ZODBs (for retrieving fresh data)...
348        self.conn1.close()
349        self.conn2.close()
350        self.conn1 = self.db1.open(self.tm1)
351        self.conn2 = self.db2.open(self.tm2)
352        return
353
354    @property
355    def root1(self):
356        # get root of first (unnamed) database
357        self.reopen()
358        return self.conn1.root()
359
360    @property
361    def root2(self):
362        # get root of second (named 'mytestdb') database
363        self.reopen()
364        return self.conn2.root()
365
366    def test_new_container(self):
367        # we can install a job container if none is available yet
368        installer = JobContainerInstaller()
369        self.assertTrue(WAEUP_KEY not in self.root1)
370        installer(self.fake_event1)
371        self.assertTrue(WAEUP_KEY in self.root1)
372        self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY]))
373        return
374
375    def test_existing_container(self):
376        # we won't try to install a new container if one is already present
377        installer = JobContainerInstaller()
378        self.assertTrue(WAEUP_KEY not in self.root1)
379        installer(self.fake_event1)
380        self.assertTrue(WAEUP_KEY in self.root1)
381        self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY]))
382        container = self.root1[WAEUP_KEY]
383        installer(self.fake_event1)
384        self.assertTrue(WAEUP_KEY in self.root1)
385        self.assertTrue(self.root1[WAEUP_KEY] is container)
386        return
387
388    def test_named_db(self):
389        # we can also install containers in named dbs
390        installer = JobContainerInstaller(db_name='mytestdb')
391        self.assertTrue(WAEUP_KEY not in self.root1)
392        self.assertTrue(WAEUP_KEY not in self.root2)
393        installer(self.fake_event2)
394        # now the job container is in database2 (but not database1)
395        self.assertTrue(WAEUP_KEY not in self.root1)
396        self.assertTrue(WAEUP_KEY in self.root2)
397        self.assertTrue(IJobContainer.providedBy(self.root2[WAEUP_KEY]))
398        return
399
400    def test_broken_db(self):
401        # if writing to db fails, we close the connection anyway...
402        installer = JobContainerInstaller()
403        self.conn1.close()
404        # create a fake db
405        db = FakeDB()
406        self.fake_event1._storage = 'dummy'
407        self.fake_event1._db = db
408        self.assertEqual(db.closed, False)
409        self.assertRaises(AttributeError, installer, self.fake_event1)
410        self.assertEqual(db.closed, True)
411        return
Note: See TracBrowser for help on using the repository browser.