source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/tests/test_async.py @ 9083

Last change on this file since 9083 was 9083, checked in by uli, 12 years ago

Add tests for async module.

File size: 6.0 KB
Line 
1# Tests for the async module of waeup.kofa
2import re
3import zc.async.configure
4import zc.async.ftesting
5import zc.async.queue
6from zc.async.interfaces import IJob
7from zope.component import getUtility, provideAdapter
8from zope.container.interfaces import IContainer
9from zope.interface import verify
10from waeup.kofa.interfaces import (
11    WAEUP_KEY, IProgressable, IJobManager, IJobContainer
12    )
13from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
14from waeup.kofa.async import (
15    get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer
16    )
17
18def fake_async_func1():
19    return 23
20
21def fake_async_func2():
22    return 42
23
24class FunctionalAsyncTestCase(FunctionalTestCase):
25    """A functional test case that additionally sets up asynchronous
26    machinery.
27
28    These type of test case takes _much_ more time than regular
29    functional tests! So, only use it if really neccessary.
30    """
31
32    def setUp(self):
33        super(FunctionalAsyncTestCase, self).setUp()
34        zc.async.configure.base()
35        provideAdapter(zc.async.queue.getDefaultQueue)
36        connection = self.getRootFolder()._p_jar
37        zc.async.ftesting.setUp(connection=connection)
38        return
39
40    def tearDown(self):
41        zc.async.ftesting.tearDown()
42        super(FunctionalAsyncTestCase, self).tearDown()
43        return
44
45class CommonAsyncFunctionalTests(FunctionalTestCase):
46
47    layer = FunctionalLayer
48
49    def test_iface_async_job(self):
50        # AsyncJob implements the promised interfaces
51        job = AsyncJob(fake_async_func1)
52        verify.verifyClass(IJob, AsyncJob)
53        verify.verifyObject(IJob, job)
54        return
55
56    def test_iface_progressable_job(self):
57        # ProgressableJob implements the promised interfaces
58        job = ProgressableJob(fake_async_func1)
59        verify.verifyClass(IJob, ProgressableJob)
60        verify.verifyClass(IProgressable, ProgressableJob)
61        verify.verifyObject(IJob, job)
62        verify.verifyObject(IProgressable, job)
63        return
64
65    def test_get_job_id(self):
66        # for persisted jobs we can get an oid as job id
67        job = AsyncJob(fake_async_func1)
68        self.getRootFolder()['myjob'] = job
69        import transaction
70        transaction.commit()
71        oid = job.__repr__().split(' ')[2][:-1]
72        result = get_job_id(job)
73        self.assertEqual(result, oid)
74        return
75
76class JobContainerTests(FunctionalTestCase):
77
78    layer = FunctionalLayer
79
80    def test_iface_job_container(self):
81        # JobContainers implement the promised interfaces
82        container = JobContainer()
83        verify.verifyClass(IContainer, JobContainer)
84        verify.verifyObject(IContainer, container)
85        verify.verifyClass(IJobContainer, JobContainer)
86        verify.verifyObject(IJobContainer, container)
87        return
88
89class JobManagerTests(FunctionalAsyncTestCase):
90
91    layer = FunctionalLayer
92
93    def setUp(self):
94        super(JobManagerTests, self).setUp()
95        self.root_folder = self.getRootFolder()
96        self.root = self.root_folder._p_jar.root()
97
98    def test_iface_job_manager(self):
99        # JobManager implements the promised interfaces
100        manager = JobManager()
101        verify.verifyClass(IJobManager, JobManager)
102        verify.verifyObject(IJobManager, manager)
103        return
104
105    def test_get_job_manager_as_util(self):
106        # we can get a job manager as global, unnamed utility
107        manager = getUtility(IJobManager)
108        self.assertTrue(isinstance(manager, JobManager))
109        return
110
111    def test_put(self):
112        # we can put jobs into the manager
113        myjob = AsyncJob(fake_async_func1)
114        manager = getUtility(IJobManager)
115        job_id = manager.put(myjob, site=self.root_folder)
116        # the returned job_id should be a string with a bunch of numbers
117        self.assertTrue(re.match('^\d+$', job_id))
118        # the job is also in the (new) jobs container
119        self.assertTrue(job_id in self.root[WAEUP_KEY].keys())
120        # the job is really stored
121        stored_job =self.root[WAEUP_KEY][job_id]
122        self.assertEqual(myjob, stored_job)
123        return
124
125    def test_put_no_site(self):
126        # if no site is around we cannot store jobs
127        myjob = AsyncJob(fake_async_func1)
128        manager = getUtility(IJobManager)
129        self.assertRaises(LookupError, manager.put, myjob)
130        return
131
132    def test_get(self):
133        # we can get a job back with a valid job_id
134        manager = getUtility(IJobManager)
135        myjob = AsyncJob(fake_async_func1)
136        job_id = manager.put(myjob, site=self.root_folder)
137        stored_job = manager.get(job_id, site=self.root_folder)
138        self.assertEqual(myjob, stored_job)
139        return
140
141    def test_get_no_site(self):
142        # if no site is around we cannot retrieve jobs
143        manager = getUtility(IJobManager)
144        self.assertRaises(LookupError, manager.get, 'some_id')
145        return
146
147    def test_get_invalid_jobid(self):
148        # invalid job_ids will result in `None` (if site is valid)
149        manager = getUtility(IJobManager)
150        result = manager.get('not-a-valid-job-id', site=self.root_folder)
151        self.assertEqual(result, None)
152        return
153
154    def test_jobs(self):
155        # we can get all jobs contained
156        manager = getUtility(IJobManager)
157        myjob1 = AsyncJob(fake_async_func1)
158        job_id1 = manager.put(myjob1, site=self.root_folder)
159        myjob2 = AsyncJob(fake_async_func2)
160        job_id2 = manager.put(myjob2, site=self.root_folder)
161        result = sorted(list(manager.jobs(site=self.root_folder)))
162        self.assertEqual(
163            result,
164            [(job_id1, myjob1), (job_id2, myjob2)]
165            )
166        return
167
168    def test_remove(self):
169        # we can remove jobs from the job container
170        manager = getUtility(IJobManager)
171        myjob = AsyncJob(fake_async_func1)
172        job_id = manager.put(myjob, site=self.root_folder)
173        container = self.root[WAEUP_KEY]
174        self.assertEqual(len(container), 1)
175        manager.remove(job_id, site=self.root_folder)
176        self.assertEqual(len(container), 0)
177        return
Note: See TracBrowser for help on using the repository browser.