Changeset 9094 for main/waeup.kofa/branches/uli-zc-async/src
- Timestamp:
- 8 Aug 2012, 22:57:01 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/tests/test_batching.py
r9092 r9094 23 23 import tempfile 24 24 import unittest 25 from zc.async.interfaces import IJob 25 from zc.async.interfaces import IJob, COMPLETED 26 26 from zope import schema 27 27 from zope.component import provideUtility, getGlobalSiteManager … … 31 31 from zope.interface import Interface, implements, verify 32 32 from waeup.kofa.app import University 33 from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor 33 from waeup.kofa.interfaces import ( 34 ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager) 34 35 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase 35 36 from waeup.kofa.utils.batching import ( 36 ExporterBase, BatchProcessor, export_job, AsyncExportJob) 37 ExporterBase, BatchProcessor, export_job, AsyncExportJob, 38 ExportJobContainer) 37 39 38 40 optionflags = ( … … 341 343 # A minimal fake exporter suitable to be called by export_jobs 342 344 fields = ('name', 'dinoports', 'owner', 'taxpayer') 345 title = u'Dummy cave exporter' 343 346 344 347 def export_all(self, site, filepath=None): … … 376 379 verify.verifyObject(IJob, job) 377 380 return 381 382 383 class FakeJob(object): 384 385 status = COMPLETED 386 result = None 387 388 class FakeJobWithResult(FakeJob): 389 390 def __init__(self): 391 dir_path = tempfile.mkdtemp() 392 self.result = os.path.join(dir_path, 'fake.csv') 393 open(self.result, 'wb').write('a fake result') 394 return 395 396 class FakeJobManager(object): 397 398 _jobs = dict() 399 _curr_num = 1 400 401 def get(self, job_id): 402 if job_id == '3': 403 return FakeJob() 404 return self._jobs.get(job_id, None) 405 406 def put(self, job): 407 num = str(self._curr_num) 408 self._jobs[num] = job 409 self._curr_num += 1 410 return num 411 412 def remove(self, job_id, site): 413 if job_id in self._jobs: 414 del self._jobs[job_id] 415 return 416 417 class ExportJobContainerTests(unittest.TestCase): 418 # Test ExportJobContainer 419 420 def setUp(self): 421 # register a suitable ICSVExporter as named utility 422 self.exporter = CaveExporter() 423 self.job_manager = FakeJobManager() 424 self.gsm = getGlobalSiteManager() 425 self.gsm.registerUtility( 426 self.exporter, ICSVExporter, name='cave_exporter') 427 self.gsm.registerUtility( 428 self.job_manager, IJobManager) 429 430 def tearDown(self): 431 self.gsm.unregisterUtility(self.exporter) 432 self.gsm.unregisterUtility(self.job_manager, IJobManager) 433 434 def test_export_job_interfaces(self): 435 # the ExportJobContainer implements promised interfaces correctly... 436 container = ExportJobContainer() 437 verify.verifyClass(IExportJobContainer, ExportJobContainer) 438 verify.verifyObject(IExportJobContainer, container) 439 return 440 441 def test_start_export_job(self): 442 # we can start jobs 443 container = ExportJobContainer() 444 container.start_export_job('cave_exporter', 'bob') 445 result = self.job_manager._jobs.values()[0] 446 self.assertTrue(IJob.providedBy(result)) 447 self.assertEqual( 448 container.running_exports, 449 [('1', 'cave_exporter', 'bob')] 450 ) 451 return 452 453 def test_get_running_export_jobs_all(self): 454 # we can get export jobs of all users 455 container = ExportJobContainer() 456 container.start_export_job('cave_exporter', 'bob') 457 container.start_export_job('cave_exporter', 'alice') 458 result = container.get_running_export_jobs() 459 self.assertEqual( 460 result, 461 [('1', 'cave_exporter', 'bob'), 462 ('2', 'cave_exporter', 'alice')] 463 ) 464 return 465 466 def test_get_running_export_jobs_user(self): 467 # we can get the export jobs running for a certain user 468 container = ExportJobContainer() 469 container.start_export_job('cave_exporter', 'bob') 470 container.start_export_job('cave_exporter', 'alice') 471 result1 = container.get_running_export_jobs(user_id='alice') 472 result2 = container.get_running_export_jobs(user_id='foo') 473 self.assertEqual( 474 result1, [('2', 'cave_exporter', 'alice')]) 475 self.assertEqual( 476 result2, []) 477 return 478 479 def test_get_running_export_jobs_only_if_exist(self): 480 # we get only jobs that are accessible through the job manager... 481 container = ExportJobContainer() 482 container.start_export_job('cave_exporter', 'bob') 483 container.start_export_job('cave_exporter', 'bob') 484 self.assertTrue( 485 ('2', 'cave_exporter', 'bob') in container.running_exports) 486 # we remove the second entry from job manager 487 del self.job_manager._jobs['2'] 488 result = container.get_running_export_jobs(user_id='bob') 489 self.assertEqual( 490 result, [('1', 'cave_exporter', 'bob')]) 491 self.assertTrue( 492 ('2', 'cave_exporter', 'bob') not in container.running_exports) 493 return 494 495 def test_get_export_job_status(self): 496 # we can get the stati of jobs... 497 container = ExportJobContainer() 498 container.start_export_job('cave_exporter', 'alice') 499 container.start_export_job('cave_exporter', 'bob') 500 container.start_export_job('cave_exporter', 'bob') 501 result = container.get_export_jobs_status(user_id='bob') 502 # we'll get the raw value, a translation and the title of the 503 # exporter 504 self.assertEqual( 505 result, 506 [('new', u'new', u'Dummy cave exporter'), 507 ('completed', u'completed', u'Dummy cave exporter')] 508 ) 509 return 510 511 def test_delete_export_entry(self): 512 # we can remove export entries in local lists and the job 513 # manager as well... 514 container = ExportJobContainer() 515 container.start_export_job('cave_exporter', 'bob') 516 entry = container.running_exports[0] 517 container.delete_export_entry(entry) 518 # both, running_exports list and job manager are empty now 519 self.assertEqual( 520 container.running_exports, []) 521 self.assertEqual( 522 self.job_manager._jobs, {}) 523 return 524 525 def test_delete_export_entry_remove_file(self): 526 # any result files of exports are deleted as well 527 container = ExportJobContainer() 528 entry = ('4', 'cave_exporter', 'bob') 529 container.running_exports = [entry] 530 fake_job = FakeJobWithResult() 531 self.job_manager._jobs['4'] = fake_job 532 self.assertTrue(os.path.isfile(fake_job.result)) 533 container.delete_export_entry(entry) 534 self.assertTrue(not os.path.exists(fake_job.result)) 535 return
Note: See TracChangeset for help on using the changeset viewer.