Changeset 9342 for main/waeup.kofa/trunk/src/waeup/kofa/utils
- Timestamp:
- 16 Oct 2012, 09:42:18 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py
r9283 r9342 23 23 import tempfile 24 24 import unittest 25 from zc.async.interfaces import IJob , COMPLETED25 from zc.async.interfaces import IJob 26 26 from zope import schema 27 27 from zope.component import provideUtility, getGlobalSiteManager … … 34 34 ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager, 35 35 IExportJob) 36 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase 36 from waeup.kofa.testing import ( 37 FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager) 37 38 from waeup.kofa.utils.batching import ( 38 39 ExporterBase, BatchProcessor, export_job, AsyncExportJob, … … 384 385 385 386 386 class FakeJob(object):387 388 status = COMPLETED389 result = None390 391 387 class FakeJobWithResult(FakeJob): 392 388 … … 395 391 self.result = os.path.join(self.dir_path, 'fake.csv') 396 392 open(self.result, 'wb').write('a fake result') 397 return398 399 class FakeJobManager(object):400 401 _jobs = dict()402 _curr_num = 1403 404 def get(self, job_id):405 if job_id == '3':406 return FakeJob()407 return self._jobs.get(job_id, None)408 409 def put(self, job):410 num = str(self._curr_num)411 self._jobs[num] = job412 self._curr_num += 1413 return num414 415 def remove(self, job_id, site):416 if job_id in self._jobs:417 del self._jobs[job_id]418 393 return 419 394
Note: See TracChangeset for help on using the changeset viewer.