Changeset 9217 for main/waeup.kofa/trunk/src/waeup/kofa/utils/tests
- Timestamp:
- 21 Sep 2012, 11:21:05 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py
r8380 r9217 23 23 import tempfile 24 24 import unittest 25 from zc.async.interfaces import IJob, COMPLETED 25 26 from zope import schema 26 from zope.component import provideUtility 27 from zope.component import provideUtility, getGlobalSiteManager 27 28 from zope.component.factory import Factory 28 29 from zope.component.hooks import clearSite … … 30 31 from zope.interface import Interface, implements, verify 31 32 from waeup.kofa.app import University 32 from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor 33 from waeup.kofa.interfaces import ( 34 ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager, 35 IExportJob) 33 36 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase 34 from waeup.kofa.utils.batching import ExporterBase 37 from waeup.kofa.utils.batching import ( 38 ExporterBase, BatchProcessor, export_job, AsyncExportJob, 39 ExportJobContainer) 35 40 36 41 optionflags = ( … … 71 76 self.owner = owner 72 77 self.taxpayer = taxpayer 73 #Cave = attrs_to_fields(Cave)74 78 75 79 stoneville = dict 76 80 77 from waeup.kofa.utils.batching import BatchProcessor 81 SAMPLE_DATA = """name,dinoports,owner,taxpayer 82 Barneys Home,2,Barney,1 83 Wilmas Asylum,1,Wilma,1 84 Freds Dinoburgers,10,Fred,0 85 Joeys Drive-in,110,Joey,0 86 """ 87 78 88 class CaveProcessor(BatchProcessor): 79 89 util_name = 'caveprocessor' 80 #grok.name(util_name)81 90 name = 'Cave Processor' 82 91 iface = ICave … … 136 145 # Provide sample data 137 146 self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv') 138 open(self.newcomers_csv, 'wb').write( 139 """name,dinoports,owner,taxpayer 140 Barneys Home,2,Barney,1 141 Wilmas Asylum,1,Wilma,1 142 Freds Dinoburgers,10,Fred,0 143 Joeys Drive-in,110,Joey,0 144 """) 147 open(self.newcomers_csv, 'wb').write(SAMPLE_DATA) 145 148 self.setupLogger() 146 149 self.stoneville = stoneville … … 336 339 self.assertEqual(result, None) 337 340 return 341 342 343 class CaveExporter(ExporterBase): 344 # A minimal fake exporter suitable to be called by export_jobs 345 fields = ('name', 'dinoports', 'owner', 'taxpayer') 346 title = u'Dummy cave exporter' 347 348 def export_all(self, site, filepath=None): 349 if filepath is None: 350 return SAMPLE_DATA 351 open(filepath, 'wb').write(SAMPLE_DATA) 352 return 353 354 class ExportJobTests(unittest.TestCase): 355 # Test asynchronous export functionality (simple cases) 356 357 def setUp(self): 358 # register a suitable ICSVExporter as named utility 359 self.exporter = CaveExporter() 360 self.gsm = getGlobalSiteManager() 361 self.gsm.registerUtility( 362 self.exporter, ICSVExporter, name='cave_exporter') 363 364 def tearDown(self): 365 self.gsm.unregisterUtility(self.exporter) 366 367 def test_export_job_func(self): 368 # the export_job func does really export data... 369 result_path = export_job(None, 'cave_exporter') 370 self.assertTrue(os.path.isfile(result_path)) 371 contents = open(result_path, 'rb').read() 372 shutil.rmtree(os.path.dirname(result_path)) 373 self.assertEqual(contents, SAMPLE_DATA) 374 return 375 376 def test_export_job_interfaces(self): 377 # the AsyncExportJob implements promised interfaces correctly... 378 job = AsyncExportJob(None, None) 379 verify.verifyClass(IJob, AsyncExportJob) 380 verify.verifyObject(IJob, job) 381 verify.verifyClass(IExportJob, AsyncExportJob) 382 verify.verifyObject(IExportJob, job) 383 return 384 385 386 class FakeJob(object): 387 388 status = COMPLETED 389 result = None 390 391 class FakeJobWithResult(FakeJob): 392 393 def __init__(self): 394 dir_path = tempfile.mkdtemp() 395 self.result = os.path.join(dir_path, 'fake.csv') 396 open(self.result, 'wb').write('a fake result') 397 return 398 399 class FakeJobManager(object): 400 401 _jobs = dict() 402 _curr_num = 1 403 404 def get(self, job_id): 405 if job_id == '3': 406 return FakeJob() 407 return self._jobs.get(job_id, None) 408 409 def put(self, job): 410 num = str(self._curr_num) 411 self._jobs[num] = job 412 self._curr_num += 1 413 return num 414 415 def remove(self, job_id, site): 416 if job_id in self._jobs: 417 del self._jobs[job_id] 418 return 419 420 class ExportJobContainerTests(unittest.TestCase): 421 # Test ExportJobContainer 422 423 def setUp(self): 424 # register a suitable ICSVExporter as named utility 425 self.exporter = CaveExporter() 426 self.job_manager = FakeJobManager() 427 self.gsm = getGlobalSiteManager() 428 self.gsm.registerUtility( 429 self.exporter, ICSVExporter, name='cave_exporter') 430 self.gsm.registerUtility( 431 self.job_manager, IJobManager) 432 433 def tearDown(self): 434 self.gsm.unregisterUtility(self.exporter) 435 self.gsm.unregisterUtility(self.job_manager, IJobManager) 436 437 def test_export_job_interfaces(self): 438 # the ExportJobContainer implements promised interfaces correctly... 439 container = ExportJobContainer() 440 verify.verifyClass(IExportJobContainer, ExportJobContainer) 441 verify.verifyObject(IExportJobContainer, container) 442 return 443 444 def test_start_export_job(self): 445 # we can start jobs 446 container = ExportJobContainer() 447 container.start_export_job('cave_exporter', 'bob') 448 result = self.job_manager._jobs.values()[0] 449 self.assertTrue(IJob.providedBy(result)) 450 self.assertEqual( 451 container.running_exports, 452 [('1', 'cave_exporter', 'bob')] 453 ) 454 return 455 456 def test_get_running_export_jobs_all(self): 457 # we can get export jobs of all users 458 container = ExportJobContainer() 459 container.start_export_job('cave_exporter', 'bob') 460 container.start_export_job('cave_exporter', 'alice') 461 result = container.get_running_export_jobs() 462 self.assertEqual( 463 result, 464 [('1', 'cave_exporter', 'bob'), 465 ('2', 'cave_exporter', 'alice')] 466 ) 467 return 468 469 def test_get_running_export_jobs_user(self): 470 # we can get the export jobs running for a certain user 471 container = ExportJobContainer() 472 container.start_export_job('cave_exporter', 'bob') 473 container.start_export_job('cave_exporter', 'alice') 474 result1 = container.get_running_export_jobs(user_id='alice') 475 result2 = container.get_running_export_jobs(user_id='foo') 476 self.assertEqual( 477 result1, [('2', 'cave_exporter', 'alice')]) 478 self.assertEqual( 479 result2, []) 480 return 481 482 def test_get_running_export_jobs_only_if_exist(self): 483 # we get only jobs that are accessible through the job manager... 484 container = ExportJobContainer() 485 container.start_export_job('cave_exporter', 'bob') 486 container.start_export_job('cave_exporter', 'bob') 487 self.assertTrue( 488 ('2', 'cave_exporter', 'bob') in container.running_exports) 489 # we remove the second entry from job manager 490 del self.job_manager._jobs['2'] 491 result = container.get_running_export_jobs(user_id='bob') 492 self.assertEqual( 493 result, [('1', 'cave_exporter', 'bob')]) 494 self.assertTrue( 495 ('2', 'cave_exporter', 'bob') not in container.running_exports) 496 return 497 498 def test_get_export_job_status(self): 499 # we can get the stati of jobs... 500 container = ExportJobContainer() 501 container.start_export_job('cave_exporter', 'alice') 502 container.start_export_job('cave_exporter', 'bob') 503 container.start_export_job('cave_exporter', 'bob') 504 result = container.get_export_jobs_status(user_id='bob') 505 # we'll get the raw value, a translation and the title of the 506 # exporter 507 self.assertEqual( 508 result, 509 [('new', u'new', u'Dummy cave exporter'), 510 ('completed', u'completed', u'Dummy cave exporter')] 511 ) 512 return 513 514 def test_delete_export_entry(self): 515 # we can remove export entries in local lists and the job 516 # manager as well... 517 container = ExportJobContainer() 518 container.start_export_job('cave_exporter', 'bob') 519 entry = container.running_exports[0] 520 container.delete_export_entry(entry) 521 # both, running_exports list and job manager are empty now 522 self.assertEqual( 523 container.running_exports, []) 524 self.assertEqual( 525 self.job_manager._jobs, {}) 526 return 527 528 def test_delete_export_entry_remove_file(self): 529 # any result files of exports are deleted as well 530 container = ExportJobContainer() 531 entry = ('4', 'cave_exporter', 'bob') 532 container.running_exports = [entry] 533 fake_job = FakeJobWithResult() 534 self.job_manager._jobs['4'] = fake_job 535 self.assertTrue(os.path.isfile(fake_job.result)) 536 container.delete_export_entry(entry) 537 self.assertTrue(not os.path.exists(fake_job.result)) 538 return 539 540 def test_entry_from_job_id(self): 541 # we can get an entry for a job_id if the id exists 542 container = ExportJobContainer() 543 entry = ('4', 'cave_exporter', 'bob') 544 container.running_exports = [entry] 545 fake_job = FakeJobWithResult() 546 self.job_manager._jobs['4'] = fake_job 547 result1 = container.entry_from_job_id(None) 548 result2 = container.entry_from_job_id('4') 549 result3 = container.entry_from_job_id('23') 550 self.assertEqual(result1, None) 551 self.assertEqual(result2, ('4', 'cave_exporter', 'bob')) 552 self.assertEqual(result3, None) 553 return
Note: See TracChangeset for help on using the changeset viewer.