Changeset 9084 for main/waeup.kofa/branches/uli-zc-async
- Timestamp:
- 6 Aug 2012, 17:10:53 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/tests/test_async.py
r9083 r9084 1 1 # Tests for the async module of waeup.kofa 2 import os 2 3 import re 4 import shutil 5 import tempfile 6 import transaction 3 7 import zc.async.configure 4 8 import zc.async.ftesting 5 9 import zc.async.queue 6 10 from zc.async.interfaces import IJob 11 from ZODB import DB, FileStorage 7 12 from zope.component import getUtility, provideAdapter 13 from zope.component.hooks import setSite 8 14 from zope.container.interfaces import IContainer 9 15 from zope.interface import verify … … 13 19 from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer 14 20 from waeup.kofa.async import ( 15 get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer 21 get_job_id, AsyncJob, ProgressableJob, JobManager, JobContainer, 22 JobManagerTraverser, JobContainerInstaller, 16 23 ) 17 24 … … 67 74 job = AsyncJob(fake_async_func1) 68 75 self.getRootFolder()['myjob'] = job 69 import transaction70 76 transaction.commit() 71 77 oid = job.__repr__().split(' ')[2][:-1] … … 176 182 self.assertEqual(len(container), 0) 177 183 return 184 185 def test_start_test_job(self): 186 # we can start a test job 187 manager = getUtility(IJobManager) 188 job_id = manager.start_test_job(duration=1, site=self.root_folder) 189 container = self.root[WAEUP_KEY] 190 self.assertEqual(len(container), 1) 191 self.assertTrue(job_id is not None) 192 return 193 194 class JobManagerTraverserTests(FunctionalAsyncTestCase): 195 196 layer = FunctionalLayer 197 198 def setUp(self): 199 super(JobManagerTraverserTests, self).setUp() 200 self.root_folder = self.getRootFolder() 201 self.root = self.root_folder._p_jar.root() 202 self.manager = getUtility(IJobManager) 203 setSite(self.getRootFolder()) 204 205 def tearDown(self): 206 setSite(None) 207 super(JobManagerTraverserTests, self).tearDown() 208 209 def test_no_jobs(self): 210 # the traverser returns None if no job is available 211 traverser = JobManagerTraverser(self.manager, None) 212 result = traverser.traverse('123') 213 self.assertTrue(result is None) 214 return 215 216 def test_valid_job(self): 217 # the traverser returns the appropriate job if available 218 myjob = AsyncJob(fake_async_func1) 219 job_id = self.manager.put(myjob, site=self.root_folder) 220 traverser = JobManagerTraverser(self.manager, None) 221 result = traverser.traverse(job_id) 222 self.assertEqual(myjob, result) 223 return 224 225 class FakeEvent(object): 226 # A faked DatabaseOpenedEvent as triggered at instance startup. 227 # The only attribute we need here is `database` which should 228 # return a valid ZODB. 229 230 _storage = None 231 _db = None 232 233 def __init__(self, workdir, db_name=None): 234 # workdir is the place to create a ZODB in, db_name its name. 235 self.workdir = workdir 236 self.db_name = db_name 237 238 @property 239 def database(self): 240 if self._storage is None: 241 # storage not created yet. Do it now. 242 path = os.path.join(self.workdir, 'testdb.fs') 243 self._storage = FileStorage.FileStorage(path) 244 self._db = DB(self._storage, database_name=self.db_name) 245 return self._db 246 247 class FakeDB(object): 248 249 closed = False 250 251 def open(self, transaction_manager=None): 252 return self 253 254 def close(self): 255 self.closed = True 256 return 257 258 class JobContainerInstallerTests(FunctionalTestCase): 259 260 layer = FunctionalLayer 261 262 def setUp(self): 263 super(JobContainerInstallerTests, self).setUp() 264 # setup two ZODBs, one unnamed, one named... 265 self.workdir1 = tempfile.mkdtemp() 266 self.workdir2 = tempfile.mkdtemp() 267 self.fake_event1 = FakeEvent(self.workdir1) 268 self.fake_event2 = FakeEvent(self.workdir2, 'mytestdb') 269 self.tm1 = transaction.TransactionManager() 270 self.tm2 = transaction.TransactionManager() 271 self.db1 = self.fake_event1.database 272 self.db2 = self.fake_event2.database 273 self.conn1 = self.db1.open(self.tm1) 274 self.conn2 = self.db2.open(self.tm2) 275 276 def tearDown(self): 277 self.tm1.abort() 278 self.tm2.abort() 279 shutil.rmtree(self.workdir1) 280 shutil.rmtree(self.workdir2) 281 super(JobContainerInstallerTests, self).tearDown() 282 283 def reopen(self): 284 # reopen connections to the ZODBs (for retrieving fresh data)... 285 self.conn1.close() 286 self.conn2.close() 287 self.conn1 = self.db1.open(self.tm1) 288 self.conn2 = self.db2.open(self.tm2) 289 return 290 291 @property 292 def root1(self): 293 # get root of first (unnamed) database 294 self.reopen() 295 return self.conn1.root() 296 297 @property 298 def root2(self): 299 # get root of second (named 'mytestdb') database 300 self.reopen() 301 return self.conn2.root() 302 303 def test_new_container(self): 304 # we can install a job container if none is available yet 305 installer = JobContainerInstaller() 306 self.assertTrue(WAEUP_KEY not in self.root1) 307 installer(self.fake_event1) 308 self.assertTrue(WAEUP_KEY in self.root1) 309 self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY])) 310 return 311 312 def test_existing_container(self): 313 # we won't try to install a new container if one is already present 314 installer = JobContainerInstaller() 315 self.assertTrue(WAEUP_KEY not in self.root1) 316 installer(self.fake_event1) 317 self.assertTrue(WAEUP_KEY in self.root1) 318 self.assertTrue(IJobContainer.providedBy(self.root1[WAEUP_KEY])) 319 container = self.root1[WAEUP_KEY] 320 installer(self.fake_event1) 321 self.assertTrue(WAEUP_KEY in self.root1) 322 self.assertTrue(self.root1[WAEUP_KEY] is container) 323 return 324 325 def test_named_db(self): 326 # we can also install containers in named dbs 327 installer = JobContainerInstaller(db_name='mytestdb') 328 self.assertTrue(WAEUP_KEY not in self.root1) 329 self.assertTrue(WAEUP_KEY not in self.root2) 330 installer(self.fake_event2) 331 # now the job container is in database2 (but not database1) 332 self.assertTrue(WAEUP_KEY not in self.root1) 333 self.assertTrue(WAEUP_KEY in self.root2) 334 self.assertTrue(IJobContainer.providedBy(self.root2[WAEUP_KEY])) 335 return 336 337 def test_broken_db(self): 338 # if writing to db fails, we close the connection anyway... 339 installer = JobContainerInstaller() 340 self.conn1.close() 341 # create a fake db 342 db = FakeDB() 343 self.fake_event1._storage = 'dummy' 344 self.fake_event1._db = db 345 self.assertEqual(db.closed, False) 346 self.assertRaises(AttributeError, installer, self.fake_event1) 347 self.assertEqual(db.closed, True) 348 return
Note: See TracChangeset for help on using the changeset viewer.