source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 12950

Last change on this file since 12950 was 12869, checked in by Henrik Bettermann, 10 years ago

Start documenting batch processors.

  • Property svn:keywords set to Id
File size: 31.8 KB
RevLine 
[7196]1## $Id: batching.py 12869 2015-04-22 19:14:14Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[11849]37from zope.schema.interfaces import ConstraintNotSatisfied
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Derived classes are expected to set
    `name`, `util_name`, `iface`, `location_fields` and `factory_name`
    and to implement the hooks that raise `NotImplementedError` here
    (`parentsExist`, `entryExists`, `getParent`, `getEntry`, `addEntry`,
    `delEntry`).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = ''

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = []

    # A factory with this name must be registered...
    factory_name = ''

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the list of columns required for it.

        Creating needs the location fields plus all required fields,
        updating and removing need the location fields only.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all location fields and interface field names."""
        return sorted(
            set(self.location_fields) | set(getFields(self.iface)))

    def getHeaders(self, mode='create'):
        """Return the importable column names (independent of `mode`)."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are sufficient for `mode`.

        Raises `FatalCSVError` if a required column is missing or if a
        non-ignored column name appears more than once. Returns ``True``
        otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict keyed by the mapped field names. Columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Keys are the raw header names read from the first line of the
        file at `path`, values the corresponding entries of
        `headerfields` (matched by position). In ``remove`` mode only
        location fields are kept; ``--IGNORE--`` columns are always
        skipped.
        """
        result = dict()
        # Only the header line is needed; close the file right away
        # instead of leaving the handle to the garbage collector.
        with open(path, 'rb') as csv_file:
            raw_header = next(unicodecsv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into one message.

        `errors` is a sequence of ``(fieldname, message)`` pairs,
        `inv_errors` a sequence of invariant error messages.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named `factory_name`."""
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation imposes no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Values equal to `IGNORE_MARKER`, keys that are not yet attributes
        of `obj`, and attributes that cannot be set are skipped.
        `filename` is not used by this base implementation.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        Does nothing if `logger` is ``None``. `fail_path` and `user` are
        not used by this implementation.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that `row` is modified in place (``--ERRORS--`` is set).
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errors, invariant_errors, converted_dict)`` as
        delivered by the `IObjectConverter` adapter of `iface`.
        `ignore_empty` is accepted for API compatibility but not used
        here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def emptyRow(self, row):
        """Detect empty rows.

        A row is empty if every value is ``None``, the ignore marker, or
        a string containing only whitespace. ``None`` must be checked
        before calling string methods: CSV dict readers fill missing
        columns of short lines with ``None`` (default ``restval``) and
        collect surplus fields of long lines in a list (``restkey``).
        """
        for value in row.values():
            if value is None or value == IGNORE_MARKER:
                continue
            if isinstance(value, list):
                # Surplus fields of an over-long line.
                value = u''.join(value)
            if value.strip():
                return False
        return True

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """In contrast to most other methods, `doImport` is not supposed to
        be customized, neither in custom packages nor in derived batch
        processor classes. Therefore, this is the only place where we
        do import data.

        Before this method starts creating or updating persistent data, it
        prepares two more files in a temporary folder of the filesystem: (1)
        a file for pending data with file extension ``.pending`` and (2)
        a file for successfully processed data with file extension
        ``.finished``. Then the method starts iterating over all rows of
        the CSV file. Each row is treated as follows:

        1. An empty row is skipped.

        2. In update mode, and only if `ignore_empty` is set, empty
           strings in the row are replaced by ignore-markers.

        3. The `BatchProcessor.checkConversion` method validates and converts
           all values in the row. Conversion means the transformation of strings
           into Python objects. For instance, number expressions have to be
           transformed into integers, dates into datetime objects, phone number
           expressions into phone number objects, etc. The converter returns a
           dictionary with converted values or, if the validation of one of the
           elements fails, an appropriate warning message. If the conversion
           fails a pending record is created and stored in the pending data file
           together with a warning message the converter has raised.

        4. In **create mode** only:

           The parent object must be found and a child
           object with same object id must not exist. Otherwise the row
           is skipped, a corresponding warning message is written and a
           record is stored in the pending data file.

           Now `doImport` tries to add the new object with the data
           from the conversion dictionary. In some cases this
           may fail and a `DuplicationError` is raised. For example, a new
           payment ticket is created but the same payment for same session
           has already been made. In this case the object id is unique, no
           other object with same id exists, but making the 'same' payment
           twice does not make sense. The import is skipped and a
           record is stored in the pending data file.

        5. In **update mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is written and a record is
           stored in the pending data file.

           The `BatchProcessor.checkUpdateRequirements` method checks additional
           requirements the object must fulfill before being updated. These
           requirements are not imposed by the data type but the context
           of the object. For example, post-graduate students have a different
           registration workflow. With this method we do forbid certain workflow
           transitions or states.

           Finally, `doImport` updates the existing object with the data
           from the conversion dictionary.

        6. In **remove mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is written and a record is
           stored in the pending data file.

           Finally, `doImport` removes the existing object.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``
        where `num` counts all data rows read (including empty ones),
        `num_warns` the rows that failed, and `failed_path` is ``None``
        if no row failed (the pending file is then deleted).
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()
        base, ext = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        failed_headers = list(mapping.values()) + ['--ERRORS--']
        finished_headers = list(mapping.values())

        num = 0
        num_warns = 0
        site = grok.getSite()

        # Context managers guarantee that the input is closed and both
        # result files are flushed and closed before we log, unlink the
        # pending file or hand out the paths -- also if a hook raises.
        with open(path, 'rb') as infile, \
                open(failed_path, 'wb') as failed_file, \
                open(finished_path, 'wb') as finished_file:
            reader = unicodecsv.DictReader(infile)
            failed_writer = unicodecsv.DictWriter(failed_file, failed_headers)
            os.chmod(failed_path, 0o664)
            failed_writer.writerow(dict((x, x) for x in failed_headers))
            finished_writer = unicodecsv.DictWriter(
                finished_file, finished_headers)
            os.chmod(finished_path, 0o664)
            finished_writer.writerow(dict((x, x) for x in finished_headers))

            for raw_row in reader:
                num += 1
                # Skip row if empty
                if self.emptyRow(raw_row):
                    continue
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode == 'update':
                    # Replace empty strings with ignore-markers
                    for key, val in string_row.items():
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: converted values go into `row`, while
                # `string_row` keeps the raw strings for the result files.
                row = dict(string_row.items())
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                if mode == 'create':
                    if not self.parentsExist(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Not all parents do exist yet.")
                        continue
                    if self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "This object already exists.")
                        continue
                    obj = self.callFactory()
                    # Override all values in row, also
                    # student_ids and applicant_ids which have been
                    # generated in the respective __init__ methods before.
                    self.updateEntry(obj, row, site, base)
                    try:
                        self.addEntry(obj, row, site)
                    except KeyError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.message)
                        continue
                    except DuplicationError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.msg)
                        continue
                elif mode == 'remove':
                    if not self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot remove: no such entry")
                        continue
                    self.delEntry(row, site)
                elif mode == 'update':
                    obj = self.getEntry(row, site)
                    if obj is None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot update: no such entry")
                        continue
                    update_errors = self.checkUpdateRequirements(
                        obj, row, site)
                    if update_errors is not None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, update_errors)
                        continue
                    try:
                        self.updateEntry(obj, row, site, base)
                    except ConstraintNotSatisfied as err:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "ConstraintNotSatisfied: %s" % err)
                        continue
                finished_writer.writerow(string_row)

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # The pending file is already closed, so it can be removed.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = unicodecsv.DictWriter(outfile, self.available_fields)
        writer.writerow(
            dict(zip(self.available_fields, self.available_fields))) # header
        outfile.seek(0)
        return outfile.read()
487
class ExporterBase(object):
    """A base for exporters.

    Derived classes set `fields` and `title` and implement the methods
    that raise `NotImplementedError` here (`get_filtered`,
    `get_selected`, `export`, `export_all`).
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.

        The base implementation makes `value` CSV friendly: booleans
        become ``'1'``/``'0'``, unicode is encoded as UTF-8, datetimes
        and dates get a trailing ``'#'`` and ``None`` becomes the empty
        string. Any other value is returned unchanged. `name` (the field
        name) and `context` (the object the value was read from) are not
        used here but are available to overriding methods.
        """
        if isinstance(value, bool):
            value = '1' if value else '0'
        elif isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            value = value.encode('utf-8')
        elif isinstance(value, datetime.datetime):
            # Datetimes get a trailing hash as well
            # (changed 2014-07-06, see ticket #941).
            value = str('%s#' % value)
        elif isinstance(value, datetime.date):
            # Order is important here: check for date after datetime as
            # datetimes are also dates.
            #
            # Append hash '#' to dates to circumvent unwanted automatic
            # (date) conversion by Excel.
            value = str('%s#' % value)
        elif value is None:
            # None is not really representable in CSV files
            value = ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        ``unicodecsv.DictWriter`` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is None:
            outfile = StringIO()
        else:
            outfile = open(filepath, 'wb')
        writer = unicodecsv.DictWriter(outfile, self.fields)
        writer.writerow(dict(zip(self.fields, self.fields))) # header
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        Missing attributes are read as ``None``; every value is passed
        through `mangle_value` before being written.
        """
        row = {}
        for name in self.fields:
            value = getattr(obj, name, None)
            value = self.mangle_value(value, name, obj)
            row[name] = value
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.
        If filepath is None, the contents of outfile is returned.
        """
        outfile.seek(0)
        if filepath is None:
            return outfile.read()
        outfile.close()
        return

    def get_filtered(self, site, **kw):
        """Get datasets to export filtered by keyword arguments.
        Returns an iterable.
        """
        raise NotImplementedError

    def get_selected(self, site, selected):
        """Get datasets to export for selected items
        specified by a list of identifiers.
        Returns an iterable.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export items denoted by `kw`.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        data = self.get_filtered(site, **kw)
        return self.export(data, filepath=filepath)

    def export_selected(self, site, filepath=None, **kw):
        """Export those items specified by a list of identifiers
        passed as keyword argument `selected` (defaults to ``[]``).
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        selected = kw.get('selected', [])
        data = self.get_selected(site, selected)
        return self.export(data, filepath=filepath)
610
def export_job(site, exporter_name, **kw):
    """Export all entries delivered by exporter and store it in a temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    The keyword arguments select what is exported: without any,
    `export_all` is used; if a ``selected`` keyword is given,
    `export_selected` is used; otherwise all keywords are passed to
    `export_filtered`.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`). It will be named after the
    exporter used with `.csv` filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the callers responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    output_path = os.path.join(output_dir, '%s.csv' % exporter_name)
    if not kw:
        exporter.export_all(site, filepath=output_path)
    elif 'selected' in kw:
        # `dict.has_key` is deprecated (and gone in Python 3).
        exporter.export_selected(site, filepath=output_path, **kw)
    else:
        exporter.export_filtered(site, filepath=output_path, **kw)
    return output_path
644
645class AsyncExportJob(AsyncJob):
646    """An IJob that exports data to CSV files.
647
648    `AsyncExportJob` instances are regular `AsyncJob` instances with a
649    different constructor API. Instead of a callable to execute, you
650    must pass a `site` and some `exporter_name` to trigger an export.
651
652    The real work is done when an instance of this class is put into a
653    queue. See :mod:`waeup.kofa.async` to learn more about
654    asynchronous jobs.
655
656    The `exporter_name` must be the name under which an ICSVExporter
657    utility was registered with the ZCA.
658
659    The `site` must be a valid site  or ``None``.
660
661    The result of an `AsyncExportJob` is the path to generated CSV
662    file. The file will reside in a temporary directory that should be
663    removed after being used.
664    """
665    grok.implements(IExportJob)
666
[9718]667    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]668        super(AsyncExportJob, self).__init__(
[9718]669            export_job, site, exporter_name, *args, **kwargs)
[9217]670
[9816]671    @property
672    def finished(self):
673        """A job is marked `finished` if it is completed.
674
675        Please note: a finished report job does not neccessarily
676        provide an IReport result. See meth:`failed`.
677        """
678        return self.status == zc.async.interfaces.COMPLETED
679
680    @property
681    def failed(self):
682        """A report job is marked failed iff it is finished and the
683        result is None.
684
685        While a job is unfinished, the `failed` status is ``None``.
686
687        Failed jobs normally provide a `traceback` to examine reasons.
688        """
689        if not self.finished:
690            return None
691        if getattr(self, 'result', None) is None:
692            return True
693        return False
694
class ExportJobContainer(object):
    """Mix-in adding asynchronous export-job handling to a container.

    Started jobs are tracked in `running_exports`, a persistent list of
    ``(<job id>, <exporter name>, <user id>)`` tuples.
    """
    grok.implements(IExportJobContainer)

    # Class-level default only. The methods of this class never mutate it in
    # place; every write rebinds `running_exports` to a fresh PersistentList.
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Launch an asynchronous export and remember it.

        `exporter_name` names the exporter utility to run and `user_id`
        identifies the user requesting the export. Any further positional
        and keyword arguments are handed unchanged to
        :class:`AsyncExportJob`.

        The new job is handed to the job manager via ``put`` and a
        ``(job_id, exporter_name, user_id)`` tuple is appended to
        `running_exports`.

        Returns the job ID returned by the job manager.
        """
        current_site = grok.getSite()
        job_manager = getUtility(IJobManager)
        job_id = job_manager.put(
            AsyncExportJob(current_site, exporter_name, *args, **kwargs))
        # Copy into a fresh PersistentList and rebind the attribute, so the
        # list is stored on this object (and thus in ZODB) instead of the
        # class-level default being mutated.
        exports = PersistentList(self.running_exports)
        exports.append((job_id, exporter_name, user_id))
        self.running_exports = exports
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Return the export entries whose jobs the job manager still knows.

        Each entry is a tuple ``(<job_id>, <exporter_name>, <user_id>)``,
        where ``<exporter_name>`` is the utility name of the exporter used.
        Only entries belonging to `user_id` are considered; with ``None``
        the entries of all users are considered.

        As a side effect, considered entries whose job cannot be found in
        the job manager any more are removed from `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        candidates = [candidate for candidate in self.running_exports
                      if user_id is None or candidate[2] == user_id]
        alive, stale = [], []
        for entry in candidates:
            if job_manager.get(entry[0]) is None:
                stale.append(entry)
            else:
                alive.append(entry)
        if stale:
            self.running_exports = PersistentList(
                [kept for kept in self.running_exports if kept not in stale])
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Summarize the export jobs of `user_id` as a list of tuples.

        Every tuple reads ``(<raw status>, <status translated>,
        <exporter title>)``. The second item is the translated status text
        and the third is the human-readable title of the exporter that was
        used. ``None`` for `user_id` covers the jobs of all users.
        """
        running = self.get_running_export_jobs(user_id)
        job_manager = getUtility(IJobManager)
        summary = []
        for entry in running:
            job = job_manager.get(entry[0])
            if job is None:
                # Defensive: the job may have disappeared in the meantime.
                continue
            raw_status, translated_status = JOB_STATUS_MAP[job.status]
            exporter = getUtility(ICSVExporter, name=entry[1])
            summary.append((raw_status, translated_status, exporter.title))
        return summary

    def delete_export_entry(self, entry):
        """Remove the export described by `entry`.

        `entry` is a ``(<job id>, <exporter name>, <user id>)`` tuple as
        created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.

        If the job is still available and its result is a path string, the
        directory containing that path is deleted recursively. The job is
        then removed through the job manager, and `entry` is dropped from
        `running_exports`.
        """
        job_id = entry[0]
        job_manager = getUtility(IJobManager)
        job = job_manager.get(job_id)
        if job is not None and isinstance(job.result, basestring):
            # Remove the directory that holds the created export file.
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(job_id, self)
        self.running_exports = PersistentList(
            [other for other in self.running_exports if other != entry])

    def entry_from_job_id(self, job_id):
        """Return the first `running_exports` entry for `job_id`.

        ``None`` is returned when no entry carries that job ID.
        """
        return next(
            (entry for entry in self.running_exports if entry[0] == job_id),
            None)
[9726]803
class VirtualExportJobContainer(ExportJobContainer):
    """Export job container that keeps no data of its own.

    It can be mixed in exactly like a regular ExportJobContainer and offers
    the same functionality. However, it reads and writes `running_exports`
    (and exposes `logger`) on the site-wide export container instead of on
    itself.

    A registered IExportContainerFinder utility is required to locate that
    site-wide container.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        # NOTE(review): the finder may return None (ExportContainerFinder in
        # this module does so without an active site); the accessors below
        # then fail with AttributeError.
        finder = getUtility(IExportContainerFinder)
        return finder()

    # ExportJobContainer keeps all of its persistent state in the single
    # attribute `running_exports`. Redirecting that one attribute to the
    # site-wide container is therefore enough to make this virtual container
    # behave like a regular one while storing nothing on itself.
    def _get_running_exports(self):
        return self._site_container.running_exports

    def _set_running_exports(self, value):
        self._site_container.running_exports = value

    def _del_running_exports(self):
        del self._site_container.running_exports

    running_exports = property(
        _get_running_exports, _set_running_exports, _del_running_exports,
        doc="Exports stored in the site-wide exports container.")

    @property
    def logger(self):
        """Logger of the site-wide exports container."""
        return self._site_container.logger
[9726]849
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Global utility locating the local (site-wide) export container.
    """

    def __call__(self):
        """Return the export container of the current site.

        The container is the ``datacenter`` item of the site. ``None`` is
        returned when no site can be determined or when the site has no
        ``datacenter`` item.
        """
        site = grok.getSite()
        return None if site is None else site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.