Timestamp: 21 Sep 2012, 11:21:05
Author: uli
Message: Merge changes from uli-async-update back into trunk.
Location: main/waeup.kofa/trunk/src/waeup/kofa/utils
Files: 2 edited

  • main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py

    r9170 → r9217

     import datetime
     import os
    +import shutil
     import tempfile
     import time
     from cStringIO import StringIO
    -from zope.component import createObject
    +from persistent.list import PersistentList
    +from zope.component import createObject, getUtility
    +from zope.component.hooks import setSite
     from zope.interface import Interface
     from zope.schema import getFields
     from zope.event import notify
    +from waeup.kofa.async import AsyncJob
     from waeup.kofa.interfaces import (
    -    IBatchProcessor, FatalCSVError, IObjectConverter,
    -    ICSVExporter, IGNORE_MARKER, DuplicationError)
    +    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
    +    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
    +    IExportJobContainer, IExportJob)

     class BatchProcessor(grok.GlobalUtility):

                         self.writeFailedRow(
                             failed_writer, string_row,
    -                        "Cannot remove: no such entry")
    +                        "Cannot remove: no such entry.")
                         continue
                     self.delEntry(row, site)

             """
             raise NotImplementedError
    +
    +
    +def export_job(site, exporter_name):
    +    """Export all entries delivered by exporter and store it in a temp file.
    +
    +    `site` gives the site to search. It will be passed to the exporter
    +    and also be set as 'current site' as the function is used in
    +    asynchronous jobs which run in their own threads and have no site
    +    set initially. Therefore `site` must also be a valid value for use
    +    with `zope.component.hooks.setSite()`.
    +
    +    `exporter_name` is the utility name under which the desired
    +    exporter was registered with the ZCA.
    +
    +    The resulting CSV file will be stored in a new temporary directory
    +    (using :func:`tempfile.mkdtemp`). It will be named after the
    +    exporter used with `.csv` filename extension.
    +
    +    Returns the path to the created CSV file.
    +
    +    .. note:: It is the caller's responsibility to clean up the used
    +              file and its parent directory.
    +    """
    +    setSite(site)
    +    exporter = getUtility(ICSVExporter, name=exporter_name)
    +    output_dir = tempfile.mkdtemp()
    +    filename = '%s.csv' % exporter_name
    +    output_path = os.path.join(output_dir, filename)
    +    exporter.export_all(site, filepath=output_path)
    +    return output_path
    +
    +class AsyncExportJob(AsyncJob):
    +    """An IJob that exports data to CSV files.
    +
    +    `AsyncExportJob` instances are regular `AsyncJob` instances with a
    +    different constructor API. Instead of a callable to execute, you
    +    must pass a `site` and some `exporter_name` to trigger an export.
    +
    +    The real work is done when an instance of this class is put into a
    +    queue. See :mod:`waeup.kofa.async` to learn more about
    +    asynchronous jobs.
    +
    +    The `exporter_name` must be the name under which an ICSVExporter
    +    utility was registered with the ZCA.
    +
    +    The `site` must be a valid site or ``None``.
    +
    +    The result of an `AsyncExportJob` is the path to the generated CSV
    +    file. The file will reside in a temporary directory that should be
    +    removed after being used.
    +    """
    +    grok.implements(IExportJob)
    +
    +    def __init__(self, site, exporter_name):
    +        super(AsyncExportJob, self).__init__(
    +            export_job, site, exporter_name)
    +
    +class ExportJobContainer(object):
    +    """A mix-in that provides functionality for asynchronous export jobs.
    +    """
    +    grok.implements(IExportJobContainer)
    +    running_exports = PersistentList()
    +
    +    def start_export_job(self, exporter_name, user_id):
    +        """Start asynchronous export job.
    +
    +        `exporter_name` is the name of an exporter utility to be used.
    +
    +        `user_id` is the ID of the user that triggers the export.
    +
    +        The job_id is stored along with exporter name and user id in a
    +        persistent list.
    +
    +        Returns the job ID of the job started.
    +        """
    +        site = grok.getSite()
    +        manager = getUtility(IJobManager)
    +        job = AsyncExportJob(site, exporter_name)
    +        job_id = manager.put(job)
    +        # Make sure that the persisted list is stored in ZODB
    +        self.running_exports = PersistentList(self.running_exports)
    +        self.running_exports.append((job_id, exporter_name, user_id))
    +        return job_id
    +
    +    def get_running_export_jobs(self, user_id=None):
    +        """Get export jobs for user with `user_id` as list of tuples.
    +
    +        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
    +        that order. The ``<exporter_name>`` is the utility name of the
    +        used exporter.
    +
    +        If `user_id` is ``None``, all running jobs are returned.
    +        """
    +        entries = []
    +        to_delete = []
    +        manager = getUtility(IJobManager)
    +        for entry in self.running_exports:
    +            if user_id is not None and entry[2] != user_id:
    +                continue
    +            if manager.get(entry[0]) is None:
    +                to_delete.append(entry)
    +                continue
    +            entries.append(entry)
    +        if to_delete:
    +            self.running_exports = PersistentList(
    +                [x for x in self.running_exports if x not in to_delete])
    +        return entries
    +
    +    def get_export_jobs_status(self, user_id=None):
    +        """Get running/completed export jobs for `user_id` as list of tuples.
    +
    +        Each tuple holds ``<raw status>, <status translated>,
    +        <exporter title>`` in that order, where ``<status
    +        translated>`` and ``<exporter title>`` are translated strings
    +        representing the status of the job and the human readable
    +        title of the exporter used.
    +        """
    +        entries = self.get_running_export_jobs(user_id)
    +        result = []
    +        manager = getUtility(IJobManager)
    +        for entry in entries:
    +            job = manager.get(entry[0])
    +            if job is None:
    +                continue
    +            status, status_translated = JOB_STATUS_MAP[job.status]
    +            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
    +            result.append((status, status_translated, exporter_name))
    +        return result
    +
    +    def delete_export_entry(self, entry):
    +        """Delete the export denoted by `entry`.
    +
    +        Removes given entry from the local `running_exports` list and also
    +        removes the corresponding job via the local job manager.
    +
    +        `entry` must be a tuple ``(<job id>, <exporter name>, <user
    +        id>)`` as created by :meth:`start_export_job` or returned by
    +        :meth:`get_running_export_jobs`.
    +        """
    +        manager = getUtility(IJobManager)
    +        job = manager.get(entry[0])
    +        if job is not None:
    +            # remove created export file
    +            if isinstance(job.result, basestring):
    +                if os.path.exists(os.path.dirname(job.result)):
    +                    shutil.rmtree(os.path.dirname(job.result))
    +        manager.remove(entry[0], self)
    +        new_entries = [x for x in self.running_exports
    +                       if x != entry]
    +        self.running_exports = PersistentList(new_entries)
    +        return
    +
    +    def entry_from_job_id(self, job_id):
    +        """Get entry tuple for `job_id`.
    +
    +        Returns ``None`` if no such entry can be found.
    +        """
    +        for entry in self.running_exports:
    +            if entry[0] == job_id:
    +                return entry
    +        return None
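
    For orientation (editorial note, not part of the changeset): the sketch below
    shows how the export-job API added above might be driven from application
    code. It assumes a running Kofa site whose IJobManager utility is configured;
    the exporter name 'my_exporter' is a hypothetical placeholder.

        # Hypothetical usage sketch; assumes a configured Kofa site and job manager.
        from waeup.kofa.utils.batching import ExportJobContainer

        container = ExportJobContainer()   # normally used as a mix-in base class
        job_id = container.start_export_job('my_exporter', 'bob')

        # (job_id, exporter_name, user_id) tuples of jobs still known to the manager
        print container.get_running_export_jobs(user_id='bob')

        # (raw status, translated status, exporter title) tuples
        print container.get_export_jobs_status(user_id='bob')

        # when finished: drop the job, its list entry and any temporary result file
        entry = container.entry_from_job_id(job_id)
        if entry is not None:
            container.delete_export_entry(entry)
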
  • main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py

    r8380 → r9217

     import tempfile
     import unittest
    +from zc.async.interfaces import IJob, COMPLETED
     from zope import schema
    -from zope.component import provideUtility
    +from zope.component import provideUtility, getGlobalSiteManager
     from zope.component.factory import Factory
     from zope.component.hooks import clearSite

     from zope.interface import Interface, implements, verify
     from waeup.kofa.app import University
    -from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
    +from waeup.kofa.interfaces import (
    +    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
    +    IExportJob)
     from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
    -from waeup.kofa.utils.batching import ExporterBase
    +from waeup.kofa.utils.batching import (
    +    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
    +    ExportJobContainer)

     optionflags = (

             self.owner = owner
             self.taxpayer = taxpayer
    -#Cave = attrs_to_fields(Cave)

     stoneville = dict

    -from waeup.kofa.utils.batching import BatchProcessor
    +SAMPLE_DATA = """name,dinoports,owner,taxpayer
    +Barneys Home,2,Barney,1
    +Wilmas Asylum,1,Wilma,1
    +Freds Dinoburgers,10,Fred,0
    +Joeys Drive-in,110,Joey,0
    +"""
    +
     class CaveProcessor(BatchProcessor):
         util_name = 'caveprocessor'
    -    #grok.name(util_name)
         name = 'Cave Processor'
         iface = ICave

             # Provide sample data
             self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
    -        open(self.newcomers_csv, 'wb').write(
    -            """name,dinoports,owner,taxpayer
    -Barneys Home,2,Barney,1
    -Wilmas Asylum,1,Wilma,1
    -Freds Dinoburgers,10,Fred,0
    -Joeys Drive-in,110,Joey,0
    -""")
    +        open(self.newcomers_csv, 'wb').write(SAMPLE_DATA)
             self.setupLogger()
             self.stoneville = stoneville

             self.assertEqual(result, None)
             return
    +
    +
    +class CaveExporter(ExporterBase):
    +    # A minimal fake exporter suitable to be called by export_jobs
    +    fields = ('name', 'dinoports', 'owner', 'taxpayer')
    +    title = u'Dummy cave exporter'
    +
    +    def export_all(self, site, filepath=None):
    +        if filepath is None:
    +            return SAMPLE_DATA
    +        open(filepath, 'wb').write(SAMPLE_DATA)
    +        return
    +
    +class ExportJobTests(unittest.TestCase):
    +    # Test asynchronous export functionality (simple cases)
    +
    +    def setUp(self):
    +        # register a suitable ICSVExporter as named utility
    +        self.exporter = CaveExporter()
    +        self.gsm = getGlobalSiteManager()
    +        self.gsm.registerUtility(
    +            self.exporter, ICSVExporter, name='cave_exporter')
    +
    +    def tearDown(self):
    +        self.gsm.unregisterUtility(self.exporter)
    +
    +    def test_export_job_func(self):
    +        # the export_job func does really export data...
    +        result_path = export_job(None, 'cave_exporter')
    +        self.assertTrue(os.path.isfile(result_path))
    +        contents = open(result_path, 'rb').read()
    +        shutil.rmtree(os.path.dirname(result_path))
    +        self.assertEqual(contents, SAMPLE_DATA)
    +        return
    +
    +    def test_export_job_interfaces(self):
    +        # the AsyncExportJob implements promised interfaces correctly...
    +        job = AsyncExportJob(None, None)
    +        verify.verifyClass(IJob, AsyncExportJob)
    +        verify.verifyObject(IJob, job)
    +        verify.verifyClass(IExportJob, AsyncExportJob)
    +        verify.verifyObject(IExportJob, job)
    +        return
    +
    +
    +class FakeJob(object):
    +
    +    status = COMPLETED
    +    result = None
    +
    +class FakeJobWithResult(FakeJob):
    +
    +    def __init__(self):
    +        dir_path = tempfile.mkdtemp()
    +        self.result = os.path.join(dir_path, 'fake.csv')
    +        open(self.result, 'wb').write('a fake result')
    +        return
    +
    +class FakeJobManager(object):
    +
    +    _jobs = dict()
    +    _curr_num = 1
    +
    +    def get(self, job_id):
    +        if job_id == '3':
    +            return FakeJob()
    +        return self._jobs.get(job_id, None)
    +
    +    def put(self, job):
    +        num = str(self._curr_num)
    +        self._jobs[num] = job
    +        self._curr_num += 1
    +        return num
    +
    +    def remove(self, job_id, site):
    +        if job_id in self._jobs:
    +            del self._jobs[job_id]
    +        return
    +
    +class ExportJobContainerTests(unittest.TestCase):
    +    # Test ExportJobContainer
    +
    +    def setUp(self):
    +        # register a suitable ICSVExporter as named utility
    +        self.exporter = CaveExporter()
    +        self.job_manager = FakeJobManager()
    +        self.gsm = getGlobalSiteManager()
    +        self.gsm.registerUtility(
    +            self.exporter, ICSVExporter, name='cave_exporter')
    +        self.gsm.registerUtility(
    +            self.job_manager, IJobManager)
    +
    +    def tearDown(self):
    +        self.gsm.unregisterUtility(self.exporter)
    +        self.gsm.unregisterUtility(self.job_manager, IJobManager)
    +
    +    def test_export_job_interfaces(self):
    +        # the ExportJobContainer implements promised interfaces correctly...
    +        container = ExportJobContainer()
    +        verify.verifyClass(IExportJobContainer, ExportJobContainer)
    +        verify.verifyObject(IExportJobContainer, container)
    +        return
    +
    +    def test_start_export_job(self):
    +        # we can start jobs
    +        container = ExportJobContainer()
    +        container.start_export_job('cave_exporter', 'bob')
    +        result = self.job_manager._jobs.values()[0]
    +        self.assertTrue(IJob.providedBy(result))
    +        self.assertEqual(
    +            container.running_exports,
    +            [('1', 'cave_exporter', 'bob')]
    +            )
    +        return
    +
    +    def test_get_running_export_jobs_all(self):
    +        # we can get export jobs of all users
    +        container = ExportJobContainer()
    +        container.start_export_job('cave_exporter', 'bob')
    +        container.start_export_job('cave_exporter', 'alice')
    +        result = container.get_running_export_jobs()
    +        self.assertEqual(
    +            result,
    +            [('1', 'cave_exporter', 'bob'),
    +             ('2', 'cave_exporter', 'alice')]
    +            )
    +        return
    +
    +    def test_get_running_export_jobs_user(self):
    +        # we can get the export jobs running for a certain user
    +        container = ExportJobContainer()
    +        container.start_export_job('cave_exporter', 'bob')
    +        container.start_export_job('cave_exporter', 'alice')
    +        result1 = container.get_running_export_jobs(user_id='alice')
    +        result2 = container.get_running_export_jobs(user_id='foo')
    +        self.assertEqual(
    +            result1, [('2', 'cave_exporter', 'alice')])
    +        self.assertEqual(
    +            result2, [])
    +        return
    +
    +    def test_get_running_export_jobs_only_if_exist(self):
    +        # we get only jobs that are accessible through the job manager...
    +        container = ExportJobContainer()
    +        container.start_export_job('cave_exporter', 'bob')
    +        container.start_export_job('cave_exporter', 'bob')
    +        self.assertTrue(
    +            ('2', 'cave_exporter', 'bob') in container.running_exports)
    +        # we remove the second entry from job manager
    +        del self.job_manager._jobs['2']
    +        result = container.get_running_export_jobs(user_id='bob')
    +        self.assertEqual(
    +            result, [('1', 'cave_exporter', 'bob')])
    +        self.assertTrue(
    +            ('2', 'cave_exporter', 'bob') not in container.running_exports)
    +        return
    +
    +    def test_get_export_job_status(self):
    +        # we can get the statuses of jobs...
    +        container = ExportJobContainer()
    +        container.start_export_job('cave_exporter', 'alice')
    +        container.start_export_job('cave_exporter', 'bob')
    +        container.start_export_job('cave_exporter', 'bob')
    +        result = container.get_export_jobs_status(user_id='bob')
    +        # we'll get the raw value, a translation and the title of the
    +        # exporter
    +        self.assertEqual(
    +            result,
    +            [('new', u'new', u'Dummy cave exporter'),
    +             ('completed', u'completed', u'Dummy cave exporter')]
    +            )
    +        return
    +
    +    def test_delete_export_entry(self):
    +        # we can remove export entries in local lists and the job
    +        # manager as well...
    +        container = ExportJobContainer()
    +        container.start_export_job('cave_exporter', 'bob')
    +        entry = container.running_exports[0]
    +        container.delete_export_entry(entry)
    +        # both, running_exports list and job manager are empty now
    +        self.assertEqual(
    +            container.running_exports, [])
    +        self.assertEqual(
    +            self.job_manager._jobs, {})
    +        return
    +
    +    def test_delete_export_entry_remove_file(self):
    +        # any result files of exports are deleted as well
    +        container = ExportJobContainer()
    +        entry = ('4', 'cave_exporter', 'bob')
    +        container.running_exports = [entry]
    +        fake_job = FakeJobWithResult()
    +        self.job_manager._jobs['4'] = fake_job
    +        self.assertTrue(os.path.isfile(fake_job.result))
    +        container.delete_export_entry(entry)
    +        self.assertTrue(not os.path.exists(fake_job.result))
    +        return
    +
    +    def test_entry_from_job_id(self):
    +        # we can get an entry for a job_id if the id exists
    +        container = ExportJobContainer()
    +        entry = ('4', 'cave_exporter', 'bob')
    +        container.running_exports = [entry]
    +        fake_job = FakeJobWithResult()
    +        self.job_manager._jobs['4'] = fake_job
    +        result1 = container.entry_from_job_id(None)
    +        result2 = container.entry_from_job_id('4')
    +        result3 = container.entry_from_job_id('23')
    +        self.assertEqual(result1, None)
    +        self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
    +        self.assertEqual(result3, None)
    +        return
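
    As the tests above illustrate, the export_job helper writes <exporter_name>.csv
    into a fresh tempfile.mkdtemp() directory and leaves cleanup to the caller. A
    condensed sketch of that contract (the 'cave_exporter' utility is the test
    fixture registered in setUp above; in production a real site object would be
    passed instead of None):

        import os
        import shutil
        from waeup.kofa.utils.batching import export_job

        path = export_job(None, 'cave_exporter')   # run the export synchronously
        print open(path, 'rb').read()              # the CSV contents
        shutil.rmtree(os.path.dirname(path))       # caller removes the temp dir
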