source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 12200

Last change on this file since 12200 was 12191, checked in by Henrik Bettermann, 10 years ago

Do not import codes with spaces.

  • Property svn:keywords set to Id
File size: 27.9 KB
RevLine 
[7196]1## $Id: batching.py 12191 2014-12-10 14:07:27Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[11849]37from zope.schema.interfaces import ConstraintNotSatisfied
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
45class BatchProcessor(grok.GlobalUtility):
46    """A processor to add, update, or remove data.
47
48    This is a non-active baseclass.
49    """
[8220]50    grok.implements(IBatchProcessor)
[4806]51    grok.context(Interface)
52    grok.baseclass()
53
54    # Name used in pages and forms...
[7933]55    name = u'Non-registered base processor'
[6259]56
[4806]57    # Internal name...
[7933]58    util_name = 'baseprocessor'
[6259]59
[4806]60    # Items for this processor need an interface with zope.schema fields.
[5009]61    iface = Interface
[6259]62
[4806]63    # The name must be the same as the util_name attribute in order to
64    # register this utility correctly.
65    grok.name(util_name)
66
67    # Headers needed to locate items...
68    location_fields = ['code', 'faculty_code']
[6259]69
[4806]70    # A factory with this name must be registered...
71    factory_name = 'waeup.Department'
72
73    @property
74    def required_fields(self):
[4829]75        """Required fields that have no default.
76
77        A list of names of field, whose value cannot be set if not
78        given during creation. Therefore these fields must exist in
79        input.
80
81        Fields with a default != missing_value do not belong to this
82        category.
83        """
[4806]84        result = []
85        for key, field in getFields(self.iface).items():
86            if key in self.location_fields:
87                continue
[4829]88            if field.default is not field.missing_value:
89                continue
[4806]90            if field.required:
91                result.append(key)
92        return result
[6259]93
[4806]94    @property
95    def req(self):
96        result = dict(
97            create = self.location_fields + self.required_fields,
98            update = self.location_fields,
99            remove = self.location_fields,
100        )
101        return result
102
103    @property
104    def available_fields(self):
105        return sorted(list(set(
106                    self.location_fields + getFields(self.iface).keys())))
[6259]107
[4806]108    def getHeaders(self, mode='create'):
109        return self.available_fields
110
111    def checkHeaders(self, headerfields, mode='create'):
112        req = self.req[mode]
113        # Check for required fields...
114        for field in req:
115            if not field in headerfields:
116                raise FatalCSVError(
117                    "Need at least columns %s for import!" %
118                    ', '.join(["'%s'" % x for x in req]))
[6828]119        # Check for double fields. Cannot happen because this error is
120        # already catched in views
[4806]121        not_ignored_fields = [x for x in headerfields
122                              if not x.startswith('--')]
123        if len(set(not_ignored_fields)) < len(not_ignored_fields):
124            raise FatalCSVError(
125                "Double headers: each column name may only appear once.")
126        return True
127
128    def applyMapping(self, row, mapping):
[4811]129        """Apply mapping to a row of CSV data.
[6824]130
[4811]131        """
[4806]132        result = dict()
133        for key, replacement in mapping.items():
[6824]134            if replacement == u'--IGNORE--':
135                # Skip ignored columns in failed and finished data files.
136                continue
[4806]137            result[replacement] = row[key]
138        return result
[6259]139
[4832]140    def getMapping(self, path, headerfields, mode):
[6824]141        """Get a mapping from CSV file headerfields to actually used fieldnames.
142
[4811]143        """
[4832]144        result = dict()
[10027]145        reader = unicodecsv.reader(open(path, 'rb'))
[4806]146        raw_header = reader.next()
[4832]147        for num, field in enumerate(headerfields):
148            if field not in self.location_fields and mode == 'remove':
[6824]149                # Skip non-location fields when removing.
150                continue
151            if field == u'--IGNORE--':
152                # Skip ignored columns in failed and finished data files.
153                continue
[4832]154            result[raw_header[num]] = field
155        return result
[4806]156
[6273]157    def stringFromErrs(self, errors, inv_errors):
158        result = []
159        for err in errors:
160            fieldname, message = err
161            result.append("%s: %s" % (fieldname, message))
162        for err in inv_errors:
163            result.append("invariant: %s" % err)
164        return '; '.join(result)
165
[4806]166    def callFactory(self, *args, **kw):
167        return createObject(self.factory_name)
168
169    def parentsExist(self, row, site):
[4811]170        """Tell whether the parent object for data in ``row`` exists.
171        """
[4806]172        raise NotImplementedError('method not implemented')
173
174    def entryExists(self, row, site):
[4811]175        """Tell whether there already exists an entry for ``row`` data.
176        """
[4806]177        raise NotImplementedError('method not implemented')
178
179    def getParent(self, row, site):
[4811]180        """Get the parent object for the entry in ``row``.
181        """
[4806]182        raise NotImplementedError('method not implemented')
[6259]183
[5009]184    def getEntry(self, row, site):
185        """Get the parent object for the entry in ``row``.
186        """
187        raise NotImplementedError('method not implemented')
[6259]188
[4806]189    def addEntry(self, obj, row, site):
[4811]190        """Add the entry given given by ``row`` data.
191        """
[4806]192        raise NotImplementedError('method not implemented')
193
194    def delEntry(self, row, site):
[4811]195        """Delete entry given by ``row`` data.
196        """
[6259]197        raise NotImplementedError('method not implemented')
[4806]198
[7950]199    def checkUpdateRequirements(self, obj, row, site):
200        """Checks requirements the object must fulfill when being updated.
[7938]201
202        This method is not used in case of deleting or adding objects.
203
[7950]204        Returns error messages as strings in case of requirement
[7938]205        problems.
[7937]206        """
[7938]207        return None
[7937]208
[9706]209    def updateEntry(self, obj, row, site, filename):
[4984]210        """Update obj to the values given in row.
[8220]211
212        Returns a string describing the fields changed.
[4984]213        """
[8220]214        changed = []
[4829]215        for key, value in row.items():
[8220]216            # Skip fields to be ignored.
217            if value == IGNORE_MARKER:
218                continue
[8304]219            # Skip fields not declared in interface and which are
220            # not yet attributes of existing objects. We can thus not
221            # add non-existing attributes here.
[8220]222            if not hasattr(obj, key):
223                continue
[9265]224            try:
225                setattr(obj, key, value)
226            except AttributeError:
227                # Computed attributes can't be set.
228                continue
[8222]229            log_value = getattr(value, 'code', value)
230            changed.append('%s=%s' % (key, log_value))
[8332]231
[8333]232        # If any catalog is involved it must be updated.
233        #
234        # XXX: The event is also triggered when creating objects as
235        # updateEntry is called also when creating entries resulting
236        # in objectAdded and additional objectModified events.
237        if len(changed):
238            notify(grok.ObjectModifiedEvent(obj))
[8332]239
[8220]240        return ', '.join(changed)
[4821]241
[4832]242    def createLogfile(self, path, fail_path, num, warnings, mode, user,
[4885]243                      timedelta, logger=None):
244        """Write to log file.
[4821]245        """
[4885]246        if logger is None:
247            return
[9739]248        logger.info(
249            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
250            "%0.3f s (%0.4f s/item)" % (
251            path, mode, num, num - warnings, warnings,
252            timedelta, timedelta/(num or 1)))
[4821]253        return
[4877]254
255    def writeFailedRow(self, writer, row, warnings):
256        """Write a row with error messages to error CSV.
257
258        If warnings is a list of strings, they will be concatenated.
259        """
260        error_col = warnings
261        if isinstance(warnings, list):
262            error_col = ' / '.join(warnings)
263        row['--ERRORS--'] = error_col
264        writer.writerow(row)
265        return
[6259]266
[8220]267    def checkConversion(self, row, mode='ignore', ignore_empty=True):
[6847]268        """Validates all values in row.
269        """
270        converter = IObjectConverter(self.iface)
271        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8220]272            row, self.factory_name, mode=mode)
[12191]273        if 'code' in row and ' ' in row['code']:
274            errs.append(('code','must not contain spaces'))
[6847]275        return errs, inv_errs, conv_dict
276
[4885]277    def doImport(self, path, headerfields, mode='create', user='Unknown',
[8220]278                 logger=None, ignore_empty=True):
[4811]279        """Perform actual import.
280        """
[4832]281        time_start = time.time()
[4806]282        self.checkHeaders(headerfields, mode)
[4832]283        mapping = self.getMapping(path, headerfields, mode)
[10027]284        reader = unicodecsv.DictReader(open(path, 'rb'))
[4889]285
[4900]286        temp_dir = tempfile.mkdtemp()
[6259]287
[6273]288        base = os.path.basename(path)
289        (base, ext) = os.path.splitext(base)
[4900]290        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
[6831]291        failed_headers = mapping.values()
[4877]292        failed_headers.append('--ERRORS--')
[10027]293        failed_writer = unicodecsv.DictWriter(open(failed_path, 'wb'),
294                                              failed_headers)
[8573]295        os.chmod(failed_path, 0664)
[6831]296        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
[4891]297
[4900]298        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
[6831]299        finished_headers = mapping.values()
[10027]300        finished_writer = unicodecsv.DictWriter(open(finished_path, 'wb'),
301                                                finished_headers)
[8905]302        os.chmod(finished_path, 0664)
[4891]303        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
[6259]304
[4806]305        num =0
[4878]306        num_warns = 0
[4806]307        site = grok.getSite()
[7859]308
[4806]309        for raw_row in reader:
310            num += 1
311            string_row = self.applyMapping(raw_row, mapping)
[8222]312            if ignore_empty and mode in ('update',):
[8220]313                # replace empty strings with ignore-markers
[8222]314                for key, val in string_row.items():
[8220]315                    if val == '':
[8222]316                        string_row[key] = IGNORE_MARKER
317            row = dict(string_row.items()) # create deep copy
[6847]318            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
[6273]319            if errs or inv_errs:
[4878]320                num_warns += 1
[6273]321                conv_warnings = self.stringFromErrs(errs, inv_errs)
322                self.writeFailedRow(
[6824]323                    failed_writer, string_row, conv_warnings)
[4821]324                continue
[6273]325            row.update(conv_dict)
[6259]326
[4806]327            if mode == 'create':
328                if not self.parentsExist(row, site):
[4878]329                    num_warns += 1
[4877]330                    self.writeFailedRow(
[6824]331                        failed_writer, string_row,
[4877]332                        "Not all parents do exist yet. Skipping")
[4806]333                    continue
334                if self.entryExists(row, site):
[4878]335                    num_warns += 1
[4877]336                    self.writeFailedRow(
[6824]337                        failed_writer, string_row,
[8330]338                        "This object already exists. Skipping.")
[4806]339                    continue
340                obj = self.callFactory()
[7273]341                # Override all values in row, also
342                # student_ids and applicant_ids which have been
343                # generated in the respective __init__ methods before.
[9706]344                self.updateEntry(obj, row, site, base)
[6243]345                try:
346                    self.addEntry(obj, row, site)
[6273]347                except KeyError, error:
[6219]348                    num_warns += 1
349                    self.writeFailedRow(
[6824]350                        failed_writer, string_row,
[6273]351                        "%s Skipping." % error.message)
[8540]352                    continue
[8509]353                except DuplicationError, error:
354                    num_warns += 1
355                    self.writeFailedRow(
356                        failed_writer, string_row,
357                        "%s Skipping." % error.msg)
[6219]358                    continue
[4806]359            elif mode == 'remove':
360                if not self.entryExists(row, site):
[4878]361                    num_warns += 1
[4877]362                    self.writeFailedRow(
[6824]363                        failed_writer, string_row,
[9219]364                        "Cannot remove: no such entry")
[4806]365                    continue
366                self.delEntry(row, site)
367            elif mode == 'update':
368                obj = self.getEntry(row, site)
369                if obj is None:
[4878]370                    num_warns += 1
[4877]371                    self.writeFailedRow(
[6824]372                        failed_writer, string_row,
[9219]373                        "Cannot update: no such entry")
[4806]374                    continue
[7950]375                update_errors = self.checkUpdateRequirements(obj, row, site)
[7938]376                if update_errors is not None:
[7937]377                    num_warns += 1
378                    self.writeFailedRow(
379                        failed_writer, string_row, update_errors)
380                    continue
[11849]381                try:
382                    self.updateEntry(obj, row, site, base)
383                except ConstraintNotSatisfied, err:
384                    num_warns += 1
385                    self.writeFailedRow(
386                        failed_writer, string_row,
387                        "ConstraintNotSatisfied: %s" % err)
388                    continue
[4891]389            finished_writer.writerow(string_row)
[4821]390
[4832]391        time_end = time.time()
392        timedelta = time_end - time_start
[6259]393
[4878]394        self.createLogfile(path, failed_path, num, num_warns, mode, user,
[4885]395                           timedelta, logger=logger)
[4894]396        failed_path = os.path.abspath(failed_path)
[4878]397        if num_warns == 0:
[4821]398            del failed_writer
399            os.unlink(failed_path)
[4894]400            failed_path = None
401        return (num, num_warns,
402                os.path.abspath(finished_path), failed_path)
[7859]403
[9032]404    def get_csv_skeleton(self):
405        """Export CSV file only with a header of available fields.
406
407        A raw string with CSV data should be returned.
408        """
409        outfile = StringIO()
[10027]410        writer = unicodecsv.DictWriter(outfile, self.available_fields)
[9734]411        writer.writerow(
412            dict(zip(self.available_fields, self.available_fields))) # header
[9032]413        outfile.seek(0)
414        return outfile.read()
415
[7859]416class ExporterBase(object):
417    """A base for exporters.
418    """
419    grok.implements(ICSVExporter)
420
421    #: Fieldnames considered by this exporter
422    fields = ('code', 'title', 'title_prefix')
423
[7907]424    #: The title under which this exporter will be displayed
425    #: (if registered as a utility)
426    title = 'Override this title'
427
[7859]428    def mangle_value(self, value, name, context=None):
429        """Hook for mangling values in derived classes
430        """
431        if isinstance(value, bool):
432            value = value and '1' or '0'
433        elif isinstance(value, unicode):
434            # CSV writers like byte streams better than unicode
435            value = value.encode('utf-8')
[8380]436        elif isinstance(value, datetime.datetime):
[11737]437            #value = str(value)
438            value = str('%s#' % value) # changed 2014-07-06, see ticket #941
[8380]439        elif isinstance(value, datetime.date):
440            # Order is important here: check for date after datetime as
441            # datetimes are also dates.
442            #
443            # Append hash '#' to dates to circumvent unwanted excel automatic
444            value = str('%s#' % value)
[7859]445        elif value is None:
446            # None is not really representable in CSV files
447            value = ''
448        return value
449
450    def get_csv_writer(self, filepath=None):
451        """Get a CSV dict writer instance open for writing.
452
453        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
454        :class:`csv.DictWriter` instance and outfile is the real file
455        which is written to. The latter is important when writing to
456        StringIO and can normally be ignored otherwise.
457
458        The returned file will already be filled with the header row.
459
460        Please note that if you give a filepath, the returned outfile
461        is open for writing only and you might have to close it before
462        reopening it for reading.
463        """
464        if filepath is None:
465            outfile = StringIO()
466        else:
467            outfile = open(filepath, 'wb')
[10027]468        writer = unicodecsv.DictWriter(outfile, self.fields)
[7859]469        writer.writerow(dict(zip(self.fields, self.fields))) # header
470        return writer, outfile
471
472    def write_item(self, obj, writer):
473        """Write a row extracted from `obj` into CSV file using `writer`.
474        """
475        row = {}
476        for name in self.fields:
477            value = getattr(obj, name, None)
478            value = self.mangle_value(value, name, obj)
479            row[name] = value
480        writer.writerow(row)
481        return
482
483    def close_outfile(self, filepath, outfile):
484        """Close outfile.
485
486        If filepath is None, the contents of outfile is returned.
487        """
488        outfile.seek(0)
489        if filepath is None:
490            return outfile.read()
491        outfile.close()
492        return
493
[9797]494    def get_filtered(self, site, **kw):
495        """Get datasets to export filtered by keyword arguments.
496
497        Returns an iterable.
498        """
499        raise NotImplementedError
500
[7859]501    def export(self, iterable, filepath=None):
502        """Export `iterable` as CSV file.
503
504        If `filepath` is ``None``, a raw string with CSV data should
505        be returned.
506        """
507        raise NotImplementedError
508
509    def export_all(self, site, filepath=None):
510        """Export all appropriate objects in `site` into `filepath` as
511        CSV data.
512
513        If `filepath` is ``None``, a raw string with CSV data should
514        be returned.
515        """
516        raise NotImplementedError
[9217]517
[9797]518    def export_filtered(self, site, filepath=None, **kw):
519        """Export items denoted by `args` and `kw`.
[9217]520
[9797]521        If `filepath` is ``None``, a raw string with CSV data should
522        be returned.
523        """
524        data = self.get_filtered(site, **kw)
525        return self.export(data, filepath=filepath)
526
527def export_job(site, exporter_name, **kw):
[9217]528    """Export all entries delivered by exporter and store it in a temp file.
529
530    `site` gives the site to search. It will be passed to the exporter
531    and also be set as 'current site' as the function is used in
532    asynchronous jobs which run in their own threads and have no site
533    set initially. Therefore `site` must also be a valid value for use
534    with `zope.component.hooks.setSite()`.
535
536    `exporter_name` is the utility name under which the desired
537    exporter was registered with the ZCA.
538
539    The resulting CSV file will be stored in a new temporary directory
540    (using :func:`tempfile.mkdtemp`). It will be named after the
541    exporter used with `.csv` filename extension.
542
543    Returns the path to the created CSV file.
544
545    .. note:: It is the callers responsibility to clean up the used
546              file and its parent directory.
547    """
548    setSite(site)
549    exporter = getUtility(ICSVExporter, name=exporter_name)
550    output_dir = tempfile.mkdtemp()
551    filename = '%s.csv' % exporter_name
552    output_path = os.path.join(output_dir, filename)
[9797]553    if kw == {}:
554        exporter.export_all(site, filepath=output_path)
555    else:
556        exporter.export_filtered(site, filepath=output_path, **kw)
[9217]557    return output_path
558
559class AsyncExportJob(AsyncJob):
560    """An IJob that exports data to CSV files.
561
562    `AsyncExportJob` instances are regular `AsyncJob` instances with a
563    different constructor API. Instead of a callable to execute, you
564    must pass a `site` and some `exporter_name` to trigger an export.
565
566    The real work is done when an instance of this class is put into a
567    queue. See :mod:`waeup.kofa.async` to learn more about
568    asynchronous jobs.
569
570    The `exporter_name` must be the name under which an ICSVExporter
571    utility was registered with the ZCA.
572
573    The `site` must be a valid site  or ``None``.
574
575    The result of an `AsyncExportJob` is the path to generated CSV
576    file. The file will reside in a temporary directory that should be
577    removed after being used.
578    """
579    grok.implements(IExportJob)
580
[9718]581    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]582        super(AsyncExportJob, self).__init__(
[9718]583            export_job, site, exporter_name, *args, **kwargs)
[9217]584
[9816]585    @property
586    def finished(self):
587        """A job is marked `finished` if it is completed.
588
589        Please note: a finished report job does not neccessarily
590        provide an IReport result. See meth:`failed`.
591        """
592        return self.status == zc.async.interfaces.COMPLETED
593
594    @property
595    def failed(self):
596        """A report job is marked failed iff it is finished and the
597        result is None.
598
599        While a job is unfinished, the `failed` status is ``None``.
600
601        Failed jobs normally provide a `traceback` to examine reasons.
602        """
603        if not self.finished:
604            return None
605        if getattr(self, 'result', None) is None:
606            return True
607        return False
608
[9217]609class ExportJobContainer(object):
610    """A mix-in that provides functionality for asynchronous export jobs.
611    """
612    grok.implements(IExportJobContainer)
613    running_exports = PersistentList()
614
[9718]615    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
[9217]616        """Start asynchronous export job.
617
618        `exporter_name` is the name of an exporter utility to be used.
619
620        `user_id` is the ID of the user that triggers the export.
621
622        The job_id is stored along with exporter name and user id in a
623        persistent list.
624
[9718]625        The method supports additional positional and keyword
626        arguments, which are passed as-is to the respective
627        :class:`AsyncExportJob`.
628
[9217]629        Returns the job ID of the job started.
630        """
631        site = grok.getSite()
632        manager = getUtility(IJobManager)
[9718]633        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
[9217]634        job_id = manager.put(job)
635        # Make sure that the persisted list is stored in ZODB
636        self.running_exports = PersistentList(self.running_exports)
637        self.running_exports.append((job_id, exporter_name, user_id))
638        return job_id
639
640    def get_running_export_jobs(self, user_id=None):
641        """Get export jobs for user with `user_id` as list of tuples.
642
643        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
644        that order. The ``<exporter_name>`` is the utility name of the
645        used exporter.
646
647        If `user_id` is ``None``, all running jobs are returned.
648        """
649        entries = []
650        to_delete = []
651        manager = getUtility(IJobManager)
652        for entry in self.running_exports:
653            if user_id is not None and entry[2] != user_id:
654                continue
655            if manager.get(entry[0]) is None:
656                to_delete.append(entry)
657                continue
658            entries.append(entry)
659        if to_delete:
660            self.running_exports = PersistentList(
661                [x for x in self.running_exports if x not in to_delete])
662        return entries
663
664    def get_export_jobs_status(self, user_id=None):
665        """Get running/completed export jobs for `user_id` as list of tuples.
666
667        Each tuple holds ``<raw status>, <status translated>,
668        <exporter title>`` in that order, where ``<status
669        translated>`` and ``<exporter title>`` are translated strings
670        representing the status of the job and the human readable
671        title of the exporter used.
672        """
673        entries = self.get_running_export_jobs(user_id)
674        result = []
675        manager = getUtility(IJobManager)
676        for entry in entries:
677            job = manager.get(entry[0])
678            if job is None:
679                continue
680            status, status_translated = JOB_STATUS_MAP[job.status]
681            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
682            result.append((status, status_translated, exporter_name))
683        return result
684
685    def delete_export_entry(self, entry):
686        """Delete the export denoted by `entry`.
687
688        Removes given entry from the local `running_exports` list and also
689        removes the regarding job via the local job manager.
690
691        `entry` must be a tuple ``(<job id>, <exporter name>, <user
692        id>)`` as created by :meth:`start_export_job` or returned by
693        :meth:`get_running_export_jobs`.
694        """
695        manager = getUtility(IJobManager)
696        job = manager.get(entry[0])
697        if job is not None:
698            # remove created export file
699            if isinstance(job.result, basestring):
700                if os.path.exists(os.path.dirname(job.result)):
701                    shutil.rmtree(os.path.dirname(job.result))
702        manager.remove(entry[0], self)
703        new_entries = [x for x in self.running_exports
704                       if x != entry]
705        self.running_exports = PersistentList(new_entries)
706        return
707
708    def entry_from_job_id(self, job_id):
709        """Get entry tuple for `job_id`.
710
711        Returns ``None`` if no such entry can be found.
712        """
713        for entry in self.running_exports:
714            if entry[0] == job_id:
715                return entry
716        return None
[9726]717
718class VirtualExportJobContainer(ExportJobContainer):
719    """A virtual export job container.
720
721    Virtual ExportJobContainers can be used as a mixin just like real
722    ExportJobContainer.
723
724    They retrieve and store data in the site-wide ExportJobContainer.
725
726    Functionality is currently entirely as for regular
727    ExportJobContainers, except that data is stored elsewhere.
728
729    VirtualExportJobContainers need a registered
730    IExportContainerFinder utility to find a suitable container for
731    storing data.
732    """
733    grok.implements(IExportJobContainer)
734
735    @property
736    def _site_container(self):
737        return getUtility(IExportContainerFinder)()
738
739    # The following is a simple trick. While ExportJobContainers store
740    # only one attribute in ZODB, it is sufficient to replace this
741    # attribute `running_exports` with a suitable manager to make the
742    # whole virtual container work like the original but with the data
743    # stored in the site-wide exports container. This way, virtual
744    # export containers provide the whole functionality of a regular
745    # exports container but store no data at all with themselves.
746    @property
747    def running_exports(self):
748        """Exports stored in the site-wide exports container.
749        """
750        return self._site_container.running_exports
751
752    @running_exports.setter
753    def running_exports(self, value):
754        self._site_container.running_exports = value
755
756    @running_exports.deleter
757    def running_exports(self):
758        del self._site_container.running_exports
759
[9823]760    @property
761    def logger(self):
762        return self._site_container.logger
[9726]763
764@implementer(IExportContainerFinder)
765class ExportContainerFinder(grok.GlobalUtility):
766    """Finder for local (site-wide) export container.
767    """
768
769    def __call__(self):
770        """Get the local export container-
771
772        If no site can be determined or the site provides no export
773        container, None is returned.
774        """
775        site = grok.getSite()
776        if site is None:
777            return None
778        return site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.