Changeset 9342
- Timestamp:
- 16 Oct 2012, 09:42:18 (12 years ago)
- Location:
- main/waeup.kofa/trunk/src/waeup/kofa
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/testing.py
r9289 r9342 29 29 import zope.component 30 30 import waeup.kofa 31 from zc.async.interfaces import COMPLETED 31 32 from zope.app.testing.functional import ( 32 33 ZCMLLayer, FunctionalTestSetup, getRootFolder, sync, FunctionalTestCase) … … 430 431 suite.addTest(test) 431 432 return suite 433 434 class FakeJob(object): 435 # A job usable for simple async tests 436 status = COMPLETED 437 result = None 438 439 class FakeJobManager(object): 440 # A fake job manager for testing async functionality 441 _jobs = dict() 442 _curr_num = 1 443 444 def get(self, job_id): 445 if job_id == '3': 446 return FakeJob() 447 return self._jobs.get(job_id, None) 448 449 def put(self, job): 450 num = str(self._curr_num) 451 self._jobs[num] = job 452 self._curr_num += 1 453 return num 454 455 def remove(self, job_id, site): 456 if job_id in self._jobs: 457 del self._jobs[job_id] 458 return -
main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py
r9283 r9342 23 23 import tempfile 24 24 import unittest 25 from zc.async.interfaces import IJob , COMPLETED25 from zc.async.interfaces import IJob 26 26 from zope import schema 27 27 from zope.component import provideUtility, getGlobalSiteManager … … 34 34 ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager, 35 35 IExportJob) 36 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase 36 from waeup.kofa.testing import ( 37 FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager) 37 38 from waeup.kofa.utils.batching import ( 38 39 ExporterBase, BatchProcessor, export_job, AsyncExportJob, … … 384 385 385 386 386 class FakeJob(object):387 388 status = COMPLETED389 result = None390 391 387 class FakeJobWithResult(FakeJob): 392 388 … … 395 391 self.result = os.path.join(self.dir_path, 'fake.csv') 396 392 open(self.result, 'wb').write('a fake result') 397 return398 399 class FakeJobManager(object):400 401 _jobs = dict()402 _curr_num = 1403 404 def get(self, job_id):405 if job_id == '3':406 return FakeJob()407 return self._jobs.get(job_id, None)408 409 def put(self, job):410 num = str(self._curr_num)411 self._jobs[num] = job412 self._curr_num += 1413 return num414 415 def remove(self, job_id, site):416 if job_id in self._jobs:417 del self._jobs[job_id]418 393 return 419 394
Note: See TracChangeset for help on using the changeset viewer.