Ignore:
Timestamp:
16 Oct 2012, 09:42:18 (12 years ago)
Author:
uli
Message:

Move fake job components to a more general location.

Location:
main/waeup.kofa/trunk/src/waeup/kofa
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/trunk/src/waeup/kofa/testing.py

    r9289 r9342  
    2929import zope.component
    3030import waeup.kofa
     31from zc.async.interfaces import COMPLETED
    3132from zope.app.testing.functional import (
    3233    ZCMLLayer, FunctionalTestSetup, getRootFolder, sync, FunctionalTestCase)
     
    430431        suite.addTest(test)
    431432    return suite
     433
     434class FakeJob(object):
     435    # A job usable for simple async tests
     436    status = COMPLETED
     437    result = None
     438
     439class FakeJobManager(object):
     440    # A fake job manager for testing async functionality
     441    _jobs = dict()
     442    _curr_num = 1
     443
     444    def get(self, job_id):
     445        if job_id == '3':
     446            return FakeJob()
     447        return self._jobs.get(job_id, None)
     448
     449    def put(self, job):
     450        num = str(self._curr_num)
     451        self._jobs[num] = job
     452        self._curr_num += 1
     453        return num
     454
     455    def remove(self, job_id, site):
     456        if job_id in self._jobs:
     457            del self._jobs[job_id]
     458        return
  • main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py

    r9283 r9342  
    2323import tempfile
    2424import unittest
    25 from zc.async.interfaces import IJob, COMPLETED
     25from zc.async.interfaces import IJob
    2626from zope import schema
    2727from zope.component import provideUtility, getGlobalSiteManager
     
    3434    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
    3535    IExportJob)
    36 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
     36from waeup.kofa.testing import (
     37    FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager)
    3738from waeup.kofa.utils.batching import (
    3839    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
     
    384385
    385386
    386 class FakeJob(object):
    387 
    388     status = COMPLETED
    389     result = None
    390 
    391387class FakeJobWithResult(FakeJob):
    392388
     
    395391        self.result = os.path.join(self.dir_path, 'fake.csv')
    396392        open(self.result, 'wb').write('a fake result')
    397         return
    398 
    399 class FakeJobManager(object):
    400 
    401     _jobs = dict()
    402     _curr_num = 1
    403 
    404     def get(self, job_id):
    405         if job_id == '3':
    406             return FakeJob()
    407         return self._jobs.get(job_id, None)
    408 
    409     def put(self, job):
    410         num = str(self._curr_num)
    411         self._jobs[num] = job
    412         self._curr_num += 1
    413         return num
    414 
    415     def remove(self, job_id, site):
    416         if job_id in self._jobs:
    417             del self._jobs[job_id]
    418393        return
    419394
Note: See TracChangeset for help on using the changeset viewer.