source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 13156

Last change on this file since 13156 was 12997, checked in by Henrik Bettermann, 10 years ago

More docs.

  • Property svn:keywords set to Id
File size: 31.9 KB
RevLine 
[7196]1## $Id: batching.py 12997 2015-05-23 22:37:39Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[11849]37from zope.schema.interfaces import ConstraintNotSatisfied
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors override the
    class attributes below and the entry-handling methods which raise
    `NotImplementedError` here (`parentsExist`, `entryExists`,
    `getParent`, `getEntry`, `addEntry`, `delEntry`).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = ''

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = []

    # A factory with this name must be registered...
    factory_name = ''

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Location fields and fields with a default != missing_value do
        not belong to this category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Columns required per import mode.

        A dict mapping each mode (``create``, ``update``, ``remove``) to
        the list of column names which must be present in the CSV
        header when importing in that mode.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
            )

    @property
    def available_fields(self):
        """Sorted list of all field names this processor understands.

        These are the location fields plus all fields declared in
        `iface`, each name listed once.
        """
        return sorted(
            set(self.location_fields) | set(getFields(self.iface).keys()))

    def getHeaders(self, mode='create'):
        """Return the headers accepted by this processor.

        `mode` is not evaluated here; all available fields are returned.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are suitable for an import in `mode`.

        Raises `FatalCSVError` if a column required for `mode` is
        missing or if a non-ignored column name appears more than once.
        Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. This should not happen because this
        # error is already caught in views. Columns starting with '--'
        # (ignored columns) may appear more than once.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict whose keys are the mapped field names (the
        values of `mapping`) and whose values are taken from `row` under
        the raw header names (the keys of `mapping`). Columns mapped to
        ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first row of the CSV file at `path` is read as the raw
        header. The n-th raw header name is mapped to the n-th entry of
        `headerfields`. In ``remove`` mode only location fields are
        mapped. ``--IGNORE--`` columns are never mapped.
        """
        # Close the file right after reading the header instead of
        # leaving that to the garbage collector.
        with open(path, 'rb') as infile:
            reader = unicodecsv.reader(infile)
            raw_header = reader.next()
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion errors into a single message string.

        `errors` is a sequence of ``(fieldname, message)`` pairs and
        `inv_errors` a sequence of invariant error messages. All entries
        are joined with ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object with the factory named `factory_name`.

        Any arguments are ignored.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems, ``None`` otherwise. The base implementation always
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Values equal to the ignore marker, keys which are not already
        attributes of `obj`, and attributes which cannot be set are
        skipped. If anything was changed, an `ObjectModifiedEvent` is
        notified for `obj`.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        Nothing is written if `logger` is ``None``. `fail_path` and
        `user` are not used here.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        The messages are stored in `row` under ``--ERRORS--`` before
        the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errors, invariant_errors, converted_values)``
        as delivered by the `IObjectConverter` of `iface`.
        `ignore_empty` is not used here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def emptyRow(self, row):
        """Detect empty rows.

        A row is empty if every value is ``None``, the ignore marker, or
        a string consisting of whitespace only. ``DictReader`` fills
        missing trailing cells with ``None`` and collects surplus cells
        in a list stored under the key ``None``. Both are handled here
        instead of crashing on ``None.strip()`` / ``list.strip()``.
        """
        for value in row.values():
            if value is None:
                continue
            cells = value if isinstance(value, list) else [value]
            for cell in cells:
                if cell is None or cell == IGNORE_MARKER:
                    continue
                if cell.strip():
                    return False
        return True

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """In contrast to most other methods, `doImport` is not supposed to
        be customized, neither in custom packages nor in derived batch
        processor classes. Therefore, this is the only place where we
        do import data.

        Before this method starts creating or updating persistent data, it
        prepares two more files in a temporary folder of the filesystem: (1)
        a file for pending data with file extension ``.pending`` and (2)
        a file for successfully processed data with file extension
        ``.finished``. Then the method starts iterating over all rows of
        the CSV file. Each row is treated as follows:

        1. An empty row is skipped.

        2. If `ignore_empty` is set (the default), empty strings or
           lists (``[]``) in the row are replaced by ignore markers.

        3. The `BatchProcessor.checkConversion` method validates and converts
           all values in the row. Conversion means the transformation of strings
           into Python objects. For instance, number expressions have to be
           transformed into integers, dates into datetime objects, phone number
           expressions into phone number objects, etc. The converter returns a
           dictionary with converted values or, if the validation of one of the
           elements fails, an appropriate warning message. If the conversion
           fails a pending record is created and stored in the pending data file
           together with a warning message the converter has raised.

        4. In **create mode** only:

           The parent object must be found and a child
           object with same object id must not exist. Otherwise the row
           is skipped and a record is stored in the pending data file
           together with a corresponding warning message.

           Now `doImport` tries to add the new object with the data
           from the conversion dictionary. In some cases this
           may fail and a `DuplicationError` (or a `KeyError`) is raised.
           For example, a new payment ticket is created but the same
           payment for same session has already been made. In this case
           the object id is unique, no other object with same id exists,
           but making the 'same' payment twice does not make sense. The
           import is skipped and a record is stored in the pending data
           file.

        5. In **update mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is written and a record is
           stored in the pending data file.

           The `BatchProcessor.checkUpdateRequirements` method checks additional
           requirements the object must fulfill before being updated. These
           requirements are not imposed by the data type but the context
           of the object. For example, post-graduate students have a different
           registration workflow. With this method we do forbid certain workflow
           transitions or states.

           Finally, `doImport` updates the existing object with the data
           from the conversion dictionary. A `ConstraintNotSatisfied`
           error raised while updating also leads to a pending record.

        6. In **remove mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is written and a record is
           stored in the pending data file.

           Finally, `doImport` removes the existing object.

        All files are closed before this method returns (also if an
        error occurs), so the returned files can be read right away.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``:
        the number of rows read (including skipped empty rows), the
        number of rows stored in the pending data file, and the
        absolute paths of the finished and pending data files.
        `failed_path` is ``None`` if no row failed; the pending data
        file is removed in that case.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()
        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = list(mapping.values())
        failed_headers.append('--ERRORS--')
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = list(mapping.values())

        # Close (and thereby flush) all files explicitly, also on errors,
        # instead of relying on the garbage collector. The output files
        # must be complete before their paths are handed to callers and
        # before the pending file may be unlinked.
        with open(path, 'rb') as infile:
            with open(failed_path, 'wb') as failed_file:
                with open(finished_path, 'wb') as finished_file:
                    os.chmod(failed_path, 0o664)
                    os.chmod(finished_path, 0o664)
                    failed_writer = self._startCSVFile(
                        failed_file, failed_headers)
                    finished_writer = self._startCSVFile(
                        finished_file, finished_headers)
                    num, num_warns = self._importRows(
                        unicodecsv.DictReader(infile), mapping,
                        failed_writer, finished_writer, mode, base,
                        ignore_empty)

        timedelta = time.time() - time_start
        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def _startCSVFile(self, outfile, headers):
        """Return a CSV dict writer on `outfile` with the header row written.
        """
        writer = unicodecsv.DictWriter(outfile, headers)
        writer.writerow(dict(zip(headers, headers)))
        return writer

    def _importRows(self, reader, mapping, failed_writer, finished_writer,
                    mode, base, ignore_empty):
        """Import all rows delivered by `reader` (helper of `doImport`).

        Each non-empty row is written either to `finished_writer` or,
        together with a warning message, to `failed_writer`. Returns a
        tuple ``(num, num_warns)``: the number of rows read (including
        empty rows) and the number of failed rows.
        """
        num = 0
        num_warns = 0
        site = grok.getSite()
        for raw_row in reader:
            num += 1
            # Skip row if empty
            if self.emptyRow(raw_row):
                continue
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty:
                # Replace empty strings and empty lists with ignore-markers
                for key, val in string_row.items():
                    if val == '' or val == '[]':
                        string_row[key] = IGNORE_MARKER
            success, warning = self._importRow(string_row, mode, site, base)
            if not success:
                num_warns += 1
                self.writeFailedRow(failed_writer, string_row, warning)
                continue
            finished_writer.writerow(string_row)
        return num, num_warns

    def _importRow(self, string_row, mode, site, base):
        """Convert and import a single mapped row (helper of `doImport`).

        Returns ``(True, None)`` if the row was processed and
        ``(False, <warning>)`` if it has to go to the pending data file.
        """
        # Shallow copy: converted values are merged into `row` while
        # `string_row` keeps the strings written to the output files.
        row = dict(string_row)
        errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
        if errs or inv_errs:
            return False, self.stringFromErrs(errs, inv_errs)
        row.update(conv_dict)

        if mode == 'create':
            if not self.parentsExist(row, site):
                return False, "Not all parents do exist yet."
            if self.entryExists(row, site):
                return False, "This object already exists."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site, base)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return False, error.message
            except DuplicationError as error:
                return False, error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return False, "Cannot remove: no such entry"
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return False, "Cannot update: no such entry"
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return False, update_errors
            try:
                self.updateEntry(obj, row, site, base)
            except ConstraintNotSatisfied as err:
                return False, "ConstraintNotSatisfied: %s" % err
        return True, None

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = unicodecsv.DictWriter(outfile, self.available_fields)
        writer.writerow(
            dict(zip(self.available_fields, self.available_fields))) # header
        outfile.seek(0)
        return outfile.read()
488
class ExporterBase(object):
    """A base for exporters.

    Derived classes set `fields` and `title` and implement `export`,
    `export_all`, `get_filtered` and `get_selected`, which raise
    `NotImplementedError` here.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.

        The base implementation turns `value` into something CSV
        writers handle well:

        - booleans become ``'1'`` or ``'0'``,
        - unicode strings are encoded as UTF-8 byte strings,
        - dates and datetimes become their string representation with
          a hash (``#``) appended,
        - ``None`` becomes the empty string.

        All other values are returned unchanged. `name` (the field
        name) and `context` (the object the value was taken from) are
        not used here but are passed for the benefit of derived classes.
        """
        if isinstance(value, bool):
            value = '1' if value else '0'
        elif isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            value = value.encode('utf-8')
        elif isinstance(value, datetime.date):
            # Also covers datetime.datetime, a subclass of date.
            # The appended hash keeps spreadsheet programs like Excel
            # from converting the value automatically (datetimes are
            # treated like this since 2014-07-06, see ticket #941).
            value = str('%s#' % value)
        elif value is None:
            # None is not really representable in CSV files
            value = ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        ``unicodecsv.DictWriter`` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is None:
            outfile = StringIO()
        else:
            outfile = open(filepath, 'wb')
        writer = unicodecsv.DictWriter(outfile, self.fields)
        writer.writerow(dict(zip(self.fields, self.fields))) # header
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        Missing attributes are exported as ``None`` before being passed
        through `mangle_value`.
        """
        row = {}
        for name in self.fields:
            value = getattr(obj, name, None)
            value = self.mangle_value(value, name, obj)
            row[name] = value
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile are returned (and
        outfile is left open) instead.
        """
        outfile.seek(0)
        if filepath is None:
            return outfile.read()
        outfile.close()
        return

    def get_filtered(self, site, **kw):
        """Get datasets to export filtered by keyword arguments.
        Returns an iterable.
        """
        raise NotImplementedError

    def get_selected(self, site, selected):
        """Get datasets to export for selected items
        specified by a list of identifiers.
        Returns an iterable.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export items denoted by `kw`.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        data = self.get_filtered(site, **kw)
        return self.export(data, filepath=filepath)

    def export_selected(self, site, filepath=None, **kw):
        """Export those items specified by a list of identifiers
        called `selected`.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        selected = kw.get('selected', [])
        data = self.get_selected(site, selected)
        return self.export(data, filepath=filepath)
611
def export_job(site, exporter_name, **kw):
    """Export entries delivered by an exporter and store them in a temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    Without keyword arguments all entries are exported (via the
    exporter's `export_all`). If `kw` contains a ``selected`` key, the
    exporter's `export_selected` is used. Otherwise `kw` is passed as
    filter criteria to the exporter's `export_filtered`.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`). It will be named after the
    exporter used with `.csv` filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the caller's responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    filename = '%s.csv' % exporter_name
    output_path = os.path.join(output_dir, filename)
    if not kw:
        exporter.export_all(site, filepath=output_path)
    elif 'selected' in kw:
        exporter.export_selected(site, filepath=output_path, **kw)
    else:
        exporter.export_filtered(site, filepath=output_path, **kw)
    return output_path
645
646class AsyncExportJob(AsyncJob):
647    """An IJob that exports data to CSV files.
648
649    `AsyncExportJob` instances are regular `AsyncJob` instances with a
650    different constructor API. Instead of a callable to execute, you
651    must pass a `site` and some `exporter_name` to trigger an export.
652
653    The real work is done when an instance of this class is put into a
654    queue. See :mod:`waeup.kofa.async` to learn more about
655    asynchronous jobs.
656
657    The `exporter_name` must be the name under which an ICSVExporter
658    utility was registered with the ZCA.
659
660    The `site` must be a valid site  or ``None``.
661
662    The result of an `AsyncExportJob` is the path to generated CSV
663    file. The file will reside in a temporary directory that should be
664    removed after being used.
665    """
666    grok.implements(IExportJob)
667
[9718]668    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]669        super(AsyncExportJob, self).__init__(
[9718]670            export_job, site, exporter_name, *args, **kwargs)
[9217]671
[9816]672    @property
673    def finished(self):
674        """A job is marked `finished` if it is completed.
675
676        Please note: a finished report job does not neccessarily
677        provide an IReport result. See meth:`failed`.
678        """
679        return self.status == zc.async.interfaces.COMPLETED
680
681    @property
682    def failed(self):
683        """A report job is marked failed iff it is finished and the
684        result is None.
685
686        While a job is unfinished, the `failed` status is ``None``.
687
688        Failed jobs normally provide a `traceback` to examine reasons.
689        """
690        if not self.finished:
691            return None
692        if getattr(self, 'result', None) is None:
693            return True
694        return False
695
class ExportJobContainer(object):
    """Mix-in adding asynchronous export job bookkeeping to a container.

    Started export jobs are remembered in :attr:`running_exports`, a
    persistent list of ``(<job id>, <exporter name>, <user id>)``
    tuples. The jobs themselves are handled by the utility registered
    for :class:`IJobManager`.
    """
    grok.implements(IExportJobContainer)

    # Class-level default only: every method below rebinds
    # ``self.running_exports`` to a fresh PersistentList before changing
    # its contents, so this shared default instance is never mutated.
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Queue an asynchronous export job and remember it.

        `exporter_name` names the exporter utility to run; `user_id`
        identifies the user who requested the export. Any further
        positional and keyword arguments are handed unchanged to
        :class:`AsyncExportJob`.

        A ``(job_id, exporter_name, user_id)`` tuple is appended to
        :attr:`running_exports`.

        Returns the ID under which the job manager stored the job.
        """
        site = grok.getSite()
        manager = getUtility(IJobManager)
        job_id = manager.put(
            AsyncExportJob(site, exporter_name, *args, **kwargs))
        # Rebinding to a new PersistentList guarantees the change is
        # stored on this object in the ZODB.
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Return the known export jobs of `user_id` as a list of tuples.

        Each tuple is ``(<job_id>, <exporter_name>, <user_id>)``, where
        ``<exporter_name>`` is the utility name of the exporter used.
        With `user_id` set to ``None`` the entries of all users are
        returned.

        Entries whose job can no longer be found in the job manager are
        left out and, as a side effect, dropped from
        :attr:`running_exports`.
        """
        manager = getUtility(IJobManager)
        alive, vanished = [], []
        for entry in self.running_exports:
            if user_id is not None and entry[2] != user_id:
                continue
            if manager.get(entry[0]) is None:
                vanished.append(entry)
            else:
                alive.append(entry)
        if vanished:
            self.running_exports = PersistentList(
                [item for item in self.running_exports
                 if item not in vanished])
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Return status info on the export jobs of `user_id`.

        The result is a list of ``(<raw status>, <status translated>,
        <exporter title>)`` tuples. The two status values are looked up
        in ``JOB_STATUS_MAP`` for the job's status. The title is the
        ``title`` attribute of the exporter utility that was used.
        """
        entries = self.get_running_export_jobs(user_id)
        manager = getUtility(IJobManager)
        statuses = []
        for entry in entries:
            job = manager.get(entry[0])
            if job is None:
                # The job may have vanished since `entries` was built.
                continue
            raw_status, translated_status = JOB_STATUS_MAP[job.status]
            title = getUtility(ICSVExporter, name=entry[1]).title
            statuses.append((raw_status, translated_status, title))
        return statuses

    def delete_export_entry(self, entry):
        """Forget the export described by `entry` and clean up after it.

        `entry` must be a ``(<job id>, <exporter name>, <user id>)``
        tuple as produced by :meth:`start_export_job` or
        :meth:`get_running_export_jobs`.

        If the job still exists and its result is a path string, the
        directory containing that path is removed from disk. The job is
        then removed via the job manager, and every equal tuple is
        dropped from :attr:`running_exports`.
        """
        manager = getUtility(IJobManager)
        job = manager.get(entry[0])
        if job is not None and isinstance(job.result, basestring):
            # NOTE(review): the whole parent directory of the result
            # file is deleted. This presumably is a per-job directory;
            # verify against AsyncExportJob.
            result_dir = os.path.dirname(job.result)
            if os.path.exists(result_dir):
                shutil.rmtree(result_dir)
        manager.remove(entry[0], self)
        self.running_exports = PersistentList(
            [item for item in self.running_exports if item != entry])

    def entry_from_job_id(self, job_id):
        """Return the stored entry tuple for `job_id`, or ``None``.
        """
        return next(
            (entry for entry in self.running_exports if entry[0] == job_id),
            None)
[9726]804
class VirtualExportJobContainer(ExportJobContainer):
    """Export job container that keeps no data of its own.

    It is used as a mix-in exactly like :class:`ExportJobContainer` and
    offers the same methods. All bookkeeping is read from and written
    to the container returned by the registered
    ``IExportContainerFinder`` utility.

    If the finder returns ``None``, accessing :attr:`running_exports`
    or :attr:`logger` fails with an ``AttributeError``.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        """Container currently returned by the export container finder."""
        finder = getUtility(IExportContainerFinder)
        return finder()

    # The inherited methods only ever touch ``running_exports``.
    # Redirecting that one attribute to the site-wide container is
    # therefore enough to make this class behave like a regular export
    # container whose data happens to live elsewhere.
    def _get_running_exports(self):
        return self._site_container.running_exports

    def _set_running_exports(self, value):
        self._site_container.running_exports = value

    def _del_running_exports(self):
        del self._site_container.running_exports

    running_exports = property(
        _get_running_exports,
        _set_running_exports,
        _del_running_exports,
        doc="Exports stored in the site-wide exports container.")

    @property
    def logger(self):
        """Logger of the site-wide exports container."""
        return self._site_container.logger
[9726]850
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Global utility that locates the site-wide export container.
    """

    def __call__(self):
        """Return the export container of the current site.

        The container is the ``datacenter`` item of the site returned by
        ``grok.getSite()``. ``None`` is returned when there is no
        current site or when the site has no ``datacenter`` item.
        """
        site = grok.getSite()
        # Compare against None explicitly: an empty site container is
        # falsy but still a valid site.
        return None if site is None else site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.