Ignore:
Timestamp:
8 Aug 2012, 13:19:41 (12 years ago)
Author:
uli
Message:

Add async functionality for exporters:

  • an exporter function usable with decoupled threads
  • a mix-in class for components storing export jobs
  • specialized async-aware job implementation
File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/batching.py

    r8573 r9089  
    2525import datetime
    2626import os
     27import shutil
    2728import tempfile
    2829import time
    2930from cStringIO import StringIO
    30 from zope.component import createObject
     31from persistent.list import PersistentList
     32from zope.component import createObject, getUtility
     33from zope.component.hooks import setSite
    3134from zope.interface import Interface
    3235from zope.schema import getFields
    3336from zope.event import notify
     37from waeup.kofa.async import AsyncJob
    3438from waeup.kofa.interfaces import (
    35     IBatchProcessor, FatalCSVError, IObjectConverter,
    36     ICSVExporter, IGNORE_MARKER, DuplicationError)
     39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
     40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP)
    3741
    3842class BatchProcessor(grok.GlobalUtility):
     
    486490        """
    487491        raise NotImplementedError
     492
     493
     494def export_job(site, exporter_name):
     495    """Export all entries delivered by exporter and store it in a temp file.
     496
     497    `site` gives the site to search. It will be passed to the exporter
     498    and also be set as 'current site' as the function is used in
     499    asynchronous jobs which run in their own threads and have no site
     500    set initially. Therefore `site` must also be a valid value for use
     501    with `zope.component.hooks.setSite()`.
     502
     503    `exporter_name` is the utility name under which the desired
     504    exporter was registered with the ZCA.
     505
     506    The resulting CSV file will be stored in a new temporary directory
     507    (using :func:`tempfile.mkdtemp`).
     508
     509    Returns the path to the created CSV file.
     510
     511    .. note:: It is the callers responsibility to clean up the used
     512              file and its parent directory.
     513    """
     514    setSite(site)
     515    exporter = getUtility(ICSVExporter, name=exporter_name)
     516    output_dir = tempfile.mkdtemp()
     517    output_path = os.path.join(output_dir, 'export.csv')
     518    exporter.export_all(site, filepath=output_path)
     519    return output_path
     520
     521class AsyncExportJob(AsyncJob):
     522
     523    def __init__(self, site, exporter_name):
     524        super(AsyncExportJob, self).__init__(
     525            export_job, site, exporter_name)
     526
     527class ExportJobContainer(object):
     528    """A mix-in that provides functionality for asynchronous export jobs.
     529    """
     530    running_exports = PersistentList()
     531
     532    def start_export_job(self, exporter_name, user_id):
     533        """Start asynchronous export job.
     534
     535        `exporter_name` is the name of an exporter utility to be used.
     536
     537        `user_id` is the ID of the user that triggers the export.
     538
     539        The job_id is stored along with exporter name and user id in a
     540        persistent list.
     541
     542        Returns the job ID of the job started.
     543        """
     544        site = grok.getSite()
     545        manager = getUtility(IJobManager)
     546        job = AsyncExportJob(site, exporter_name)
     547        job_id = manager.put(job)
     548        self.running_exports.append((job_id, exporter_name, user_id))
     549        # Make sure that the persisted list is stored in ZODB
     550        self.running_exports = PersistentList(self.running_exports)
     551        return job_id
     552
     553    def get_running_export_jobs(self, user_id=None):
     554        """Get export jobs for user with `user_id` as list of tuples.
     555
     556        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
     557        that order. The ``<exporter_name>`` is the utility name of the
     558        used exporter.
     559
     560        If `user_id` is ``None``, all running jobs are returned.
     561        """
     562        entries = []
     563        to_delete = []
     564        manager = getUtility(IJobManager)
     565        for entry in self.running_exports:
     566            if user_id is not None and entry[2] != user_id:
     567                continue
     568            if manager.get(entry[0]) is None:
     569                to_delete.append(entry)
     570                continue
     571            entries.append(entry)
     572        if to_delete:
     573            self.running_exports = PersistentList(
     574                [x for x in self.running_exports if x not in to_delete])
     575        return entries
     576
     577    def get_export_jobs_status(self, user_id=None):
     578        """Get running/completed export jobs for `user_id` as list of tuples.
     579
     580        Each tuple holds ``<raw status>, <status translated>,
     581        <exporter title>`` in that order, where ``<status
     582        translated>`` and ``<exporter title>`` are translated strings
     583        representing the status of the job and the human readable
     584        title of the exporter used.
     585        """
     586        entries = self.get_running_export_jobs(user_id)
     587        result = []
     588        manager = getUtility(IJobManager)
     589        for entry in entries:
     590            job = manager.get(entry[0])
     591            if job is None:
     592                continue
     593            status, status_translated = JOB_STATUS_MAP[job.status]
     594            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
     595            result.append((status, status_translated, exporter_name))
     596        return result
     597
     598    def delete_export_entry(self, entry):
     599        """Delete the export denoted by `entry`.
     600
     601        Removes `entry` from the local `running_exports` list and also
     602        removes the regarding job via the local job manager.
     603
     604        `entry` is a tuple ``(<job id>, <exporter name>, <user id>)``
     605        as created by :meth:`start_export_job` or returned by
     606        :meth:`get_running_export_jobs`.
     607        """
     608        manager = getUtility(IJobManager)
     609        job = manager.get(entry[0])
     610        if job is not None:
     611            # remove created export file
     612            if isinstance(job.result, basestring):
     613                if os.path.exists(os.path.dirname(job.result)):
     614                    shutil.rmtree(os.path.dirname(job.result))
     615        manager.remove(entry[0], self)
     616        new_entries = [x for x in self.running_exports
     617                       if x != entry]
     618        self.running_exports = PersistentList(new_entries)
     619        return
Note: See TracChangeset for help on using the changeset viewer.