source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 12571

Last change on this file since 12571 was 12516, checked in by Henrik Bettermann, 10 years ago

Export datasets of selected items specified by a list of identifiers. Will be used in students and applicants modules.

  • Property svn:keywords set to Id
File size: 28.5 KB
RevLine 
[7196]1## $Id: batching.py 12516 2015-01-27 08:31:34Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[11849]37from zope.schema.interfaces import ConstraintNotSatisfied
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
45class BatchProcessor(grok.GlobalUtility):
46    """A processor to add, update, or remove data.
47
48    This is a non-active baseclass.
49    """
[8220]50    grok.implements(IBatchProcessor)
[4806]51    grok.context(Interface)
52    grok.baseclass()
53
54    # Name used in pages and forms...
[7933]55    name = u'Non-registered base processor'
[6259]56
[4806]57    # Internal name...
[7933]58    util_name = 'baseprocessor'
[6259]59
[4806]60    # Items for this processor need an interface with zope.schema fields.
[5009]61    iface = Interface
[6259]62
[4806]63    # The name must be the same as the util_name attribute in order to
64    # register this utility correctly.
65    grok.name(util_name)
66
67    # Headers needed to locate items...
68    location_fields = ['code', 'faculty_code']
[6259]69
[4806]70    # A factory with this name must be registered...
71    factory_name = 'waeup.Department'
72
73    @property
74    def required_fields(self):
[4829]75        """Required fields that have no default.
76
77        A list of names of field, whose value cannot be set if not
78        given during creation. Therefore these fields must exist in
79        input.
80
81        Fields with a default != missing_value do not belong to this
82        category.
83        """
[4806]84        result = []
85        for key, field in getFields(self.iface).items():
86            if key in self.location_fields:
87                continue
[4829]88            if field.default is not field.missing_value:
89                continue
[4806]90            if field.required:
91                result.append(key)
92        return result
[6259]93
[4806]94    @property
95    def req(self):
96        result = dict(
97            create = self.location_fields + self.required_fields,
98            update = self.location_fields,
99            remove = self.location_fields,
100        )
101        return result
102
103    @property
104    def available_fields(self):
105        return sorted(list(set(
106                    self.location_fields + getFields(self.iface).keys())))
[6259]107
[4806]108    def getHeaders(self, mode='create'):
109        return self.available_fields
110
111    def checkHeaders(self, headerfields, mode='create'):
112        req = self.req[mode]
113        # Check for required fields...
114        for field in req:
115            if not field in headerfields:
116                raise FatalCSVError(
117                    "Need at least columns %s for import!" %
118                    ', '.join(["'%s'" % x for x in req]))
[6828]119        # Check for double fields. Cannot happen because this error is
120        # already catched in views
[4806]121        not_ignored_fields = [x for x in headerfields
122                              if not x.startswith('--')]
123        if len(set(not_ignored_fields)) < len(not_ignored_fields):
124            raise FatalCSVError(
125                "Double headers: each column name may only appear once.")
126        return True
127
128    def applyMapping(self, row, mapping):
[4811]129        """Apply mapping to a row of CSV data.
[6824]130
[4811]131        """
[4806]132        result = dict()
133        for key, replacement in mapping.items():
[6824]134            if replacement == u'--IGNORE--':
135                # Skip ignored columns in failed and finished data files.
136                continue
[4806]137            result[replacement] = row[key]
138        return result
[6259]139
[4832]140    def getMapping(self, path, headerfields, mode):
[6824]141        """Get a mapping from CSV file headerfields to actually used fieldnames.
142
[4811]143        """
[4832]144        result = dict()
[10027]145        reader = unicodecsv.reader(open(path, 'rb'))
[4806]146        raw_header = reader.next()
[4832]147        for num, field in enumerate(headerfields):
148            if field not in self.location_fields and mode == 'remove':
[6824]149                # Skip non-location fields when removing.
150                continue
151            if field == u'--IGNORE--':
152                # Skip ignored columns in failed and finished data files.
153                continue
[4832]154            result[raw_header[num]] = field
155        return result
[4806]156
[6273]157    def stringFromErrs(self, errors, inv_errors):
158        result = []
159        for err in errors:
160            fieldname, message = err
161            result.append("%s: %s" % (fieldname, message))
162        for err in inv_errors:
163            result.append("invariant: %s" % err)
164        return '; '.join(result)
165
[4806]166    def callFactory(self, *args, **kw):
167        return createObject(self.factory_name)
168
169    def parentsExist(self, row, site):
[4811]170        """Tell whether the parent object for data in ``row`` exists.
171        """
[4806]172        raise NotImplementedError('method not implemented')
173
174    def entryExists(self, row, site):
[4811]175        """Tell whether there already exists an entry for ``row`` data.
176        """
[4806]177        raise NotImplementedError('method not implemented')
178
179    def getParent(self, row, site):
[4811]180        """Get the parent object for the entry in ``row``.
181        """
[4806]182        raise NotImplementedError('method not implemented')
[6259]183
[5009]184    def getEntry(self, row, site):
[12513]185        """Get the object for the entry in ``row``.
[5009]186        """
187        raise NotImplementedError('method not implemented')
[6259]188
[4806]189    def addEntry(self, obj, row, site):
[4811]190        """Add the entry given given by ``row`` data.
191        """
[4806]192        raise NotImplementedError('method not implemented')
193
194    def delEntry(self, row, site):
[4811]195        """Delete entry given by ``row`` data.
196        """
[6259]197        raise NotImplementedError('method not implemented')
[4806]198
[7950]199    def checkUpdateRequirements(self, obj, row, site):
200        """Checks requirements the object must fulfill when being updated.
[7938]201
202        This method is not used in case of deleting or adding objects.
203
[7950]204        Returns error messages as strings in case of requirement
[7938]205        problems.
[7937]206        """
[7938]207        return None
[7937]208
[9706]209    def updateEntry(self, obj, row, site, filename):
[4984]210        """Update obj to the values given in row.
[8220]211
212        Returns a string describing the fields changed.
[4984]213        """
[8220]214        changed = []
[4829]215        for key, value in row.items():
[8220]216            # Skip fields to be ignored.
217            if value == IGNORE_MARKER:
218                continue
[8304]219            # Skip fields not declared in interface and which are
220            # not yet attributes of existing objects. We can thus not
221            # add non-existing attributes here.
[8220]222            if not hasattr(obj, key):
223                continue
[9265]224            try:
225                setattr(obj, key, value)
226            except AttributeError:
227                # Computed attributes can't be set.
228                continue
[8222]229            log_value = getattr(value, 'code', value)
230            changed.append('%s=%s' % (key, log_value))
[8332]231
[8333]232        # If any catalog is involved it must be updated.
233        #
234        # XXX: The event is also triggered when creating objects as
235        # updateEntry is called also when creating entries resulting
236        # in objectAdded and additional objectModified events.
237        if len(changed):
238            notify(grok.ObjectModifiedEvent(obj))
[8332]239
[8220]240        return ', '.join(changed)
[4821]241
[4832]242    def createLogfile(self, path, fail_path, num, warnings, mode, user,
[4885]243                      timedelta, logger=None):
244        """Write to log file.
[4821]245        """
[4885]246        if logger is None:
247            return
[9739]248        logger.info(
249            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
250            "%0.3f s (%0.4f s/item)" % (
251            path, mode, num, num - warnings, warnings,
252            timedelta, timedelta/(num or 1)))
[4821]253        return
[4877]254
255    def writeFailedRow(self, writer, row, warnings):
256        """Write a row with error messages to error CSV.
257
258        If warnings is a list of strings, they will be concatenated.
259        """
260        error_col = warnings
261        if isinstance(warnings, list):
262            error_col = ' / '.join(warnings)
263        row['--ERRORS--'] = error_col
264        writer.writerow(row)
265        return
[6259]266
[8220]267    def checkConversion(self, row, mode='ignore', ignore_empty=True):
[6847]268        """Validates all values in row.
269        """
270        converter = IObjectConverter(self.iface)
271        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8220]272            row, self.factory_name, mode=mode)
[6847]273        return errs, inv_errs, conv_dict
274
[4885]275    def doImport(self, path, headerfields, mode='create', user='Unknown',
[8220]276                 logger=None, ignore_empty=True):
[4811]277        """Perform actual import.
278        """
[4832]279        time_start = time.time()
[4806]280        self.checkHeaders(headerfields, mode)
[4832]281        mapping = self.getMapping(path, headerfields, mode)
[10027]282        reader = unicodecsv.DictReader(open(path, 'rb'))
[4889]283
[4900]284        temp_dir = tempfile.mkdtemp()
[6259]285
[6273]286        base = os.path.basename(path)
287        (base, ext) = os.path.splitext(base)
[4900]288        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
[6831]289        failed_headers = mapping.values()
[4877]290        failed_headers.append('--ERRORS--')
[10027]291        failed_writer = unicodecsv.DictWriter(open(failed_path, 'wb'),
292                                              failed_headers)
[8573]293        os.chmod(failed_path, 0664)
[6831]294        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
[4891]295
[4900]296        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
[6831]297        finished_headers = mapping.values()
[10027]298        finished_writer = unicodecsv.DictWriter(open(finished_path, 'wb'),
299                                                finished_headers)
[8905]300        os.chmod(finished_path, 0664)
[4891]301        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
[6259]302
[4806]303        num =0
[4878]304        num_warns = 0
[4806]305        site = grok.getSite()
[7859]306
[4806]307        for raw_row in reader:
308            num += 1
309            string_row = self.applyMapping(raw_row, mapping)
[8222]310            if ignore_empty and mode in ('update',):
[8220]311                # replace empty strings with ignore-markers
[8222]312                for key, val in string_row.items():
[8220]313                    if val == '':
[8222]314                        string_row[key] = IGNORE_MARKER
315            row = dict(string_row.items()) # create deep copy
[6847]316            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
[6273]317            if errs or inv_errs:
[4878]318                num_warns += 1
[6273]319                conv_warnings = self.stringFromErrs(errs, inv_errs)
320                self.writeFailedRow(
[6824]321                    failed_writer, string_row, conv_warnings)
[4821]322                continue
[6273]323            row.update(conv_dict)
[6259]324
[4806]325            if mode == 'create':
326                if not self.parentsExist(row, site):
[4878]327                    num_warns += 1
[4877]328                    self.writeFailedRow(
[6824]329                        failed_writer, string_row,
[4877]330                        "Not all parents do exist yet. Skipping")
[4806]331                    continue
332                if self.entryExists(row, site):
[4878]333                    num_warns += 1
[4877]334                    self.writeFailedRow(
[6824]335                        failed_writer, string_row,
[8330]336                        "This object already exists. Skipping.")
[4806]337                    continue
338                obj = self.callFactory()
[7273]339                # Override all values in row, also
340                # student_ids and applicant_ids which have been
341                # generated in the respective __init__ methods before.
[9706]342                self.updateEntry(obj, row, site, base)
[6243]343                try:
344                    self.addEntry(obj, row, site)
[6273]345                except KeyError, error:
[6219]346                    num_warns += 1
347                    self.writeFailedRow(
[6824]348                        failed_writer, string_row,
[6273]349                        "%s Skipping." % error.message)
[8540]350                    continue
[8509]351                except DuplicationError, error:
352                    num_warns += 1
353                    self.writeFailedRow(
354                        failed_writer, string_row,
355                        "%s Skipping." % error.msg)
[6219]356                    continue
[4806]357            elif mode == 'remove':
358                if not self.entryExists(row, site):
[4878]359                    num_warns += 1
[4877]360                    self.writeFailedRow(
[6824]361                        failed_writer, string_row,
[9219]362                        "Cannot remove: no such entry")
[4806]363                    continue
364                self.delEntry(row, site)
365            elif mode == 'update':
366                obj = self.getEntry(row, site)
367                if obj is None:
[4878]368                    num_warns += 1
[4877]369                    self.writeFailedRow(
[6824]370                        failed_writer, string_row,
[9219]371                        "Cannot update: no such entry")
[4806]372                    continue
[7950]373                update_errors = self.checkUpdateRequirements(obj, row, site)
[7938]374                if update_errors is not None:
[7937]375                    num_warns += 1
376                    self.writeFailedRow(
377                        failed_writer, string_row, update_errors)
378                    continue
[11849]379                try:
380                    self.updateEntry(obj, row, site, base)
381                except ConstraintNotSatisfied, err:
382                    num_warns += 1
383                    self.writeFailedRow(
384                        failed_writer, string_row,
385                        "ConstraintNotSatisfied: %s" % err)
386                    continue
[4891]387            finished_writer.writerow(string_row)
[4821]388
[4832]389        time_end = time.time()
390        timedelta = time_end - time_start
[6259]391
[4878]392        self.createLogfile(path, failed_path, num, num_warns, mode, user,
[4885]393                           timedelta, logger=logger)
[4894]394        failed_path = os.path.abspath(failed_path)
[4878]395        if num_warns == 0:
[4821]396            del failed_writer
397            os.unlink(failed_path)
[4894]398            failed_path = None
399        return (num, num_warns,
400                os.path.abspath(finished_path), failed_path)
[7859]401
[9032]402    def get_csv_skeleton(self):
403        """Export CSV file only with a header of available fields.
404
405        A raw string with CSV data should be returned.
406        """
407        outfile = StringIO()
[10027]408        writer = unicodecsv.DictWriter(outfile, self.available_fields)
[9734]409        writer.writerow(
410            dict(zip(self.available_fields, self.available_fields))) # header
[9032]411        outfile.seek(0)
412        return outfile.read()
413
[7859]414class ExporterBase(object):
415    """A base for exporters.
416    """
417    grok.implements(ICSVExporter)
418
419    #: Fieldnames considered by this exporter
420    fields = ('code', 'title', 'title_prefix')
421
[7907]422    #: The title under which this exporter will be displayed
423    #: (if registered as a utility)
424    title = 'Override this title'
425
[7859]426    def mangle_value(self, value, name, context=None):
427        """Hook for mangling values in derived classes
428        """
429        if isinstance(value, bool):
430            value = value and '1' or '0'
431        elif isinstance(value, unicode):
432            # CSV writers like byte streams better than unicode
433            value = value.encode('utf-8')
[8380]434        elif isinstance(value, datetime.datetime):
[11737]435            #value = str(value)
436            value = str('%s#' % value) # changed 2014-07-06, see ticket #941
[8380]437        elif isinstance(value, datetime.date):
438            # Order is important here: check for date after datetime as
439            # datetimes are also dates.
440            #
441            # Append hash '#' to dates to circumvent unwanted excel automatic
442            value = str('%s#' % value)
[7859]443        elif value is None:
444            # None is not really representable in CSV files
445            value = ''
446        return value
447
448    def get_csv_writer(self, filepath=None):
449        """Get a CSV dict writer instance open for writing.
450
451        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
452        :class:`csv.DictWriter` instance and outfile is the real file
453        which is written to. The latter is important when writing to
454        StringIO and can normally be ignored otherwise.
455
456        The returned file will already be filled with the header row.
457
458        Please note that if you give a filepath, the returned outfile
459        is open for writing only and you might have to close it before
460        reopening it for reading.
461        """
462        if filepath is None:
463            outfile = StringIO()
464        else:
465            outfile = open(filepath, 'wb')
[10027]466        writer = unicodecsv.DictWriter(outfile, self.fields)
[7859]467        writer.writerow(dict(zip(self.fields, self.fields))) # header
468        return writer, outfile
469
470    def write_item(self, obj, writer):
471        """Write a row extracted from `obj` into CSV file using `writer`.
472        """
473        row = {}
474        for name in self.fields:
475            value = getattr(obj, name, None)
476            value = self.mangle_value(value, name, obj)
477            row[name] = value
478        writer.writerow(row)
479        return
480
481    def close_outfile(self, filepath, outfile):
482        """Close outfile.
483
484        If filepath is None, the contents of outfile is returned.
485        """
486        outfile.seek(0)
487        if filepath is None:
488            return outfile.read()
489        outfile.close()
490        return
491
[9797]492    def get_filtered(self, site, **kw):
493        """Get datasets to export filtered by keyword arguments.
494
495        Returns an iterable.
496        """
497        raise NotImplementedError
498
[12516]499    def get_selected(self, site, selected):
500        """Get datasets to export for selected items
501        specified by a list of identifiers.
502
503        Returns an iterable.
504        """
505        raise NotImplementedError
506
[7859]507    def export(self, iterable, filepath=None):
508        """Export `iterable` as CSV file.
509
510        If `filepath` is ``None``, a raw string with CSV data should
511        be returned.
512        """
513        raise NotImplementedError
514
515    def export_all(self, site, filepath=None):
516        """Export all appropriate objects in `site` into `filepath` as
517        CSV data.
518
519        If `filepath` is ``None``, a raw string with CSV data should
520        be returned.
521        """
522        raise NotImplementedError
[9217]523
[9797]524    def export_filtered(self, site, filepath=None, **kw):
525        """Export items denoted by `args` and `kw`.
[9217]526
[9797]527        If `filepath` is ``None``, a raw string with CSV data should
528        be returned.
529        """
530        data = self.get_filtered(site, **kw)
531        return self.export(data, filepath=filepath)
532
[12516]533    def export_selected(self, site, filepath=None, **kw):
534        """Export those items specified by a list of identifiers
535        called `selected`.
536
537        If `filepath` is ``None``, a raw string with CSV data should
538        be returned.
539        """
540        selected = kw.get('selected', [])
541        data = self.get_selected(site, selected)
542        return self.export(data, filepath=filepath)
543
[9797]544def export_job(site, exporter_name, **kw):
[9217]545    """Export all entries delivered by exporter and store it in a temp file.
546
547    `site` gives the site to search. It will be passed to the exporter
548    and also be set as 'current site' as the function is used in
549    asynchronous jobs which run in their own threads and have no site
550    set initially. Therefore `site` must also be a valid value for use
551    with `zope.component.hooks.setSite()`.
552
553    `exporter_name` is the utility name under which the desired
554    exporter was registered with the ZCA.
555
556    The resulting CSV file will be stored in a new temporary directory
557    (using :func:`tempfile.mkdtemp`). It will be named after the
558    exporter used with `.csv` filename extension.
559
560    Returns the path to the created CSV file.
561
562    .. note:: It is the callers responsibility to clean up the used
563              file and its parent directory.
564    """
565    setSite(site)
566    exporter = getUtility(ICSVExporter, name=exporter_name)
567    output_dir = tempfile.mkdtemp()
568    filename = '%s.csv' % exporter_name
569    output_path = os.path.join(output_dir, filename)
[9797]570    if kw == {}:
571        exporter.export_all(site, filepath=output_path)
[12516]572    elif kw.has_key('selected'):
573        exporter.export_selected(site, filepath=output_path, **kw)
[9797]574    else:
575        exporter.export_filtered(site, filepath=output_path, **kw)
[9217]576    return output_path
577
578class AsyncExportJob(AsyncJob):
579    """An IJob that exports data to CSV files.
580
581    `AsyncExportJob` instances are regular `AsyncJob` instances with a
582    different constructor API. Instead of a callable to execute, you
583    must pass a `site` and some `exporter_name` to trigger an export.
584
585    The real work is done when an instance of this class is put into a
586    queue. See :mod:`waeup.kofa.async` to learn more about
587    asynchronous jobs.
588
589    The `exporter_name` must be the name under which an ICSVExporter
590    utility was registered with the ZCA.
591
592    The `site` must be a valid site  or ``None``.
593
594    The result of an `AsyncExportJob` is the path to generated CSV
595    file. The file will reside in a temporary directory that should be
596    removed after being used.
597    """
598    grok.implements(IExportJob)
599
[9718]600    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]601        super(AsyncExportJob, self).__init__(
[9718]602            export_job, site, exporter_name, *args, **kwargs)
[9217]603
[9816]604    @property
605    def finished(self):
606        """A job is marked `finished` if it is completed.
607
608        Please note: a finished report job does not neccessarily
609        provide an IReport result. See meth:`failed`.
610        """
611        return self.status == zc.async.interfaces.COMPLETED
612
613    @property
614    def failed(self):
615        """A report job is marked failed iff it is finished and the
616        result is None.
617
618        While a job is unfinished, the `failed` status is ``None``.
619
620        Failed jobs normally provide a `traceback` to examine reasons.
621        """
622        if not self.finished:
623            return None
624        if getattr(self, 'result', None) is None:
625            return True
626        return False
627
[9217]628class ExportJobContainer(object):
629    """A mix-in that provides functionality for asynchronous export jobs.
630    """
631    grok.implements(IExportJobContainer)
632    running_exports = PersistentList()
633
[9718]634    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
[9217]635        """Start asynchronous export job.
636
637        `exporter_name` is the name of an exporter utility to be used.
638
639        `user_id` is the ID of the user that triggers the export.
640
641        The job_id is stored along with exporter name and user id in a
642        persistent list.
643
[9718]644        The method supports additional positional and keyword
645        arguments, which are passed as-is to the respective
646        :class:`AsyncExportJob`.
647
[9217]648        Returns the job ID of the job started.
649        """
650        site = grok.getSite()
651        manager = getUtility(IJobManager)
[9718]652        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
[9217]653        job_id = manager.put(job)
654        # Make sure that the persisted list is stored in ZODB
655        self.running_exports = PersistentList(self.running_exports)
656        self.running_exports.append((job_id, exporter_name, user_id))
657        return job_id
658
659    def get_running_export_jobs(self, user_id=None):
660        """Get export jobs for user with `user_id` as list of tuples.
661
662        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
663        that order. The ``<exporter_name>`` is the utility name of the
664        used exporter.
665
666        If `user_id` is ``None``, all running jobs are returned.
667        """
668        entries = []
669        to_delete = []
670        manager = getUtility(IJobManager)
671        for entry in self.running_exports:
672            if user_id is not None and entry[2] != user_id:
673                continue
674            if manager.get(entry[0]) is None:
675                to_delete.append(entry)
676                continue
677            entries.append(entry)
678        if to_delete:
679            self.running_exports = PersistentList(
680                [x for x in self.running_exports if x not in to_delete])
681        return entries
682
683    def get_export_jobs_status(self, user_id=None):
684        """Get running/completed export jobs for `user_id` as list of tuples.
685
686        Each tuple holds ``<raw status>, <status translated>,
687        <exporter title>`` in that order, where ``<status
688        translated>`` and ``<exporter title>`` are translated strings
689        representing the status of the job and the human readable
690        title of the exporter used.
691        """
692        entries = self.get_running_export_jobs(user_id)
693        result = []
694        manager = getUtility(IJobManager)
695        for entry in entries:
696            job = manager.get(entry[0])
697            if job is None:
698                continue
699            status, status_translated = JOB_STATUS_MAP[job.status]
700            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
701            result.append((status, status_translated, exporter_name))
702        return result
703
704    def delete_export_entry(self, entry):
705        """Delete the export denoted by `entry`.
706
707        Removes given entry from the local `running_exports` list and also
708        removes the regarding job via the local job manager.
709
710        `entry` must be a tuple ``(<job id>, <exporter name>, <user
711        id>)`` as created by :meth:`start_export_job` or returned by
712        :meth:`get_running_export_jobs`.
713        """
714        manager = getUtility(IJobManager)
715        job = manager.get(entry[0])
716        if job is not None:
717            # remove created export file
718            if isinstance(job.result, basestring):
719                if os.path.exists(os.path.dirname(job.result)):
720                    shutil.rmtree(os.path.dirname(job.result))
721        manager.remove(entry[0], self)
722        new_entries = [x for x in self.running_exports
723                       if x != entry]
724        self.running_exports = PersistentList(new_entries)
725        return
726
727    def entry_from_job_id(self, job_id):
728        """Get entry tuple for `job_id`.
729
730        Returns ``None`` if no such entry can be found.
731        """
732        for entry in self.running_exports:
733            if entry[0] == job_id:
734                return entry
735        return None
[9726]736
737class VirtualExportJobContainer(ExportJobContainer):
738    """A virtual export job container.
739
740    Virtual ExportJobContainers can be used as a mixin just like real
741    ExportJobContainer.
742
743    They retrieve and store data in the site-wide ExportJobContainer.
744
745    Functionality is currently entirely as for regular
746    ExportJobContainers, except that data is stored elsewhere.
747
748    VirtualExportJobContainers need a registered
749    IExportContainerFinder utility to find a suitable container for
750    storing data.
751    """
752    grok.implements(IExportJobContainer)
753
754    @property
755    def _site_container(self):
756        return getUtility(IExportContainerFinder)()
757
758    # The following is a simple trick. While ExportJobContainers store
759    # only one attribute in ZODB, it is sufficient to replace this
760    # attribute `running_exports` with a suitable manager to make the
761    # whole virtual container work like the original but with the data
762    # stored in the site-wide exports container. This way, virtual
763    # export containers provide the whole functionality of a regular
764    # exports container but store no data at all with themselves.
765    @property
766    def running_exports(self):
767        """Exports stored in the site-wide exports container.
768        """
769        return self._site_container.running_exports
770
771    @running_exports.setter
772    def running_exports(self, value):
773        self._site_container.running_exports = value
774
775    @running_exports.deleter
776    def running_exports(self):
777        del self._site_container.running_exports
778
[9823]779    @property
780    def logger(self):
781        return self._site_container.logger
[9726]782
783@implementer(IExportContainerFinder)
784class ExportContainerFinder(grok.GlobalUtility):
785    """Finder for local (site-wide) export container.
786    """
787
788    def __call__(self):
789        """Get the local export container-
790
791        If no site can be determined or the site provides no export
792        container, None is returned.
793        """
794        site = grok.getSite()
795        if site is None:
796            return None
797        return site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.