Changeset 9089 for main/waeup.kofa/branches/uli-zc-async/src/waeup
- Timestamp:
- 8 Aug 2012, 13:19:41 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/batching.py
r8573 r9089 25 25 import datetime 26 26 import os 27 import shutil 27 28 import tempfile 28 29 import time 29 30 from cStringIO import StringIO 30 from zope.component import createObject 31 from persistent.list import PersistentList 32 from zope.component import createObject, getUtility 33 from zope.component.hooks import setSite 31 34 from zope.interface import Interface 32 35 from zope.schema import getFields 33 36 from zope.event import notify 37 from waeup.kofa.async import AsyncJob 34 38 from waeup.kofa.interfaces import ( 35 IBatchProcessor, FatalCSVError, IObjectConverter, 36 ICSVExporter, IGNORE_MARKER, DuplicationError )39 IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager, 40 ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP) 37 41 38 42 class BatchProcessor(grok.GlobalUtility): … … 486 490 """ 487 491 raise NotImplementedError 492 493 494 def export_job(site, exporter_name): 495 """Export all entries delivered by exporter and store it in a temp file. 496 497 `site` gives the site to search. It will be passed to the exporter 498 and also be set as 'current site' as the function is used in 499 asynchronous jobs which run in their own threads and have no site 500 set initially. Therefore `site` must also be a valid value for use 501 with `zope.component.hooks.setSite()`. 502 503 `exporter_name` is the utility name under which the desired 504 exporter was registered with the ZCA. 505 506 The resulting CSV file will be stored in a new temporary directory 507 (using :func:`tempfile.mkdtemp`). 508 509 Returns the path to the created CSV file. 510 511 .. note:: It is the callers responsibility to clean up the used 512 file and its parent directory. 513 """ 514 setSite(site) 515 exporter = getUtility(ICSVExporter, name=exporter_name) 516 output_dir = tempfile.mkdtemp() 517 output_path = os.path.join(output_dir, 'export.csv') 518 exporter.export_all(site, filepath=output_path) 519 return output_path 520 521 class AsyncExportJob(AsyncJob): 522 523 def __init__(self, site, exporter_name): 524 super(AsyncExportJob, self).__init__( 525 export_job, site, exporter_name) 526 527 class ExportJobContainer(object): 528 """A mix-in that provides functionality for asynchronous export jobs. 529 """ 530 running_exports = PersistentList() 531 532 def start_export_job(self, exporter_name, user_id): 533 """Start asynchronous export job. 534 535 `exporter_name` is the name of an exporter utility to be used. 536 537 `user_id` is the ID of the user that triggers the export. 538 539 The job_id is stored along with exporter name and user id in a 540 persistent list. 541 542 Returns the job ID of the job started. 543 """ 544 site = grok.getSite() 545 manager = getUtility(IJobManager) 546 job = AsyncExportJob(site, exporter_name) 547 job_id = manager.put(job) 548 self.running_exports.append((job_id, exporter_name, user_id)) 549 # Make sure that the persisted list is stored in ZODB 550 self.running_exports = PersistentList(self.running_exports) 551 return job_id 552 553 def get_running_export_jobs(self, user_id=None): 554 """Get export jobs for user with `user_id` as list of tuples. 555 556 Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in 557 that order. The ``<exporter_name>`` is the utility name of the 558 used exporter. 559 560 If `user_id` is ``None``, all running jobs are returned. 561 """ 562 entries = [] 563 to_delete = [] 564 manager = getUtility(IJobManager) 565 for entry in self.running_exports: 566 if user_id is not None and entry[2] != user_id: 567 continue 568 if manager.get(entry[0]) is None: 569 to_delete.append(entry) 570 continue 571 entries.append(entry) 572 if to_delete: 573 self.running_exports = PersistentList( 574 [x for x in self.running_exports if x not in to_delete]) 575 return entries 576 577 def get_export_jobs_status(self, user_id=None): 578 """Get running/completed export jobs for `user_id` as list of tuples. 579 580 Each tuple holds ``<raw status>, <status translated>, 581 <exporter title>`` in that order, where ``<status 582 translated>`` and ``<exporter title>`` are translated strings 583 representing the status of the job and the human readable 584 title of the exporter used. 585 """ 586 entries = self.get_running_export_jobs(user_id) 587 result = [] 588 manager = getUtility(IJobManager) 589 for entry in entries: 590 job = manager.get(entry[0]) 591 if job is None: 592 continue 593 status, status_translated = JOB_STATUS_MAP[job.status] 594 exporter_name = getUtility(ICSVExporter, name=entry[1]).title 595 result.append((status, status_translated, exporter_name)) 596 return result 597 598 def delete_export_entry(self, entry): 599 """Delete the export denoted by `entry`. 600 601 Removes `entry` from the local `running_exports` list and also 602 removes the regarding job via the local job manager. 603 604 `entry` is a tuple ``(<job id>, <exporter name>, <user id>)`` 605 as created by :meth:`start_export_job` or returned by 606 :meth:`get_running_export_jobs`. 607 """ 608 manager = getUtility(IJobManager) 609 job = manager.get(entry[0]) 610 if job is not None: 611 # remove created export file 612 if isinstance(job.result, basestring): 613 if os.path.exists(os.path.dirname(job.result)): 614 shutil.rmtree(os.path.dirname(job.result)) 615 manager.remove(entry[0], self) 616 new_entries = [x for x in self.running_exports 617 if x != entry] 618 self.running_exports = PersistentList(new_entries) 619 return
Note: See TracChangeset for help on using the changeset viewer.