source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 16231

Last change on this file since 16231 was 16012, checked in by Henrik Bettermann, 5 years ago

Implement BatchProcessor.checkCreateRequirements and
BatchProcessor.checkUpdateRequirements methods mainly
to protect course result lists of graduated student.

  • Property svn:keywords set to Id
File size: 35.1 KB
RevLine 
[7196]1## $Id: batching.py 16012 2020-02-24 21:26:35Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[14552]37from zope.schema.interfaces import ConstraintNotSatisfied, RequiredMissing
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Derived processors are expected to
    set `util_name`, `iface`, `location_fields` and `factory_name` and
    to implement the hooks which raise `NotImplementedError` here
    (`parentsExist`, `entryExists`, `getParent`, `getEntry`,
    `addEntry`, `delEntry`).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = ''

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = []

    # A factory with this name must be registered...
    factory_name = ''

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Names of the columns required in the CSV header, per mode.

        Returns a dict with the keys ``create``, ``update`` and
        ``remove``.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted list of location fields plus all fields of `iface`."""
        # list() keeps the concatenation independent of whether
        # keys() returns a list or a view.
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Get the header names offered for import.

        `mode` is not evaluated here.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` is usable for an import in `mode`.

        Raises `FatalCSVError` if a required column is missing or if a
        column name (other than names starting with ``--``) appears
        more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to field names. Returns a
        new dict keyed by field name; columns mapped to
        ``--IGNORE--`` are left out.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
        fieldnames.

        The first row of the file at `path` provides the raw column
        names; `headerfields` provides, position by position, the field
        name to use for each column.
        """
        result = dict()
        # Close the file again after reading the header row instead of
        # leaving that to garbage collection.
        with open(path, 'rb') as csv_file:
            raw_header = next(unicodecsv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion errors into one string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend("invariant: %s" % err for err in inv_errors)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object with the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed on.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkCreateRequirements(self, parent, row, site):
        """Checks requirements the parent object must fulfill when
        a new subobject is being created.

        This method is not used in case of updating or removing objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation imposes no requirements and
        returns ``None``.
        """
        return None

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation imposes no requirements and
        returns ``None``.
        """
        return None

    def checkRemoveRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being removed.

        This method is not used in case of updating or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation imposes no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns a string describing the fields changed. An
        `ObjectModifiedEvent` is sent if at least one attribute was set.
        `site` and `filename` are not used here.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            # DefaultObjectConverter.fromStringDict fails for
            # list-of-choices fields because we are using a different
            # widget for this combination. Thus the ListFieldConverter
            # returns a useless dictionary which causes getWidgetsData to
            # skip the field. The value in row remains unchanged.
            # We have to evaluate the string and replace the value here.
            #
            # NOTE(review): eval() is applied to values which originate
            # from the imported CSV file. If only list literals are
            # expected, ast.literal_eval would be the safer choice.
            try:
                evalvalue = eval(value)
                if isinstance(evalvalue, list):
                    value = evalvalue
            except Exception:
                # Anything that is not an evaluable expression stays
                # as it is. SystemExit and KeyboardInterrupt are
                # deliberately not swallowed.
                pass
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        Does nothing if no logger is given. `fail_path` and `user` are
        accepted but not part of the log message.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            # ``num or 1`` avoids a division by zero for empty files.
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        The message is stored in ``row`` under the key ``--ERRORS--``,
        i.e. ``row`` is modified.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errors, invariant_errors, converted_dict)``
        as delivered by the `IObjectConverter` for `iface`.
        `ignore_empty` is not evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def emptyRow(self, row):
        """Detect empty rows.

        A row is empty if each value is ``None``, the ignore marker or
        a string consisting of whitespace only.
        """
        for value in row.values():
            if value not in (None, IGNORE_MARKER) and value.strip():
                return False
        return True

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """In contrast to most other methods, `doImport` is not supposed to
        be customized, neither in custom packages nor in derived batch
        processor classes. Therefore, this is the only place where we
        do import data.

        Before this method starts creating or updating persistent data, it
        prepares two more files in a temporary folder of the filesystem: (1)
        a file for pending data with file extension ``.pending`` and (2)
        a file for successfully processed data with file extension
        ``.finished``. Then the method starts iterating over all rows of
        the CSV file. Each row is treated as follows:

        1. An empty row is skipped.

        2. Empty strings or lists (``[]``) in the row are replaced by
           ignore markers.

        3. The `BatchProcessor.checkConversion` method validates and converts
           all values in the row. Conversion means the transformation of strings
           into Python objects. For instance, number expressions have to be
           transformed into integers, dates into datetime objects, phone number
           expressions into phone number objects, etc. The converter returns a
           dictionary with converted values or, if the validation of one of the
           elements fails, an appropriate warning message. If the conversion
           fails a pending record is created and stored in the pending data file
           together with a warning message the converter has raised.

        4. In **create mode** only:

           The parent object must be found and a child
           object with same object id must not exist. Otherwise the row
           is skipped, a corresponding warning message is raised and a
           record is stored in the pending data file.

           The `BatchProcessor.checkCreateRequirements` method checks additional
           requirements the parent object must fulfill before a new subobject
           is being added. These requirements are not imposed by the data
           type but the context of the object. For example, the course results
           of graduated students must not be changed by import, neither by
           creating nor updating or removing course tickets.

           Now `doImport` tries to add the new object with the data
           from the conversion dictionary. In some cases this
           may fail and a `DuplicationError` is raised. For example, a new
           payment ticket is created but the same payment for same session
           has already been made. In this case the object id is unique, no
           other object with same id exists, but making the 'same' payment
           twice does not make sense. The import is skipped and a
           record is stored in the pending data file.

        5. In **update mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is raised and a record is
           stored in the pending data file.

           The `BatchProcessor.checkUpdateRequirements` method checks additional
           requirements the object must fulfill before being updated. These
           requirements are not imposed by the data type but the context
           of the object. For example, post-graduate students have a different
           registration workflow. With this method we do forbid certain workflow
           transitions or states.

           Finally, `doImport` updates the existing object with the data
           from the conversion dictionary.

        6. In **remove mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is raised and a record is
           stored in the pending data file.

           The `BatchProcessor.checkRemoveRequirements` method checks additional
           requirements the object must fulfill before being removed.
           These requirements are not imposed by the data type but the context
           of the object. For example, the course results of graduated students
           must not be changed by import, neither by creating nor updating or
           removing course tickets.

           Finally, `doImport` removes the existing object.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``
        with the number of rows read, the number of rows written to the
        pending file and the absolute paths of both result files.
        ``failed_path`` is ``None`` (and the pending file is removed) if
        no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0
        # All three files are closed explicitly in the ``finally``
        # clause below instead of relying on garbage collection.
        infile = open(path, 'rb')
        failed_file = None
        finished_file = None
        try:
            reader = unicodecsv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()

            base = os.path.basename(path)
            (base, ext) = os.path.splitext(base)
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            failed_writer = unicodecsv.DictWriter(
                failed_file, failed_headers)
            os.chmod(failed_path, 0o664)
            failed_writer.writerow(dict(zip(failed_headers, failed_headers)))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            finished_writer = unicodecsv.DictWriter(
                finished_file, finished_headers)
            os.chmod(finished_path, 0o664)
            finished_writer.writerow(
                dict(zip(finished_headers, finished_headers)))

            site = grok.getSite()

            for raw_row in reader:
                num += 1
                # Skip row if empty
                if self.emptyRow(raw_row):
                    continue
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty:
                    # Replace empty strings and empty lists with
                    # ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '' or val == '[]':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: string_row keeps the raw strings for the
                # pending/finished files, row receives converted values.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                if mode == 'create':
                    if not self.parentsExist(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Not all parents do exist yet.")
                        continue
                    if self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "This object already exists.")
                        continue
                    parent = self.getParent(row, site)
                    create_errors = self.checkCreateRequirements(
                        parent, row, site)
                    if create_errors is not None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, create_errors)
                        continue
                    obj = self.callFactory()
                    # Override all values in row, also
                    # student_ids and applicant_ids which have been
                    # generated in the respective __init__ methods before.
                    self.updateEntry(obj, row, site, base)
                    try:
                        self.addEntry(obj, row, site)
                    except KeyError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.message)
                        continue
                    except DuplicationError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.msg)
                        continue
                    except FatalCSVError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.message)
                        continue
                elif mode == 'remove':
                    if not self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot remove: no such entry")
                        continue
                    obj = self.getEntry(row, site)
                    remove_errors = self.checkRemoveRequirements(
                        obj, row, site)
                    if remove_errors is not None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, remove_errors)
                        continue
                    self.delEntry(row, site)
                elif mode == 'update':
                    obj = self.getEntry(row, site)
                    if obj is None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot update: no such entry")
                        continue
                    update_errors = self.checkUpdateRequirements(
                        obj, row, site)
                    if update_errors is not None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, update_errors)
                        continue
                    try:
                        self.updateEntry(obj, row, site, base)
                    except ConstraintNotSatisfied as err:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "ConstraintNotSatisfied: %s" % err)
                        continue
                    except RequiredMissing as err:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "RequiredMissing: %s" % err)
                        continue
                finished_writer.writerow(string_row)
        finally:
            for handle in (infile, failed_file, finished_file):
                if handle is not None:
                    handle.close()

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing is pending: do not leave an empty pending file.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = unicodecsv.DictWriter(outfile, self.available_fields)
        writer.writerow(
            dict(zip(self.available_fields, self.available_fields))) # header
        outfile.seek(0)
        return outfile.read()
561
class ExporterBase(object):
    """Common base for CSV exporters.

    Provides the CSV plumbing (writer creation, value mangling, row
    writing, closing). Selecting the data to export is left to derived
    classes, which have to implement `export`, `export_all`,
    `get_filtered` and `get_selected`.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something suitable for a CSV cell.

        This is a hook for derived classes; `name` and `context` are
        not evaluated by this base implementation.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, (datetime.datetime, datetime.date)):
            # A hash '#' is appended to dates and datetimes
            # (datetimes since 2014-07-06, see ticket #941).
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer which already contains the header row.

        Returns a tuple ``(<writer>, <outfile>)``. ``<outfile>`` is the
        file object the writer writes to: a ``StringIO`` instance if no
        `filepath` is given, the file at `filepath` opened in ``'wb'``
        mode otherwise.

        Please note that a file opened from `filepath` is open for
        writing only and you might have to close it before reopening
        it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = unicodecsv.DictWriter(outfile, self.fields)
        header = dict(zip(self.fields, self.fields))
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row with the `fields` of `obj` using `writer`.

        Attributes missing on `obj` are treated as ``None``; every
        value is passed through `mangle_value` first.
        """
        writer.writerow({
            name: self.mangle_value(getattr(obj, name, None), name, obj)
            for name in self.fields})
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        If `filepath` is ``None``, the contents of `outfile` are
        returned. Otherwise `outfile` is closed and nothing is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def get_filtered(self, site, **kw):
        """Get datasets to export filtered by keyword arguments.
        Returns an iterable. To be implemented by derived classes.
        """
        raise NotImplementedError

    def get_selected(self, site, selected):
        """Get datasets to export for selected items
        specified by a list of identifiers.
        Returns an iterable. To be implemented by derived classes.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned. To be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned. To be implemented by derived classes.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export the items `get_filtered` delivers for `kw`.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        return self.export(self.get_filtered(site, **kw), filepath=filepath)

    def export_selected(self, site, filepath=None, **kw):
        """Export the items `get_selected` delivers for the list of
        identifiers passed in as keyword ``selected``.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        identifiers = kw.get('selected', [])
        return self.export(
            self.get_selected(site, identifiers), filepath=filepath)
684
def export_job(site, exporter_name, **kw):
    """Run the named exporter and store its output in a temporary file.

    `site` gives the site to search. It is passed to the exporter and
    also set as 'current site', because the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    Keyword arguments select the export method: without any, all
    entries are exported; with a ``selected`` keyword the exporter's
    `export_selected` is called; with any other keywords its
    `export_filtered`.

    The resulting CSV file is stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`) and named after the exporter used,
    with `.csv` filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the caller's responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_path = os.path.join(
        tempfile.mkdtemp(), '%s.csv' % exporter_name)
    if not kw:
        exporter.export_all(site, filepath=output_path)
    elif 'selected' in kw:
        exporter.export_selected(site, filepath=output_path, **kw)
    else:
        exporter.export_filtered(site, filepath=output_path, **kw)
    return output_path
718
719class AsyncExportJob(AsyncJob):
720    """An IJob that exports data to CSV files.
721
722    `AsyncExportJob` instances are regular `AsyncJob` instances with a
723    different constructor API. Instead of a callable to execute, you
724    must pass a `site` and some `exporter_name` to trigger an export.
725
726    The real work is done when an instance of this class is put into a
727    queue. See :mod:`waeup.kofa.async` to learn more about
728    asynchronous jobs.
729
730    The `exporter_name` must be the name under which an ICSVExporter
731    utility was registered with the ZCA.
732
733    The `site` must be a valid site  or ``None``.
734
735    The result of an `AsyncExportJob` is the path to generated CSV
736    file. The file will reside in a temporary directory that should be
737    removed after being used.
738    """
739    grok.implements(IExportJob)
740
[9718]741    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]742        super(AsyncExportJob, self).__init__(
[9718]743            export_job, site, exporter_name, *args, **kwargs)
[9217]744
[9816]745    @property
746    def finished(self):
747        """A job is marked `finished` if it is completed.
748
749        Please note: a finished report job does not neccessarily
750        provide an IReport result. See meth:`failed`.
751        """
752        return self.status == zc.async.interfaces.COMPLETED
753
754    @property
755    def failed(self):
756        """A report job is marked failed iff it is finished and the
757        result is None.
758
759        While a job is unfinished, the `failed` status is ``None``.
760
761        Failed jobs normally provide a `traceback` to examine reasons.
762        """
763        if not self.finished:
764            return None
765        if getattr(self, 'result', None) is None:
766            return True
767        return False
768
class ExportJobContainer(object):
    """Mix-in adding support for asynchronous export jobs.

    Bookkeeping is done in `running_exports`, a list of
    ``(<job id>, <exporter name>, <user id>)`` tuples.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Start asynchronous export job.

        `exporter_name` is the name of an exporter utility to be used.

        `user_id` is the ID of the user that triggers the export.

        The job id, the exporter name and the user id are recorded
        together as one tuple in the persistent `running_exports` list.

        Additional positional and keyword arguments are handed over
        unchanged to the :class:`AsyncExportJob` created here.

        Returns the job ID of the job started.
        """
        current_site = grok.getSite()
        job_manager = getUtility(IJobManager)
        job_id = job_manager.put(
            AsyncExportJob(current_site, exporter_name, *args, **kwargs))
        # Rebind the attribute to a fresh copy first, so that the list
        # is an attribute set through `self` before it gets modified.
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Get export jobs for user with `user_id` as list of tuples.

        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order. The ``<exporter_name>`` is the utility name of the
        used exporter.

        If `user_id` is ``None``, the jobs of all users are returned.

        Entries whose job is unknown to the job manager are left out
        of the result and purged from `running_exports`.
        """
        known = []
        stale = []
        job_manager = getUtility(IJobManager)
        for entry in self.running_exports:
            if user_id is not None and entry[2] != user_id:
                # Entry belongs to a different user.
                continue
            if job_manager.get(entry[0]) is None:
                stale.append(entry)
            else:
                known.append(entry)
        if stale:
            survivors = [
                item for item in self.running_exports if item not in stale]
            self.running_exports = PersistentList(survivors)
        return known

    def get_export_jobs_status(self, user_id=None):
        """Get running/completed export jobs for `user_id` as list of tuples.

        Each tuple holds ``<raw status>, <status translated>,
        <exporter title>`` in that order. The first two values are
        looked up in `JOB_STATUS_MAP` by the job's status, the third
        one is the `title` of the exporter utility used.
        """
        running = self.get_running_export_jobs(user_id)
        statuses = []
        job_manager = getUtility(IJobManager)
        for entry in running:
            job = job_manager.get(entry[0])
            if job is not None:
                raw_status, translated = JOB_STATUS_MAP[job.status]
                title = getUtility(ICSVExporter, name=entry[1]).title
                statuses.append((raw_status, translated, title))
        return statuses

    def delete_export_entry(self, entry):
        """Delete the export denoted by `entry`.

        Removes the given entry from the local `running_exports` list
        and removes the corresponding job via the local job manager.
        If the job result is a string, it is taken as the path of the
        exported file and the directory containing it is deleted.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        job_manager = getUtility(IJobManager)
        job_id = entry[0]
        job = job_manager.get(job_id)
        if job is not None:
            csv_path = job.result
            if isinstance(csv_path, basestring):
                # remove the directory holding the created export file
                export_dir = os.path.dirname(csv_path)
                if os.path.exists(export_dir):
                    shutil.rmtree(export_dir)
        job_manager.remove(job_id, self)
        self.running_exports = PersistentList(
            [item for item in self.running_exports if item != entry])

    def entry_from_job_id(self, job_id):
        """Get entry tuple for `job_id`.

        Returns ``None`` if no such entry can be found.
        """
        matching = (
            item for item in self.running_exports if item[0] == job_id)
        return next(matching, None)
[9726]877
class VirtualExportJobContainer(ExportJobContainer):
    """A virtual export job container.

    Can be used as a mixin exactly like a real ExportJobContainer.

    All data is read from and written to the container delivered by
    the registered IExportContainerFinder utility; nothing is stored
    on the virtual container itself.

    Apart from the storage location, the functionality is the one
    inherited from ExportJobContainer.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        find_container = getUtility(IExportContainerFinder)
        return find_container()

    # The inherited methods keep all their state in the single
    # attribute `running_exports`. Redirecting reads, writes and
    # deletes of that attribute to the container found above is
    # therefore enough to make the inherited code operate on that
    # container's data instead of on this object.
    @property
    def running_exports(self):
        """Exports stored in the site-wide exports container.
        """
        target = self._site_container
        return target.running_exports

    @running_exports.setter
    def running_exports(self, value):
        target = self._site_container
        target.running_exports = value

    @running_exports.deleter
    def running_exports(self):
        target = self._site_container
        del target.running_exports

    @property
    def logger(self):
        target = self._site_container
        return target.logger
[9726]923
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Finder for local (site-wide) export container.
    """

    def __call__(self):
        """Get the local export container.

        The container is looked up in the current site under the key
        ``'datacenter'``. If there is no current site, or the site has
        no such item, ``None`` is returned.
        """
        current_site = grok.getSite()
        if current_site is not None:
            return current_site.get('datacenter', None)
        return None
Note: See TracBrowser for help on using the repository browser.