Changeset 9726 for main/waeup.kofa/trunk/src/waeup/kofa/utils/tests
- Timestamp:
- 26 Nov 2012, 23:27:46 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py
r9342 r9726 25 25 from zc.async.interfaces import IJob 26 26 from zope import schema 27 from zope.component import provideUtility, getGlobalSiteManager 27 from zope.component import provideUtility, getGlobalSiteManager, getUtility 28 28 from zope.component.factory import Factory 29 from zope.component.hooks import clearSite 29 from zope.component.hooks import clearSite, setSite 30 30 from zope.component.interfaces import IFactory 31 31 from zope.interface import Interface, implements, verify … … 33 33 from waeup.kofa.interfaces import ( 34 34 ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager, 35 IExportJob )35 IExportJob, IExportContainerFinder) 36 36 from waeup.kofa.testing import ( 37 37 FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager) 38 38 from waeup.kofa.utils.batching import ( 39 39 ExporterBase, BatchProcessor, export_job, AsyncExportJob, 40 ExportJobContainer )40 ExportJobContainer, VirtualExportJobContainer, ExportContainerFinder) 41 41 42 42 optionflags = ( … … 396 396 # Test ExportJobContainer 397 397 398 TestedClass = ExportJobContainer 399 398 400 def setUp(self): 399 401 # register a suitable ICSVExporter as named utility … … 412 414 def test_export_job_interfaces(self): 413 415 # the ExportJobContainer implements promised interfaces correctly... 414 container = ExportJobContainer()415 verify.verifyClass(IExportJobContainer, ExportJobContainer)416 container = self.TestedClass() 417 verify.verifyClass(IExportJobContainer, self.TestedClass) 416 418 verify.verifyObject(IExportJobContainer, container) 417 419 return … … 419 421 def test_start_export_job(self): 420 422 # we can start jobs 421 container = ExportJobContainer()423 container = self.TestedClass() 422 424 container.start_export_job('cave_exporter', 'bob') 423 425 result = self.job_manager._jobs.values()[0] … … 431 433 def test_get_running_export_jobs_all(self): 432 434 # we can get export jobs of all users 433 container = ExportJobContainer()435 container = self.TestedClass() 434 436 container.start_export_job('cave_exporter', 'bob') 435 437 container.start_export_job('cave_exporter', 'alice') … … 444 446 def test_get_running_export_jobs_user(self): 445 447 # we can get the export jobs running for a certain user 446 container = ExportJobContainer()448 container = self.TestedClass() 447 449 container.start_export_job('cave_exporter', 'bob') 448 450 container.start_export_job('cave_exporter', 'alice') … … 457 459 def test_get_running_export_jobs_only_if_exist(self): 458 460 # we get only jobs that are accessible through the job manager... 459 container = ExportJobContainer()461 container = self.TestedClass() 460 462 container.start_export_job('cave_exporter', 'bob') 461 463 container.start_export_job('cave_exporter', 'bob') … … 473 475 def test_get_export_job_status(self): 474 476 # we can get the stati of jobs... 475 container = ExportJobContainer()477 container = self.TestedClass() 476 478 container.start_export_job('cave_exporter', 'alice') 477 479 container.start_export_job('cave_exporter', 'bob') … … 490 492 # we can remove export entries in local lists and the job 491 493 # manager as well... 492 container = ExportJobContainer()494 container = self.TestedClass() 493 495 container.start_export_job('cave_exporter', 'bob') 494 496 entry = container.running_exports[0] … … 503 505 def test_delete_export_entry_remove_file(self): 504 506 # any result files of exports are deleted as well 505 container = ExportJobContainer()507 container = self.TestedClass() 506 508 entry = ('4', 'cave_exporter', 'bob') 507 509 container.running_exports = [entry] … … 515 517 def test_entry_from_job_id(self): 516 518 # we can get an entry for a job_id if the id exists 517 container = ExportJobContainer()519 container = self.TestedClass() 518 520 entry = ('4', 'cave_exporter', 'bob') 519 521 container.running_exports = [entry] … … 528 530 shutil.rmtree(fake_job.dir_path) 529 531 return 532 533 class VirtualExportJobContainerTests(ExportJobContainerTests): 534 # VirtualExportJobContainers should provide the 535 # same functionality as regular ones. 536 537 TestedClass = VirtualExportJobContainer 538 539 def setUp(self): 540 super(VirtualExportJobContainerTests, self).setUp() 541 self.root_job_container = ExportJobContainer() 542 def fake_finder(): 543 return self.root_job_container 544 self.gsm = getGlobalSiteManager() 545 self.gsm.registerUtility(fake_finder, IExportContainerFinder) 546 return 547 548 class ExportContainerFinderTests(FunctionalTestCase): 549 # Tests for export container finder. 550 551 layer = FunctionalLayer 552 553 def test_get_finder_as_util(self): 554 # we can get a finder by utility lookup 555 finder = getUtility(IExportContainerFinder) 556 self.assertTrue(finder is not None) 557 self.assertEqual( 558 IExportContainerFinder.providedBy(finder), 559 True) 560 return 561 562 def test_iface(self): 563 # the finder complies with the promised interface 564 finder = ExportContainerFinder() 565 verify.verifyClass(IExportContainerFinder, 566 ExportContainerFinder) 567 verify.verifyObject(IExportContainerFinder, finder) 568 return 569 570 def test_no_site(self): 571 # a finder returns None if no site is available 572 finder = ExportContainerFinder() 573 self.assertEqual( 574 finder(), None) 575 return 576 577 def test_active_site(self): 578 # we get the datafinder if one is installed and site set 579 self.getRootFolder()['app'] = University() 580 finder = getUtility(IExportContainerFinder) 581 setSite(self.getRootFolder()['app']) 582 container = finder() 583 self.assertTrue(container is not None) 584 return 585 586 def test_broken_site(self): 587 # if the current site has no ExportContainer, we get None 588 self.getRootFolder()['app'] = University() 589 app = self.getRootFolder()['app'] 590 del app['datacenter'] # datacenter _is_ the export container 591 setSite(app) 592 finder = getUtility(IExportContainerFinder) 593 container = finder() 594 self.assertTrue(container is None) 595 return
Note: See TracChangeset for help on using the changeset viewer.