source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 15152

Last change on this file since 15152 was 15065, checked in by Henrik Bettermann, 7 years ago

AAUE staff used the IGNORE_Marker ('<IGNORE>') in the level column for the StudentStudyLevelProcessor in create mode. The record was successfully imported but caused a traceback in level reports. Such records are now being detected during batch processing.

  • Property svn:keywords set to Id
File size: 32.9 KB
RevLine 
[7196]1## $Id: batching.py 15065 2018-06-28 07:42:03Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[14552]37from zope.schema.interfaces import ConstraintNotSatisfied, RequiredMissing
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = ''

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = []

    # A factory with this name must be registered...
    factory_name = ''

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Column names that must be present, keyed by import mode.

        ``create`` needs the location fields plus the required fields;
        ``update`` and ``remove`` need the location fields only.
        """
        result = dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all location fields and interface field names.
        """
        return sorted(
            set(self.location_fields) | set(getFields(self.iface).keys()))

    def getHeaders(self, mode='create'):
        """Return the available fields; `mode` is not evaluated here.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check `headerfields` against the requirements of `mode`.

        Raises `FatalCSVError` if a column required for `mode` is
        missing or if a header not starting with ``--`` appears more
        than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already catched in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` is read to obtain the raw
        column names; the file is closed again before returning.
        """
        result = dict()
        with open(path, 'rb') as csv_file:
            raw_header = next(unicodecsv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion and invariant errors into one string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages. The
        single messages are separated by ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create an object with the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed
        on to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Values equal to the ignore marker and keys which are not
        attributes of `obj` are skipped. Strings holding a Python list
        literal are turned into real lists before being set.

        Returns a string describing the fields changed.
        """
        # Local import: only needed for parsing list literals below.
        import ast

        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            # DefaultObjectConverter.fromStringDict fails for
            # list-of-choices fields because we are using a different
            # widget for this combination. Thus the ListFieldConverter
            # returns a useless dictionary which causes getWidgetsData to
            # skip the field. The value in row remains unchanged.
            # We have to evaluate the string and replace the value here.
            #
            # SECURITY: the value comes straight from an uploaded CSV
            # file. ``ast.literal_eval`` only accepts Python literals,
            # whereas the formerly used ``eval`` would execute any
            # expression found in a cell.
            try:
                evalvalue = ast.literal_eval(value)
            except (ValueError, SyntaxError, TypeError,
                    RuntimeError, MemoryError):
                # Not a literal (or not a string at all): keep value.
                pass
            else:
                if isinstance(evalvalue, list):
                    value = evalvalue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        Nothing happens if `logger` is ``None``. `fail_path` and
        `user` are accepted but not part of the message.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        The message is stored in `row` under ``--ERRORS--``, i.e. the
        passed dict is modified.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns the tuple ``(errors, invariant_errors, converted_dict)``
        delivered by the object converter of `iface`. `ignore_empty`
        is accepted but not evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def emptyRow(self, row):
        """Detect empty rows.

        A row is empty if each value is ``None``, the ignore marker or
        a string consisting of whitespace only.
        """
        for value in row.values():
            if value not in (None, IGNORE_MARKER) and value.strip():
                return False
        return True

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """In contrast to most other methods, `doImport` is not supposed to
        be customized, neither in custom packages nor in derived batch
        processor classes. Therefore, this is the only place where we
        do import data.

        Before this method starts creating or updating persistent data, it
        prepares two more files in a temporary folder of the filesystem: (1)
        a file for pending data with file extension ``.pending`` and (2)
        a file for successfully processed data with file extension
        ``.finished``. Then the method starts iterating over all rows of
        the CSV file. Each row is treated as follows:

        1. An empty row is skipped.

        2. Empty strings or lists (``[]``) in the row are replaced by
           ignore markers.

        3. The `BatchProcessor.checkConversion` method validates and converts
           all values in the row. Conversion means the transformation of strings
           into Python objects. For instance, number expressions have to be
           transformed into integers, dates into datetime objects, phone number
           expressions into phone number objects, etc. The converter returns a
           dictionary with converted values or, if the validation of one of the
           elements fails, an appropriate warning message. If the conversion
           fails a pending record is created and stored in the pending data file
           together with a warning message the converter has raised.

        4. In **create mode** only:

           The parent object must be found and a child
           object with same object id must not exist. Otherwise the row
           is skipped, a corresponding warning message is raised and a
           record is stored in the pending data file.

           Now `doImport` tries to add the new object with the data
           from the conversion dictionary. In some cases this
           may fail and a `DuplicationError` is raised. For example, a new
           payment ticket is created but the same payment for same session
           has already been made. In this case the object id is unique, no
           other object with same id exists, but making the 'same' payment
           twice does not make sense. The import is skipped and a
           record is stored in the pending data file.

        5. In **update mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is raised and a record is
           stored in the pending data file.

           The `BatchProcessor.checkUpdateRequirements` method checks additional
           requirements the object must fulfill before being updated. These
           requirements are not imposed by the data type but the context
           of the object. For example, post-graduate students have a different
           registration workflow. With this method we do forbid certain workflow
           transitions or states.

           Finally, `doImport` updates the existing object with the data
           from the conversion dictionary.

        6. In **remove mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is raised and a record is
           stored in the pending data file.

           Finally, `doImport` removes the existing object.

        Returns a tuple ``(<number of rows>, <number of warnings>,
        <path of finished file>, <path of pending file>)``. The pending
        file is removed and its path set to ``None`` if no warning
        occurred.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        # The input file and both result files are closed explicitly in
        # the ``finally`` clause below, also if a row raises.
        infile = open(path, 'rb')
        failed_file = None
        finished_file = None
        try:
            reader = unicodecsv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()

            base = os.path.basename(path)
            (base, ext) = os.path.splitext(base)
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            failed_writer = unicodecsv.DictWriter(
                failed_file, failed_headers)
            os.chmod(failed_path, 0o664)
            failed_writer.writerow(dict([(x, x) for x in failed_headers]))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            finished_writer = unicodecsv.DictWriter(
                finished_file, finished_headers)
            os.chmod(finished_path, 0o664)
            finished_writer.writerow(
                dict([(x, x) for x in finished_headers]))

            num = 0
            num_warns = 0
            site = grok.getSite()

            for raw_row in reader:
                num += 1
                # Skip row if empty
                if self.emptyRow(raw_row):
                    continue
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty:
                    # Replace empty strings and empty lists with
                    # ignore-markers
                    for key, val in string_row.items():
                        if val == '' or val == '[]':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: `row` receives the converted values,
                # `string_row` keeps the strings written to the
                # pending and finished files.
                row = dict(string_row.items())
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                if mode == 'create':
                    if not self.parentsExist(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Not all parents do exist yet.")
                        continue
                    if self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "This object already exists.")
                        continue
                    obj = self.callFactory()
                    # Override all values in row, also
                    # student_ids and applicant_ids which have been
                    # generated in the respective __init__ methods before.
                    self.updateEntry(obj, row, site, base)
                    try:
                        self.addEntry(obj, row, site)
                    except KeyError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.message)
                        continue
                    except DuplicationError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.msg)
                        continue
                    except FatalCSVError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.message)
                        continue
                elif mode == 'remove':
                    if not self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot remove: no such entry")
                        continue
                    self.delEntry(row, site)
                elif mode == 'update':
                    obj = self.getEntry(row, site)
                    if obj is None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot update: no such entry")
                        continue
                    update_errors = self.checkUpdateRequirements(
                        obj, row, site)
                    if update_errors is not None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, update_errors)
                        continue
                    try:
                        self.updateEntry(obj, row, site, base)
                    except ConstraintNotSatisfied as err:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "ConstraintNotSatisfied: %s" % err)
                        continue
                    except RequiredMissing as err:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "RequiredMissing: %s" % err)
                        continue
                finished_writer.writerow(string_row)

            time_end = time.time()
            timedelta = time_end - time_start
        finally:
            for handle in (infile, failed_file, finished_file):
                if handle is not None:
                    handle.close()

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing is pending; the (already closed) file only holds
            # the header row.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = unicodecsv.DictWriter(outfile, self.available_fields)
        writer.writerow(
            dict(zip(self.available_fields, self.available_fields)))  # header
        outfile.seek(0)
        return outfile.read()
511
class ExporterBase(object):
    """Common base class for CSV exporters.

    Derived classes set `fields` and `title` and implement `export`,
    `export_all`, `get_filtered` and `get_selected`.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something writable to a CSV file.

        Derived classes may override this hook; `name` and `context`
        are not evaluated here.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.date):
            # Covers datetimes as well, as datetimes are also dates.
            # A hash '#' is appended to circumvent unwanted automatic
            # handling of such values by Excel (datetimes: changed
            # 2014-07-06, see ticket #941).
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer which already wrote its header row.

        The result is a tuple ``(<writer>, <outfile>)``. ``<writer>``
        is a dict writer for `fields`, ``<outfile>`` the file object
        written to: a StringIO if `filepath` is ``None``, otherwise
        the file at `filepath` opened in ``'wb'`` mode.

        A file opened via `filepath` is open for writing only; close
        it before reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = unicodecsv.DictWriter(outfile, self.fields)
        header = dict(zip(self.fields, self.fields))
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one row with the `fields` values of `obj` via `writer`.

        Attributes missing on `obj` are handled as ``None``.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With `filepath` being ``None`` the complete contents of
        `outfile` is returned. Otherwise `outfile` is closed.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def get_filtered(self, site, **kw):
        """Deliver an iterable of datasets matching the keywords `kw`.

        To be implemented by derived classes.
        """
        raise NotImplementedError

    def get_selected(self, site, selected):
        """Deliver an iterable of the datasets whose identifiers are
        listed in `selected`.

        To be implemented by derived classes.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Write the items of `iterable` as CSV data.

        Implementations should return a raw string with CSV data if
        `filepath` is ``None``.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Write all appropriate objects of `site` as CSV data into
        `filepath`.

        Implementations should return a raw string with CSV data if
        `filepath` is ``None``.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export the items `get_filtered` delivers for `kw`.

        The result of `export` is returned.
        """
        return self.export(self.get_filtered(site, **kw), filepath=filepath)

    def export_selected(self, site, filepath=None, **kw):
        """Export the items named by the ``selected`` keyword.

        ``selected`` is a list of identifiers (default: empty list)
        passed to `get_selected`. The result of `export` is returned.
        """
        identifiers = kw.get('selected', [])
        return self.export(
            self.get_selected(site, identifiers), filepath=filepath)
634
def export_job(site, exporter_name, **kw):
    """Export all entries delivered by exporter and store it in a temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    Without keyword arguments everything is exported
    (``export_all``). If a ``selected`` keyword is given,
    ``export_selected`` is used; any other keywords are passed to
    ``export_filtered``.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`). It will be named after the
    exporter used with `.csv` filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the callers responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    filename = '%s.csv' % exporter_name
    output_path = os.path.join(output_dir, filename)
    if not kw:
        exporter.export_all(site, filepath=output_path)
    elif 'selected' in kw:
        # ``dict.has_key`` is deprecated; ``in`` does the same lookup.
        exporter.export_selected(site, filepath=output_path, **kw)
    else:
        exporter.export_filtered(site, filepath=output_path, **kw)
    return output_path
668
669class AsyncExportJob(AsyncJob):
670    """An IJob that exports data to CSV files.
671
672    `AsyncExportJob` instances are regular `AsyncJob` instances with a
673    different constructor API. Instead of a callable to execute, you
674    must pass a `site` and some `exporter_name` to trigger an export.
675
676    The real work is done when an instance of this class is put into a
677    queue. See :mod:`waeup.kofa.async` to learn more about
678    asynchronous jobs.
679
680    The `exporter_name` must be the name under which an ICSVExporter
681    utility was registered with the ZCA.
682
683    The `site` must be a valid site  or ``None``.
684
685    The result of an `AsyncExportJob` is the path to generated CSV
686    file. The file will reside in a temporary directory that should be
687    removed after being used.
688    """
689    grok.implements(IExportJob)
690
[9718]691    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]692        super(AsyncExportJob, self).__init__(
[9718]693            export_job, site, exporter_name, *args, **kwargs)
[9217]694
[9816]695    @property
696    def finished(self):
697        """A job is marked `finished` if it is completed.
698
699        Please note: a finished report job does not neccessarily
700        provide an IReport result. See meth:`failed`.
701        """
702        return self.status == zc.async.interfaces.COMPLETED
703
704    @property
705    def failed(self):
706        """A report job is marked failed iff it is finished and the
707        result is None.
708
709        While a job is unfinished, the `failed` status is ``None``.
710
711        Failed jobs normally provide a `traceback` to examine reasons.
712        """
713        if not self.finished:
714            return None
715        if getattr(self, 'result', None) is None:
716            return True
717        return False
718
class ExportJobContainer(object):
    """Mix-in adding asynchronous export job handling to a container.

    All bookkeeping happens in `running_exports`, a persistent list of
    ``(<job_id>, <exporter_name>, <user_id>)`` tuples. The jobs
    themselves are kept by the registered `IJobManager` utility.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Queue a new asynchronous export job and register it.

        `exporter_name` is the name of the exporter utility to be used.

        `user_id` is the ID of the user that triggers the export.

        Additional positional and keyword arguments are handed over
        as-is to the :class:`AsyncExportJob` being created.

        A tuple ``(<job_id>, <exporter_name>, <user_id>)`` is added to
        `running_exports`.

        Returns the job ID of the job started.
        """
        site = grok.getSite()
        job_manager = getUtility(IJobManager)
        job_id = job_manager.put(
            AsyncExportJob(site, exporter_name, *args, **kwargs))
        # Assign a fresh list to the instance instead of appending to
        # the current one, to make sure the list is stored in ZODB.
        exports = PersistentList(self.running_exports)
        exports.append((job_id, exporter_name, user_id))
        self.running_exports = exports
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Return registered export entries as a list of tuples.

        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order, where ``<exporter_name>`` is the utility name of
        the exporter used.

        With a `user_id` given, only entries of that user are
        considered. With `user_id` set to ``None`` entries of all users
        are considered.

        As a side effect, considered entries whose job is not known
        to the job manager any more are dropped from
        `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        known = []
        orphaned = []
        for entry in self.running_exports:
            if user_id is not None and entry[2] != user_id:
                continue
            if job_manager.get(entry[0]) is None:
                bucket = orphaned
            else:
                bucket = known
            bucket.append(entry)
        if orphaned:
            remaining = [
                item for item in self.running_exports
                if item not in orphaned]
            self.running_exports = PersistentList(remaining)
        return known

    def get_export_jobs_status(self, user_id=None):
        """Return status infos about export jobs as a list of tuples.

        The entries considered are those returned by
        :meth:`get_running_export_jobs` for `user_id`.

        Each tuple holds ``<raw status>, <status translated>,
        <exporter title>`` in that order. The first two values are
        looked up in `JOB_STATUS_MAP` by the status of the job, the
        last one is the `title` of the exporter utility used.
        """
        entries = self.get_running_export_jobs(user_id)
        job_manager = getUtility(IJobManager)
        infos = []
        for entry in entries:
            job = job_manager.get(entry[0])
            if job is not None:
                raw_status, translated = JOB_STATUS_MAP[job.status]
                exporter = getUtility(ICSVExporter, name=entry[1])
                infos.append((raw_status, translated, exporter.title))
        return infos

    def delete_export_entry(self, entry):
        """Remove the export denoted by `entry`.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.

        If the job has a string result (the path of the export file),
        the directory containing that file is removed from disk. The
        job is then removed via the job manager and `entry` is
        removed from `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        job_id = entry[0]
        job = job_manager.get(job_id)
        if job is not None and isinstance(job.result, basestring):
            # The whole directory holding the export file is removed,
            # not only the file itself.
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(job_id, self)
        self.running_exports = PersistentList(
            [item for item in self.running_exports if item != entry])

    def entry_from_job_id(self, job_id):
        """Look up the entry tuple registered for `job_id`.

        Returns the first matching entry of `running_exports` or
        ``None`` if there is no such entry.
        """
        matching = (
            entry for entry in self.running_exports
            if entry[0] == job_id)
        return next(matching, None)
[9726]827
class VirtualExportJobContainer(ExportJobContainer):
    """An export job container that keeps no data of its own.

    Can be used as a mix-in exactly like a regular
    :class:`ExportJobContainer` and offers the same functionality.
    All data, however, is read from and written to the container
    returned by the registered `IExportContainerFinder` utility.

    Such a utility must therefore be registered for virtual
    containers to work.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        """The container delivered by the `IExportContainerFinder`."""
        find_container = getUtility(IExportContainerFinder)
        return find_container()

    # `running_exports` is the only attribute a regular
    # ExportJobContainer stores. Redirecting get, set and delete of
    # this single attribute to the site-wide container is therefore
    # enough to make all inherited methods operate on the site-wide
    # data, while nothing is stored with the virtual container itself.
    @property
    def running_exports(self):
        """Exports stored in the site-wide exports container.
        """
        container = self._site_container
        return container.running_exports

    @running_exports.setter
    def running_exports(self, value):
        container = self._site_container
        container.running_exports = value

    @running_exports.deleter
    def running_exports(self):
        container = self._site_container
        del container.running_exports

    @property
    def logger(self):
        """The logger of the site-wide exports container."""
        container = self._site_container
        return container.logger
[9726]873
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Global utility that finds the local (site-wide) export container.
    """

    def __call__(self):
        """Get the local export container.

        The container is looked up under the name ``datacenter`` in
        the current site. ``None`` is returned if no site can be
        determined or the site has no such item.
        """
        site = grok.getSite()
        return None if site is None else site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.