source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 12868

Last change on this file since 12868 was 12868, checked in by Henrik Bettermann, 10 years ago

More docs.

  • Property svn:keywords set to Id
File size: 31.9 KB
Line 
1## $Id: batching.py 12868 2015-04-21 20:56:58Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import datetime
25import os
26import shutil
27import tempfile
28import time
29import unicodecsv
30import zc.async.interfaces
31from cStringIO import StringIO
32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
35from zope.interface import Interface, implementer
36from zope.schema import getFields
37from zope.schema.interfaces import ConstraintNotSatisfied
38from zope.event import notify
39from waeup.kofa.async import AsyncJob
40from waeup.kofa.interfaces import (
41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
43    IExportJobContainer, IExportJob, IExportContainerFinder)
44
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here; they are
        required in every mode anyway (see `req`).
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the columns required in that mode.

        ``create`` needs the location fields plus `required_fields`;
        ``update`` and ``remove`` need only the location fields.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of location and schema field names.
        """
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the headers accepted for `mode`.

        The base implementation ignores `mode` and returns
        `available_fields`.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check `headerfields` against the requirements of `mode`.

        Raises :class:`FatalCSVError` if a required column is missing
        or if a (not ignored) column name appears more than once.
        Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views. Columns starting with '--' (ignored
        # columns) may repeat.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV header names to field names (see
        `getMapping`). Returns a new dict keyed by field name. Columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header row of the file at `path` is paired by position
        with `headerfields`. In ``remove`` mode only location fields are
        kept; ``--IGNORE--`` columns are always dropped.
        """
        result = dict()
        # Only the header row is needed; close the file right away
        # instead of leaking the handle.
        with open(path, 'rb') as csv_file:
            raw_header = next(unicodecsv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into one message.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages. Entries
        are separated by ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new, empty object using the `factory_name` factory.

        Any arguments are ignored.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation always returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Values equal to ``IGNORE_MARKER``, keys that are not yet
        attributes of `obj`, and attributes that cannot be set are
        skipped. `filename` is unused here but passed on for derived
        processors.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            # Objects with a `code` (e.g. referenced containers) are
            # logged by their code rather than their repr.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        Logs a one-line import summary via `logger`; does nothing if
        `logger` is ``None``. `fail_path` and `user` are not used here.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that `row` is modified: the messages are stored under the
        ``--ERRORS--`` key.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Delegates to the `IObjectConverter` adapter of `iface` and
        returns its ``(errors, invariant_errors, converted_dict)``
        triple. `ignore_empty` is accepted for API compatibility but not
        used here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def emptyRow(self, row):
        """Detect empty rows.

        A row is empty if every value is ``None``, the ignore marker or
        whitespace only. ``None`` values occur when a CSV line has fewer
        cells than the header (csv.DictReader's default `restval`).
        """
        for value in row.values():
            # Test for None first: calling strip() on it would fail.
            if value is None or value == IGNORE_MARKER:
                continue
            if value.strip():
                return False
        return True

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """In contrast to most other methods, ``doImport`` is not supposed to
        be customized, neither in custom packages nor in derived batch
        processor classes. Therefore, this is the only place where we
        do import data.

        Before this method starts creating or updating persistent data, it
        prepares two more files in a temporary folder of the filesystem: (1)
        a file for pending data with file extension ``.pending`` and (2)
        a file for successfully processed data with file extension
        ``.finished``. Then the method starts iterating over all rows of
        the CSV file. Each row is treated as follows:

        1. An empty row is skipped.

        2. Empty strings in the row are replaced by ignore-markers
           (only in update mode and only if `ignore_empty` is set).

        3. The `BatchProcessor.checkConversion` method validates and converts
           all values in the row. Conversion means the transformation of strings
           into Python objects. For instance, number expressions have to be
           transformed into integers, dates into datetime objects, phone number
           expressions into phone number objects, etc. The converter returns a
           dictionary with converted values or, if the validation of one of the
           elements fails, an appropriate warning message. If the conversion
           fails a pending record is created and stored in the pending data file
           together with a warning message the converter has raised.

        4. In **create mode** only:

           The parent object must be found and a child
           object with same object id must not exist. Otherwise the row
           is skipped, a corresponding warning message is raised and a
           record is stored in the pending data file.

           Now ``doImport`` tries to add the new object with the data
           from the conversion dictionary. In some cases this
           may fail and a DuplicationError is raised. For example, a new
           payment ticket is created but the same payment for same session
           has already been made. In this case the object id is unique, no
           other object with same id exists, but making the 'same' payment
           twice does not make sense. The import is skipped and a
           record is stored in the pending data file.

        5. In **update mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is raised and a record is
           stored in the pending data file.

           The `BatchProcessor.checkUpdateRequirements` method checks additional
           requirements the object must fulfill before being updated. These
           requirements are not imposed by the data type but the context
           of the object. For example, post-graduate students have a different
           registration workflow. With this method we do forbid certain workflow
           transitions or states.

           Finally, ``doImport`` updates the existing object with the data
           from the conversion dictionary.

        6. In **remove mode** only:

           If the object can't be found, the row is skipped,
           a ``no such entry`` warning message is raised and a record is
           stored in the pending data file.

           Finally, ``doImport`` removes the existing object.

        All files are closed before this method returns, also when an
        unexpected exception aborts the import.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``:
        the number of data rows read (empty rows included), the number of
        failed rows, and absolute paths of the finished and pending
        files. `failed_path` is ``None`` if no row failed; the pending
        file is then deleted.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        # Both output files use the mapped field names; the pending file
        # gets an extra column holding the error messages.
        failed_headers = list(mapping.values())
        failed_headers.append('--ERRORS--')
        finished_headers = list(mapping.values())

        num = 0
        num_warns = 0
        site = grok.getSite()

        # Open input and output files together so that all of them are
        # closed deterministically, whatever happens during the import.
        with open(path, 'rb') as in_file, \
                open(failed_path, 'wb') as failed_file, \
                open(finished_path, 'wb') as finished_file:
            reader = unicodecsv.DictReader(in_file)

            os.chmod(failed_path, 0o664)
            failed_writer = unicodecsv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(dict(zip(failed_headers, failed_headers)))

            os.chmod(finished_path, 0o664)
            finished_writer = unicodecsv.DictWriter(
                finished_file, finished_headers)
            finished_writer.writerow(
                dict(zip(finished_headers, finished_headers)))

            for raw_row in reader:
                num += 1
                # Skip row if empty
                if self.emptyRow(raw_row):
                    continue
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode in ('update',):
                    # Replace empty strings with ignore-markers
                    for key, val in string_row.items():
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: `row` receives the converted values while
                # `string_row` keeps the raw strings that are written to
                # the pending and finished files.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                if mode == 'create':
                    if not self.parentsExist(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Not all parents do exist yet.")
                        continue
                    if self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "This object already exists.")
                        continue
                    obj = self.callFactory()
                    # Override all values in row, also
                    # student_ids and applicant_ids which have been
                    # generated in the respective __init__ methods before.
                    self.updateEntry(obj, row, site, base)
                    try:
                        self.addEntry(obj, row, site)
                    except KeyError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.message)
                        continue
                    except DuplicationError as error:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, error.msg)
                        continue
                elif mode == 'remove':
                    if not self.entryExists(row, site):
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot remove: no such entry")
                        continue
                    self.delEntry(row, site)
                elif mode == 'update':
                    obj = self.getEntry(row, site)
                    if obj is None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "Cannot update: no such entry")
                        continue
                    update_errors = self.checkUpdateRequirements(
                        obj, row, site)
                    if update_errors is not None:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row, update_errors)
                        continue
                    try:
                        self.updateEntry(obj, row, site, base)
                    except ConstraintNotSatisfied as err:
                        num_warns += 1
                        self.writeFailedRow(
                            failed_writer, string_row,
                            "ConstraintNotSatisfied: %s" % err)
                        continue
                finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: the pending file holds only the header row.
            # It is already closed, so it can be removed safely.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = unicodecsv.DictWriter(outfile, self.available_fields)
        writer.writerow(
            dict(zip(self.available_fields, self.available_fields))) # header
        outfile.seek(0)
        return outfile.read()
488
class ExporterBase(object):
    """A base for exporters.

    Provides generic CSV plumbing: value mangling, writer creation,
    writing single items and closing the output. Derived classes
    implement `get_filtered`, `get_selected`, `export` and `export_all`.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.

        The base implementation turns booleans into ``'1'``/``'0'``,
        encodes unicode as UTF-8, appends ``'#'`` to dates and datetimes
        and replaces ``None`` with an empty string. Other values are
        returned unchanged. `name` and `context` are unused here but
        are available to overriding implementations.
        """
        if isinstance(value, bool):
            value = '1' if value else '0'
        elif isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            value = value.encode('utf-8')
        elif isinstance(value, datetime.date):
            # Also matches datetime.datetime, which subclasses date.
            # Append hash '#' to dates and datetimes to circumvent
            # unwanted automatic conversion by Excel (for datetimes
            # introduced 2014-07-06, see ticket #941).
            value = str('%s#' % value)
        elif value is None:
            # None is not really representable in CSV files
            value = ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is None:
            outfile = StringIO()
        else:
            outfile = open(filepath, 'wb')
        writer = unicodecsv.DictWriter(outfile, self.fields)
        writer.writerow(dict(zip(self.fields, self.fields))) # header
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        Attributes listed in `fields` but missing on `obj` are treated
        as ``None``. Every value passes through `mangle_value`.
        """
        row = {}
        for name in self.fields:
            value = getattr(obj, name, None)
            value = self.mangle_value(value, name, obj)
            row[name] = value
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile is returned.
        """
        outfile.seek(0)
        if filepath is None:
            return outfile.read()
        outfile.close()
        return

    def get_filtered(self, site, **kw):
        """Get datasets to export filtered by keyword arguments.

        Returns an iterable.
        """
        raise NotImplementedError

    def get_selected(self, site, selected):
        """Get datasets to export for selected items
        specified by a list of identifiers.

        Returns an iterable.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export items denoted by `kw`.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        data = self.get_filtered(site, **kw)
        return self.export(data, filepath=filepath)

    def export_selected(self, site, filepath=None, **kw):
        """Export those items specified by a list of identifiers
        called `selected`.

        A missing `selected` keyword is treated as an empty list.
        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        selected = kw.get('selected', [])
        data = self.get_selected(site, selected)
        return self.export(data, filepath=filepath)
611
def export_job(site, exporter_name, **kw):
    """Export all entries delivered by exporter and store it in a temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    The keyword arguments select the export method: without any,
    ``export_all`` is used; with a ``selected`` keyword,
    ``export_selected``; otherwise ``export_filtered`` receives them.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`). It will be named after the
    exporter used with `.csv` filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the caller's responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    filename = '%s.csv' % exporter_name
    output_path = os.path.join(output_dir, filename)
    if not kw:
        exporter.export_all(site, filepath=output_path)
    elif 'selected' in kw:
        exporter.export_selected(site, filepath=output_path, **kw)
    else:
        exporter.export_filtered(site, filepath=output_path, **kw)
    return output_path
645
646class AsyncExportJob(AsyncJob):
647    """An IJob that exports data to CSV files.
648
649    `AsyncExportJob` instances are regular `AsyncJob` instances with a
650    different constructor API. Instead of a callable to execute, you
651    must pass a `site` and some `exporter_name` to trigger an export.
652
653    The real work is done when an instance of this class is put into a
654    queue. See :mod:`waeup.kofa.async` to learn more about
655    asynchronous jobs.
656
657    The `exporter_name` must be the name under which an ICSVExporter
658    utility was registered with the ZCA.
659
660    The `site` must be a valid site  or ``None``.
661
662    The result of an `AsyncExportJob` is the path to generated CSV
663    file. The file will reside in a temporary directory that should be
664    removed after being used.
665    """
666    grok.implements(IExportJob)
667
668    def __init__(self, site, exporter_name, *args, **kwargs):
669        super(AsyncExportJob, self).__init__(
670            export_job, site, exporter_name, *args, **kwargs)
671
672    @property
673    def finished(self):
674        """A job is marked `finished` if it is completed.
675
676        Please note: a finished report job does not neccessarily
677        provide an IReport result. See meth:`failed`.
678        """
679        return self.status == zc.async.interfaces.COMPLETED
680
681    @property
682    def failed(self):
683        """A report job is marked failed iff it is finished and the
684        result is None.
685
686        While a job is unfinished, the `failed` status is ``None``.
687
688        Failed jobs normally provide a `traceback` to examine reasons.
689        """
690        if not self.finished:
691            return None
692        if getattr(self, 'result', None) is None:
693            return True
694        return False
695
class ExportJobContainer(object):
    """Mix-in adding asynchronous export job handling to a container.

    Started jobs are tracked in `running_exports`, a persistent list of
    ``(<job id>, <exporter name>, <user id>)`` tuples.
    """
    grok.implements(IExportJobContainer)

    # Class-level default. The methods below never change it in place;
    # they always assign a fresh PersistentList to the attribute.
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Queue an asynchronous export and remember it.

        `exporter_name` names the exporter utility to use; `user_id`
        identifies the user who triggered the export. Any further
        positional and keyword arguments are handed unchanged to
        :class:`AsyncExportJob`.

        A ``(job_id, exporter_name, user_id)`` tuple is recorded in the
        persistent `running_exports` list.

        Returns the ID of the queued job.
        """
        site = grok.getSite()
        manager = getUtility(IJobManager)
        job_id = manager.put(
            AsyncExportJob(site, exporter_name, *args, **kwargs))
        # Build a new PersistentList and assign it, so the change ends
        # up in the ZODB.
        exports = PersistentList(self.running_exports)
        exports.append((job_id, exporter_name, user_id))
        self.running_exports = exports
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Return the tracked export jobs of `user_id` as a list of tuples.

        Every tuple is ``<job_id>, <exporter_name>, <user_id>``, where
        ``<exporter_name>`` is the utility name of the exporter.
        Passing ``None`` as `user_id` returns the jobs of all users.

        Entries whose job is no longer known to the job manager are
        dropped from `running_exports` as a side effect.
        """
        manager = getUtility(IJobManager)
        wanted = [entry for entry in self.running_exports
                  if user_id is None or entry[2] == user_id]
        alive = []
        stale = []
        for entry in wanted:
            if manager.get(entry[0]) is None:
                stale.append(entry)
            else:
                alive.append(entry)
        if stale:
            self.running_exports = PersistentList(
                [entry for entry in self.running_exports
                 if entry not in stale])
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Describe the running/completed export jobs of `user_id`.

        Returns a list of ``<raw status>, <status translated>,
        <exporter title>`` tuples. The last two are translated strings:
        the human readable job status and the title of the exporter.
        """
        running = self.get_running_export_jobs(user_id)
        manager = getUtility(IJobManager)
        statuses = []
        for entry in running:
            job = manager.get(entry[0])
            if job is not None:
                raw_status, translated = JOB_STATUS_MAP[job.status]
                exporter = getUtility(ICSVExporter, name=entry[1])
                statuses.append((raw_status, translated, exporter.title))
        return statuses

    def delete_export_entry(self, entry):
        """Forget the export described by `entry` and remove its job.

        If the job still exists and its result is a path, the directory
        containing that file is deleted. The job is then removed via the
        job manager and `entry` is dropped from `running_exports`.

        `entry` must be a ``(<job id>, <exporter name>, <user id>)``
        tuple as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        manager = getUtility(IJobManager)
        job_id = entry[0]
        job = manager.get(job_id)
        # Remove the created export file along with its directory.
        if job is not None and isinstance(job.result, basestring):
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        manager.remove(job_id, self)
        self.running_exports = PersistentList(
            [other for other in self.running_exports if other != entry])
        return

    def entry_from_job_id(self, job_id):
        """Look up the tracked entry tuple for `job_id`.

        Returns ``None`` if no entry matches.
        """
        matches = (entry for entry in self.running_exports
                   if entry[0] == job_id)
        return next(matches, None)
804
class VirtualExportJobContainer(ExportJobContainer):
    """An export job container that keeps no data of its own.

    Like a real ExportJobContainer it can be mixed into other classes.
    All data, however, is read from and written to the site-wide
    export container instead of the instance itself. Apart from that,
    it behaves exactly like a regular ExportJobContainer.

    A registered IExportContainerFinder utility is required to locate
    the container used for storage.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        """The container returned by the registered finder utility."""
        finder = getUtility(IExportContainerFinder)
        return finder()

    # ExportJobContainer keeps all of its state in the single attribute
    # `running_exports`. Redirecting that attribute to the site-wide
    # container is therefore enough to make every inherited method
    # operate on the site-wide data, while nothing is stored here.
    @property
    def running_exports(self):
        """Exports stored in the site-wide exports container.
        """
        container = self._site_container
        return container.running_exports

    @running_exports.setter
    def running_exports(self, value):
        container = self._site_container
        container.running_exports = value

    @running_exports.deleter
    def running_exports(self):
        container = self._site_container
        del container.running_exports

    @property
    def logger(self):
        """The `logger` of the site-wide exports container."""
        container = self._site_container
        return container.logger
850
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Utility that finds the export container of the current site.
    """

    def __call__(self):
        """Return the current site's 'datacenter' entry, or ``None``.

        ``None`` is returned when no site is set, and also when the site
        has no 'datacenter' entry.
        """
        site = grok.getSite()
        return None if site is None else site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.