Changeset 9090 for main/waeup.kofa/branches/uli-zc-async
- Timestamp:
- 8 Aug 2012, 13:27:57 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/tests/test_batching.py
r8380 r9090 24 24 import unittest 25 25 from zope import schema 26 from zope.component import provideUtility 26 from zope.component import provideUtility, getGlobalSiteManager 27 27 from zope.component.factory import Factory 28 28 from zope.component.hooks import clearSite … … 32 32 from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor 33 33 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase 34 from waeup.kofa.utils.batching import ExporterBase 34 from waeup.kofa.utils.batching import ( 35 ExporterBase, BatchProcessor, export_job) 35 36 36 37 optionflags = ( … … 71 72 self.owner = owner 72 73 self.taxpayer = taxpayer 73 #Cave = attrs_to_fields(Cave)74 74 75 75 stoneville = dict 76 76 77 from waeup.kofa.utils.batching import BatchProcessor 77 SAMPLE_DATA = """name,dinoports,owner,taxpayer 78 Barneys Home,2,Barney,1 79 Wilmas Asylum,1,Wilma,1 80 Freds Dinoburgers,10,Fred,0 81 Joeys Drive-in,110,Joey,0 82 """ 83 78 84 class CaveProcessor(BatchProcessor): 79 85 util_name = 'caveprocessor' 80 #grok.name(util_name)81 86 name = 'Cave Processor' 82 87 iface = ICave … … 136 141 # Provide sample data 137 142 self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv') 138 open(self.newcomers_csv, 'wb').write( 139 """name,dinoports,owner,taxpayer 140 Barneys Home,2,Barney,1 141 Wilmas Asylum,1,Wilma,1 142 Freds Dinoburgers,10,Fred,0 143 Joeys Drive-in,110,Joey,0 144 """) 143 open(self.newcomers_csv, 'wb').write(SAMPLE_DATA) 145 144 self.setupLogger() 146 145 self.stoneville = stoneville … … 336 335 self.assertEqual(result, None) 337 336 return 337 338 339 class CaveExporter(ExporterBase): 340 # A minimal fake exporter suitable to be called by export_jobs 341 fields = ('name', 'dinoports', 'owner', 'taxpayer') 342 343 def export_all(self, site, filepath=None): 344 if filepath is None: 345 return SAMPLE_DATA 346 open(filepath, 'wb').write(SAMPLE_DATA) 347 return 348 349 class ExportJobTests(unittest.TestCase): 350 # Test asynchronous export functionality (simple cases) 351 352 def setUp(self): 353 # register a suitable ICSVExporter as named utility 354 self.exporter = CaveExporter() 355 self.gsm = getGlobalSiteManager() 356 self.gsm.registerUtility( 357 self.exporter, ICSVExporter, name='cave_exporter') 358 359 def tearDown(self): 360 self.gsm.unregisterUtility(self.exporter) 361 362 def test_export_job_func(self): 363 # the export_job func does really export data... 364 result_path = export_job(None, 'cave_exporter') 365 self.assertTrue(os.path.isfile(result_path)) 366 contents = open(result_path, 'rb').read() 367 shutil.rmtree(os.path.dirname(result_path)) 368 self.assertEqual(contents, SAMPLE_DATA) 369 return
Note: See TracChangeset for help on using the changeset viewer.