Changeset 9217 for main/waeup.kofa/trunk/src/waeup/kofa/utils
- Timestamp:
- 21 Sep 2012, 11:21:05 (12 years ago)
- Location:
- main/waeup.kofa/trunk/src/waeup/kofa/utils
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py
r9170 r9217 25 25 import datetime 26 26 import os 27 import shutil 27 28 import tempfile 28 29 import time 29 30 from cStringIO import StringIO 30 from zope.component import createObject 31 from persistent.list import PersistentList 32 from zope.component import createObject, getUtility 33 from zope.component.hooks import setSite 31 34 from zope.interface import Interface 32 35 from zope.schema import getFields 33 36 from zope.event import notify 37 from waeup.kofa.async import AsyncJob 34 38 from waeup.kofa.interfaces import ( 35 IBatchProcessor, FatalCSVError, IObjectConverter, 36 ICSVExporter, IGNORE_MARKER, DuplicationError) 39 IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager, 40 ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP, 41 IExportJobContainer, IExportJob) 37 42 38 43 class BatchProcessor(grok.GlobalUtility): … … 361 366 self.writeFailedRow( 362 367 failed_writer, string_row, 363 "Cannot remove: no such entry ")368 "Cannot remove: no such entry.") 364 369 continue 365 370 self.delEntry(row, site) … … 498 503 """ 499 504 raise NotImplementedError 505 506 507 def export_job(site, exporter_name): 508 """Export all entries delivered by exporter and store it in a temp file. 509 510 `site` gives the site to search. It will be passed to the exporter 511 and also be set as 'current site' as the function is used in 512 asynchronous jobs which run in their own threads and have no site 513 set initially. Therefore `site` must also be a valid value for use 514 with `zope.component.hooks.setSite()`. 515 516 `exporter_name` is the utility name under which the desired 517 exporter was registered with the ZCA. 518 519 The resulting CSV file will be stored in a new temporary directory 520 (using :func:`tempfile.mkdtemp`). It will be named after the 521 exporter used with `.csv` filename extension. 522 523 Returns the path to the created CSV file. 524 525 .. note:: It is the callers responsibility to clean up the used 526 file and its parent directory. 527 """ 528 setSite(site) 529 exporter = getUtility(ICSVExporter, name=exporter_name) 530 output_dir = tempfile.mkdtemp() 531 filename = '%s.csv' % exporter_name 532 output_path = os.path.join(output_dir, filename) 533 exporter.export_all(site, filepath=output_path) 534 return output_path 535 536 class AsyncExportJob(AsyncJob): 537 """An IJob that exports data to CSV files. 538 539 `AsyncExportJob` instances are regular `AsyncJob` instances with a 540 different constructor API. Instead of a callable to execute, you 541 must pass a `site` and some `exporter_name` to trigger an export. 542 543 The real work is done when an instance of this class is put into a 544 queue. See :mod:`waeup.kofa.async` to learn more about 545 asynchronous jobs. 546 547 The `exporter_name` must be the name under which an ICSVExporter 548 utility was registered with the ZCA. 549 550 The `site` must be a valid site or ``None``. 551 552 The result of an `AsyncExportJob` is the path to generated CSV 553 file. The file will reside in a temporary directory that should be 554 removed after being used. 555 """ 556 grok.implements(IExportJob) 557 558 def __init__(self, site, exporter_name): 559 super(AsyncExportJob, self).__init__( 560 export_job, site, exporter_name) 561 562 class ExportJobContainer(object): 563 """A mix-in that provides functionality for asynchronous export jobs. 564 """ 565 grok.implements(IExportJobContainer) 566 running_exports = PersistentList() 567 568 def start_export_job(self, exporter_name, user_id): 569 """Start asynchronous export job. 570 571 `exporter_name` is the name of an exporter utility to be used. 572 573 `user_id` is the ID of the user that triggers the export. 574 575 The job_id is stored along with exporter name and user id in a 576 persistent list. 577 578 Returns the job ID of the job started. 579 """ 580 site = grok.getSite() 581 manager = getUtility(IJobManager) 582 job = AsyncExportJob(site, exporter_name) 583 job_id = manager.put(job) 584 # Make sure that the persisted list is stored in ZODB 585 self.running_exports = PersistentList(self.running_exports) 586 self.running_exports.append((job_id, exporter_name, user_id)) 587 return job_id 588 589 def get_running_export_jobs(self, user_id=None): 590 """Get export jobs for user with `user_id` as list of tuples. 591 592 Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in 593 that order. The ``<exporter_name>`` is the utility name of the 594 used exporter. 595 596 If `user_id` is ``None``, all running jobs are returned. 597 """ 598 entries = [] 599 to_delete = [] 600 manager = getUtility(IJobManager) 601 for entry in self.running_exports: 602 if user_id is not None and entry[2] != user_id: 603 continue 604 if manager.get(entry[0]) is None: 605 to_delete.append(entry) 606 continue 607 entries.append(entry) 608 if to_delete: 609 self.running_exports = PersistentList( 610 [x for x in self.running_exports if x not in to_delete]) 611 return entries 612 613 def get_export_jobs_status(self, user_id=None): 614 """Get running/completed export jobs for `user_id` as list of tuples. 615 616 Each tuple holds ``<raw status>, <status translated>, 617 <exporter title>`` in that order, where ``<status 618 translated>`` and ``<exporter title>`` are translated strings 619 representing the status of the job and the human readable 620 title of the exporter used. 621 """ 622 entries = self.get_running_export_jobs(user_id) 623 result = [] 624 manager = getUtility(IJobManager) 625 for entry in entries: 626 job = manager.get(entry[0]) 627 if job is None: 628 continue 629 status, status_translated = JOB_STATUS_MAP[job.status] 630 exporter_name = getUtility(ICSVExporter, name=entry[1]).title 631 result.append((status, status_translated, exporter_name)) 632 return result 633 634 def delete_export_entry(self, entry): 635 """Delete the export denoted by `entry`. 636 637 Removes given entry from the local `running_exports` list and also 638 removes the regarding job via the local job manager. 639 640 `entry` must be a tuple ``(<job id>, <exporter name>, <user 641 id>)`` as created by :meth:`start_export_job` or returned by 642 :meth:`get_running_export_jobs`. 643 """ 644 manager = getUtility(IJobManager) 645 job = manager.get(entry[0]) 646 if job is not None: 647 # remove created export file 648 if isinstance(job.result, basestring): 649 if os.path.exists(os.path.dirname(job.result)): 650 shutil.rmtree(os.path.dirname(job.result)) 651 manager.remove(entry[0], self) 652 new_entries = [x for x in self.running_exports 653 if x != entry] 654 self.running_exports = PersistentList(new_entries) 655 return 656 657 def entry_from_job_id(self, job_id): 658 """Get entry tuple for `job_id`. 659 660 Returns ``None`` if no such entry can be found. 661 """ 662 for entry in self.running_exports: 663 if entry[0] == job_id: 664 return entry 665 return None -
main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py
r8380 r9217 23 23 import tempfile 24 24 import unittest 25 from zc.async.interfaces import IJob, COMPLETED 25 26 from zope import schema 26 from zope.component import provideUtility 27 from zope.component import provideUtility, getGlobalSiteManager 27 28 from zope.component.factory import Factory 28 29 from zope.component.hooks import clearSite … … 30 31 from zope.interface import Interface, implements, verify 31 32 from waeup.kofa.app import University 32 from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor 33 from waeup.kofa.interfaces import ( 34 ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager, 35 IExportJob) 33 36 from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase 34 from waeup.kofa.utils.batching import ExporterBase 37 from waeup.kofa.utils.batching import ( 38 ExporterBase, BatchProcessor, export_job, AsyncExportJob, 39 ExportJobContainer) 35 40 36 41 optionflags = ( … … 71 76 self.owner = owner 72 77 self.taxpayer = taxpayer 73 #Cave = attrs_to_fields(Cave)74 78 75 79 stoneville = dict 76 80 77 from waeup.kofa.utils.batching import BatchProcessor 81 SAMPLE_DATA = """name,dinoports,owner,taxpayer 82 Barneys Home,2,Barney,1 83 Wilmas Asylum,1,Wilma,1 84 Freds Dinoburgers,10,Fred,0 85 Joeys Drive-in,110,Joey,0 86 """ 87 78 88 class CaveProcessor(BatchProcessor): 79 89 util_name = 'caveprocessor' 80 #grok.name(util_name)81 90 name = 'Cave Processor' 82 91 iface = ICave … … 136 145 # Provide sample data 137 146 self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv') 138 open(self.newcomers_csv, 'wb').write( 139 """name,dinoports,owner,taxpayer 140 Barneys Home,2,Barney,1 141 Wilmas Asylum,1,Wilma,1 142 Freds Dinoburgers,10,Fred,0 143 Joeys Drive-in,110,Joey,0 144 """) 147 open(self.newcomers_csv, 'wb').write(SAMPLE_DATA) 145 148 self.setupLogger() 146 149 self.stoneville = stoneville … … 336 339 self.assertEqual(result, None) 337 340 return 341 342 343 class CaveExporter(ExporterBase): 344 # A minimal fake exporter suitable to be called by export_jobs 345 fields = ('name', 'dinoports', 'owner', 'taxpayer') 346 title = u'Dummy cave exporter' 347 348 def export_all(self, site, filepath=None): 349 if filepath is None: 350 return SAMPLE_DATA 351 open(filepath, 'wb').write(SAMPLE_DATA) 352 return 353 354 class ExportJobTests(unittest.TestCase): 355 # Test asynchronous export functionality (simple cases) 356 357 def setUp(self): 358 # register a suitable ICSVExporter as named utility 359 self.exporter = CaveExporter() 360 self.gsm = getGlobalSiteManager() 361 self.gsm.registerUtility( 362 self.exporter, ICSVExporter, name='cave_exporter') 363 364 def tearDown(self): 365 self.gsm.unregisterUtility(self.exporter) 366 367 def test_export_job_func(self): 368 # the export_job func does really export data... 369 result_path = export_job(None, 'cave_exporter') 370 self.assertTrue(os.path.isfile(result_path)) 371 contents = open(result_path, 'rb').read() 372 shutil.rmtree(os.path.dirname(result_path)) 373 self.assertEqual(contents, SAMPLE_DATA) 374 return 375 376 def test_export_job_interfaces(self): 377 # the AsyncExportJob implements promised interfaces correctly... 378 job = AsyncExportJob(None, None) 379 verify.verifyClass(IJob, AsyncExportJob) 380 verify.verifyObject(IJob, job) 381 verify.verifyClass(IExportJob, AsyncExportJob) 382 verify.verifyObject(IExportJob, job) 383 return 384 385 386 class FakeJob(object): 387 388 status = COMPLETED 389 result = None 390 391 class FakeJobWithResult(FakeJob): 392 393 def __init__(self): 394 dir_path = tempfile.mkdtemp() 395 self.result = os.path.join(dir_path, 'fake.csv') 396 open(self.result, 'wb').write('a fake result') 397 return 398 399 class FakeJobManager(object): 400 401 _jobs = dict() 402 _curr_num = 1 403 404 def get(self, job_id): 405 if job_id == '3': 406 return FakeJob() 407 return self._jobs.get(job_id, None) 408 409 def put(self, job): 410 num = str(self._curr_num) 411 self._jobs[num] = job 412 self._curr_num += 1 413 return num 414 415 def remove(self, job_id, site): 416 if job_id in self._jobs: 417 del self._jobs[job_id] 418 return 419 420 class ExportJobContainerTests(unittest.TestCase): 421 # Test ExportJobContainer 422 423 def setUp(self): 424 # register a suitable ICSVExporter as named utility 425 self.exporter = CaveExporter() 426 self.job_manager = FakeJobManager() 427 self.gsm = getGlobalSiteManager() 428 self.gsm.registerUtility( 429 self.exporter, ICSVExporter, name='cave_exporter') 430 self.gsm.registerUtility( 431 self.job_manager, IJobManager) 432 433 def tearDown(self): 434 self.gsm.unregisterUtility(self.exporter) 435 self.gsm.unregisterUtility(self.job_manager, IJobManager) 436 437 def test_export_job_interfaces(self): 438 # the ExportJobContainer implements promised interfaces correctly... 439 container = ExportJobContainer() 440 verify.verifyClass(IExportJobContainer, ExportJobContainer) 441 verify.verifyObject(IExportJobContainer, container) 442 return 443 444 def test_start_export_job(self): 445 # we can start jobs 446 container = ExportJobContainer() 447 container.start_export_job('cave_exporter', 'bob') 448 result = self.job_manager._jobs.values()[0] 449 self.assertTrue(IJob.providedBy(result)) 450 self.assertEqual( 451 container.running_exports, 452 [('1', 'cave_exporter', 'bob')] 453 ) 454 return 455 456 def test_get_running_export_jobs_all(self): 457 # we can get export jobs of all users 458 container = ExportJobContainer() 459 container.start_export_job('cave_exporter', 'bob') 460 container.start_export_job('cave_exporter', 'alice') 461 result = container.get_running_export_jobs() 462 self.assertEqual( 463 result, 464 [('1', 'cave_exporter', 'bob'), 465 ('2', 'cave_exporter', 'alice')] 466 ) 467 return 468 469 def test_get_running_export_jobs_user(self): 470 # we can get the export jobs running for a certain user 471 container = ExportJobContainer() 472 container.start_export_job('cave_exporter', 'bob') 473 container.start_export_job('cave_exporter', 'alice') 474 result1 = container.get_running_export_jobs(user_id='alice') 475 result2 = container.get_running_export_jobs(user_id='foo') 476 self.assertEqual( 477 result1, [('2', 'cave_exporter', 'alice')]) 478 self.assertEqual( 479 result2, []) 480 return 481 482 def test_get_running_export_jobs_only_if_exist(self): 483 # we get only jobs that are accessible through the job manager... 484 container = ExportJobContainer() 485 container.start_export_job('cave_exporter', 'bob') 486 container.start_export_job('cave_exporter', 'bob') 487 self.assertTrue( 488 ('2', 'cave_exporter', 'bob') in container.running_exports) 489 # we remove the second entry from job manager 490 del self.job_manager._jobs['2'] 491 result = container.get_running_export_jobs(user_id='bob') 492 self.assertEqual( 493 result, [('1', 'cave_exporter', 'bob')]) 494 self.assertTrue( 495 ('2', 'cave_exporter', 'bob') not in container.running_exports) 496 return 497 498 def test_get_export_job_status(self): 499 # we can get the stati of jobs... 500 container = ExportJobContainer() 501 container.start_export_job('cave_exporter', 'alice') 502 container.start_export_job('cave_exporter', 'bob') 503 container.start_export_job('cave_exporter', 'bob') 504 result = container.get_export_jobs_status(user_id='bob') 505 # we'll get the raw value, a translation and the title of the 506 # exporter 507 self.assertEqual( 508 result, 509 [('new', u'new', u'Dummy cave exporter'), 510 ('completed', u'completed', u'Dummy cave exporter')] 511 ) 512 return 513 514 def test_delete_export_entry(self): 515 # we can remove export entries in local lists and the job 516 # manager as well... 517 container = ExportJobContainer() 518 container.start_export_job('cave_exporter', 'bob') 519 entry = container.running_exports[0] 520 container.delete_export_entry(entry) 521 # both, running_exports list and job manager are empty now 522 self.assertEqual( 523 container.running_exports, []) 524 self.assertEqual( 525 self.job_manager._jobs, {}) 526 return 527 528 def test_delete_export_entry_remove_file(self): 529 # any result files of exports are deleted as well 530 container = ExportJobContainer() 531 entry = ('4', 'cave_exporter', 'bob') 532 container.running_exports = [entry] 533 fake_job = FakeJobWithResult() 534 self.job_manager._jobs['4'] = fake_job 535 self.assertTrue(os.path.isfile(fake_job.result)) 536 container.delete_export_entry(entry) 537 self.assertTrue(not os.path.exists(fake_job.result)) 538 return 539 540 def test_entry_from_job_id(self): 541 # we can get an entry for a job_id if the id exists 542 container = ExportJobContainer() 543 entry = ('4', 'cave_exporter', 'bob') 544 container.running_exports = [entry] 545 fake_job = FakeJobWithResult() 546 self.job_manager._jobs['4'] = fake_job 547 result1 = container.entry_from_job_id(None) 548 result2 = container.entry_from_job_id('4') 549 result3 = container.entry_from_job_id('23') 550 self.assertEqual(result1, None) 551 self.assertEqual(result2, ('4', 'cave_exporter', 'bob')) 552 self.assertEqual(result3, None) 553 return
Note: See TracChangeset for help on using the changeset viewer.