source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 11820

Last change on this file since 11820 was 11737, checked in by Henrik Bettermann, 10 years ago

Append hash '#' also to datetimes to circumvent unwanted excel automatic.

  • Property svn:keywords set to Id
File size: 27.4 KB
[7196]1## $Id: batching.py 11737 2014-07-06 16:15:52Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[8332]37from zope.event import notify
[9217]38from waeup.kofa.async import AsyncJob
[7811]39from waeup.kofa.interfaces import (
[9217]40    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
41    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]42    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]43
44class BatchProcessor(grok.GlobalUtility):
45    """A processor to add, update, or remove data.
46
47    This is a non-active base class which is not registered as a utility itself.
48    """
[8220]49    grok.implements(IBatchProcessor)
[4806]50    grok.context(Interface)
51    grok.baseclass()
52
53    # Name used in pages and forms...
[7933]54    name = u'Non-registered base processor'
[6259]55
[4806]56    # Internal name...
[7933]57    util_name = 'baseprocessor'
[6259]58
[4806]59    # Items for this processor need an interface with zope.schema fields.
[5009]60    iface = Interface
[6259]61
[4806]62    # The name must be the same as the util_name attribute in order to
63    # register this utility correctly.
64    grok.name(util_name)
65
66    # Headers needed to locate items...
67    location_fields = ['code', 'faculty_code']
[6259]68
[4806]69    # A factory with this name must be registered...
70    factory_name = 'waeup.Department'
71
72    @property
73    def required_fields(self):
[4829]74        """Required fields that have no default.
75
76        A list of names of field, whose value cannot be set if not
77        given during creation. Therefore these fields must exist in
78        input.
79
80        Fields with a default != missing_value do not belong to this
81        category.
82        """
[4806]83        result = []
84        for key, field in getFields(self.iface).items():
85            if key in self.location_fields:
86                continue
[4829]87            if field.default is not field.missing_value:
88                continue
[4806]89            if field.required:
90                result.append(key)
91        return result
[6259]92
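    # A minimal sketch of how `required_fields` behaves (hypothetical
    # interface, not part of this module): with
    # location_fields = ['code', 'faculty_code'] and
    #
    #   class IDepartment(Interface):
    #       code = schema.TextLine(required=True)
    #       title = schema.TextLine(required=True)
    #       title_prefix = schema.TextLine(required=True, default=u'department')
    #
    # `required_fields` would be ['title']: `code` is skipped as a location
    # field and `title_prefix` is skipped because it has a default.
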
[4806]93    @property
94    def req(self):
95        result = dict(
96            create = self.location_fields + self.required_fields,
97            update = self.location_fields,
98            remove = self.location_fields,
99        )
100        return result
101
102    @property
103    def available_fields(self):
104        return sorted(list(set(
105                    self.location_fields + getFields(self.iface).keys())))
[6259]106
[4806]107    def getHeaders(self, mode='create'):
108        return self.available_fields
109
110    def checkHeaders(self, headerfields, mode='create'):
111        req = self.req[mode]
112        # Check for required fields...
113        for field in req:
114            if not field in headerfields:
115                raise FatalCSVError(
116                    "Need at least columns %s for import!" %
117                    ', '.join(["'%s'" % x for x in req]))
[6828]118        # Check for duplicate fields. Cannot happen because this error is
119        # already caught in views.
[4806]120        not_ignored_fields = [x for x in headerfields
121                              if not x.startswith('--')]
122        if len(set(not_ignored_fields)) < len(not_ignored_fields):
123            raise FatalCSVError(
124                "Double headers: each column name may only appear once.")
125        return True
126
127    def applyMapping(self, row, mapping):
[4811]128        """Apply mapping to a row of CSV data.
[6824]129
[4811]130        """
[4806]131        result = dict()
132        for key, replacement in mapping.items():
[6824]133            if replacement == u'--IGNORE--':
134                # Skip ignored columns in failed and finished data files.
135                continue
[4806]136            result[replacement] = row[key]
137        return result
[6259]138
[4832]139    def getMapping(self, path, headerfields, mode):
[6824]140        """Get a mapping from CSV file headerfields to actually used fieldnames.
141
[4811]142        """
[4832]143        result = dict()
[10027]144        reader = unicodecsv.reader(open(path, 'rb'))
[4806]145        raw_header = reader.next()
[4832]146        for num, field in enumerate(headerfields):
147            if field not in self.location_fields and mode == 'remove':
[6824]148                # Skip non-location fields when removing.
149                continue
150            if field == u'--IGNORE--':
151                # Skip ignored columns in failed and finished data files.
152                continue
[4832]153            result[raw_header[num]] = field
154        return result
[4806]155
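    # A minimal sketch of getMapping/applyMapping (hypothetical values): for
    # a CSV file whose raw header row is ['id', 'title', 'comment'] and
    # headerfields ['code', 'title', '--IGNORE--'] in create mode,
    # getMapping returns {'id': 'code', 'title': 'title'}; applyMapping then
    # turns the raw row {'id': 'A1', 'title': 'Maths', 'comment': 'x'} into
    # {'code': 'A1', 'title': 'Maths'}.
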
[6273]156    def stringFromErrs(self, errors, inv_errors):
157        result = []
158        for err in errors:
159            fieldname, message = err
160            result.append("%s: %s" % (fieldname, message))
161        for err in inv_errors:
162            result.append("invariant: %s" % err)
163        return '; '.join(result)
164
[4806]165    def callFactory(self, *args, **kw):
166        return createObject(self.factory_name)
167
168    def parentsExist(self, row, site):
[4811]169        """Tell whether the parent object for data in ``row`` exists.
170        """
[4806]171        raise NotImplementedError('method not implemented')
172
173    def entryExists(self, row, site):
[4811]174        """Tell whether an entry for ``row`` data already exists.
175        """
[4806]176        raise NotImplementedError('method not implemented')
177
178    def getParent(self, row, site):
[4811]179        """Get the parent object for the entry in ``row``.
180        """
[4806]181        raise NotImplementedError('method not implemented')
[6259]182
[5009]183    def getEntry(self, row, site):
184        """Get the object for the entry in ``row``.
185        """
186        raise NotImplementedError('method not implemented')
[6259]187
[4806]188    def addEntry(self, obj, row, site):
[4811]189        """Add the entry given by ``row`` data.
190        """
[4806]191        raise NotImplementedError('method not implemented')
192
193    def delEntry(self, row, site):
[4811]194        """Delete entry given by ``row`` data.
195        """
[6259]196        raise NotImplementedError('method not implemented')
[4806]197
[7950]198    def checkUpdateRequirements(self, obj, row, site):
199        """Checks requirements the object must fulfill when being updated.
[7938]200
201        This method is not used in case of deleting or adding objects.
202
[7950]203        Returns error messages as strings in case of requirement
[7938]204        problems.
[7937]205        """
[7938]206        return None
[7937]207
[9706]208    def updateEntry(self, obj, row, site, filename):
[4984]209        """Update obj to the values given in row.
[8220]210
211        Returns a string describing the fields changed.
[4984]212        """
[8220]213        changed = []
[4829]214        for key, value in row.items():
[8220]215            # Skip fields to be ignored.
216            if value == IGNORE_MARKER:
217                continue
[8304]218            # Skip fields that are not declared in the interface and
219            # are not yet attributes of existing objects. We thus cannot
220            # add non-existing attributes here.
[8220]221            if not hasattr(obj, key):
222                continue
[9265]223            try:
224                setattr(obj, key, value)
225            except AttributeError:
226                # Computed attributes can't be set.
227                continue
[8222]228            log_value = getattr(value, 'code', value)
229            changed.append('%s=%s' % (key, log_value))
[8332]230
[8333]231        # If any catalog is involved it must be updated.
232        #
233        # XXX: The event is also triggered when creating objects because
234        # updateEntry is also called when creating entries, resulting
235        # in objectAdded and additional objectModified events.
236        if len(changed):
237            notify(grok.ObjectModifiedEvent(obj))
[8332]238
[8220]239        return ', '.join(changed)
[4821]240
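    # A minimal sketch of updateEntry (hypothetical row; assumes obj has a
    # `title` attribute but no `bogus` attribute):
    #
    #   row = {'title': u'Maths', 'phone': IGNORE_MARKER, 'bogus': u'x'}
    #
    # sets obj.title, skips 'phone' (ignore marker) and 'bogus' (not an
    # attribute of obj), fires one ObjectModifiedEvent and returns the
    # string 'title=Maths'.
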
[4832]241    def createLogfile(self, path, fail_path, num, warnings, mode, user,
[4885]242                      timedelta, logger=None):
243        """Write to log file.
[4821]244        """
[4885]245        if logger is None:
246            return
[9739]247        logger.info(
248            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
249            "%0.3f s (%0.4f s/item)" % (
250            path, mode, num, num - warnings, warnings,
251            timedelta, timedelta/(num or 1)))
[4821]252        return
[4877]253
254    def writeFailedRow(self, writer, row, warnings):
255        """Write a row with error messages to error CSV.
256
257        If warnings is a list of strings, they will be concatenated.
258        """
259        error_col = warnings
260        if isinstance(warnings, list):
261            error_col = ' / '.join(warnings)
262        row['--ERRORS--'] = error_col
263        writer.writerow(row)
264        return
[6259]265
[8220]266    def checkConversion(self, row, mode='ignore', ignore_empty=True):
[6847]267        """Validates all values in row.
268        """
269        converter = IObjectConverter(self.iface)
270        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8220]271            row, self.factory_name, mode=mode)
[6847]272        return errs, inv_errs, conv_dict
273
[4885]274    def doImport(self, path, headerfields, mode='create', user='Unknown',
[8220]275                 logger=None, ignore_empty=True):
[4811]276        """Perform actual import.
277        """
[4832]278        time_start = time.time()
[4806]279        self.checkHeaders(headerfields, mode)
[4832]280        mapping = self.getMapping(path, headerfields, mode)
[10027]281        reader = unicodecsv.DictReader(open(path, 'rb'))
[4889]282
[4900]283        temp_dir = tempfile.mkdtemp()
[6259]284
[6273]285        base = os.path.basename(path)
286        (base, ext) = os.path.splitext(base)
[4900]287        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
[6831]288        failed_headers = mapping.values()
[4877]289        failed_headers.append('--ERRORS--')
[10027]290        failed_writer = unicodecsv.DictWriter(open(failed_path, 'wb'),
291                                              failed_headers)
[8573]292        os.chmod(failed_path, 0664)
[6831]293        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
[4891]294
[4900]295        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
[6831]296        finished_headers = mapping.values()
[10027]297        finished_writer = unicodecsv.DictWriter(open(finished_path, 'wb'),
298                                                finished_headers)
[8905]299        os.chmod(finished_path, 0664)
[4891]300        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
[6259]301
[4806]302        num = 0
[4878]303        num_warns = 0
[4806]304        site = grok.getSite()
[7859]305
[4806]306        for raw_row in reader:
307            num += 1
308            string_row = self.applyMapping(raw_row, mapping)
[8222]309            if ignore_empty and mode in ('update',):
[8220]310                # replace empty strings with ignore-markers
[8222]311                for key, val in string_row.items():
[8220]312                    if val == '':
[8222]313                        string_row[key] = IGNORE_MARKER
314            row = dict(string_row.items()) # create a copy
[6847]315            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
[6273]316            if errs or inv_errs:
[4878]317                num_warns += 1
[6273]318                conv_warnings = self.stringFromErrs(errs, inv_errs)
319                self.writeFailedRow(
[6824]320                    failed_writer, string_row, conv_warnings)
[4821]321                continue
[6273]322            row.update(conv_dict)
[6259]323
[4806]324            if mode == 'create':
325                if not self.parentsExist(row, site):
[4878]326                    num_warns += 1
[4877]327                    self.writeFailedRow(
[6824]328                        failed_writer, string_row,
[4877]329                        "Not all parents do exist yet. Skipping")
[4806]330                    continue
331                if self.entryExists(row, site):
[4878]332                    num_warns += 1
[4877]333                    self.writeFailedRow(
[6824]334                        failed_writer, string_row,
[8330]335                        "This object already exists. Skipping.")
[4806]336                    continue
337                obj = self.callFactory()
[7273]338                # Override all values in row, including
339                # student_ids and applicant_ids which have already been
340                # generated in the respective __init__ methods.
[9706]341                self.updateEntry(obj, row, site, base)
[6243]342                try:
343                    self.addEntry(obj, row, site)
[6273]344                except KeyError, error:
[6219]345                    num_warns += 1
346                    self.writeFailedRow(
[6824]347                        failed_writer, string_row,
[6273]348                        "%s Skipping." % error.message)
[8540]349                    continue
[8509]350                except DuplicationError, error:
351                    num_warns += 1
352                    self.writeFailedRow(
353                        failed_writer, string_row,
354                        "%s Skipping." % error.msg)
[6219]355                    continue
[4806]356            elif mode == 'remove':
357                if not self.entryExists(row, site):
[4878]358                    num_warns += 1
[4877]359                    self.writeFailedRow(
[6824]360                        failed_writer, string_row,
[9219]361                        "Cannot remove: no such entry")
[4806]362                    continue
363                self.delEntry(row, site)
364            elif mode == 'update':
365                obj = self.getEntry(row, site)
366                if obj is None:
[4878]367                    num_warns += 1
[4877]368                    self.writeFailedRow(
[6824]369                        failed_writer, string_row,
[9219]370                        "Cannot update: no such entry")
[4806]371                    continue
[7950]372                update_errors = self.checkUpdateRequirements(obj, row, site)
[7938]373                if update_errors is not None:
[7937]374                    num_warns += 1
375                    self.writeFailedRow(
376                        failed_writer, string_row, update_errors)
377                    continue
[9706]378                self.updateEntry(obj, row, site, base)
[4891]379            finished_writer.writerow(string_row)
[4821]380
[4832]381        time_end = time.time()
382        timedelta = time_end - time_start
[6259]383
[4878]384        self.createLogfile(path, failed_path, num, num_warns, mode, user,
[4885]385                           timedelta, logger=logger)
[4894]386        failed_path = os.path.abspath(failed_path)
[4878]387        if num_warns == 0:
[4821]388            del failed_writer
389            os.unlink(failed_path)
[4894]390            failed_path = None
391        return (num, num_warns,
392                os.path.abspath(finished_path), failed_path)
[7859]393
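    # A minimal usage sketch of doImport (hypothetical processor and paths):
    #
    #   processor = DepartmentProcessor()
    #   num, num_warns, finished, failed = processor.doImport(
    #       '/tmp/depts.csv', ['faculty_code', 'code', 'title'],
    #       mode='create', user='admin', logger=my_logger)
    #
    # `finished` is the path of a CSV copy of the successfully processed
    # rows; `failed` is the path of a CSV file with an extra --ERRORS--
    # column, or None if every row succeeded.
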
[9032]394    def get_csv_skeleton(self):
395        """Export CSV file only with a header of available fields.
396
397        A raw string with CSV data should be returned.
398        """
399        outfile = StringIO()
[10027]400        writer = unicodecsv.DictWriter(outfile, self.available_fields)
[9734]401        writer.writerow(
402            dict(zip(self.available_fields, self.available_fields))) # header
[9032]403        outfile.seek(0)
404        return outfile.read()
405
[7859]406class ExporterBase(object):
407    """A base for exporters.
408    """
409    grok.implements(ICSVExporter)
410
411    #: Fieldnames considered by this exporter
412    fields = ('code', 'title', 'title_prefix')
413
[7907]414    #: The title under which this exporter will be displayed
415    #: (if registered as a utility)
416    title = 'Override this title'
417
[7859]418    def mangle_value(self, value, name, context=None):
419        """Hook for mangling values in derived classes.
420        """
421        if isinstance(value, bool):
422            value = value and '1' or '0'
423        elif isinstance(value, unicode):
424            # CSV writers like byte streams better than unicode
425            value = value.encode('utf-8')
[8380]426        elif isinstance(value, datetime.datetime):
[11737]427            #value = str(value)
428            value = str('%s#' % value) # changed 2014-07-06, see ticket #941
[8380]429        elif isinstance(value, datetime.date):
430            # Order is important here: check for date after datetime as
431            # datetimes are also dates.
432            #
433        # Append hash '#' to dates to circumvent unwanted Excel automatic formatting.
434            value = str('%s#' % value)
[7859]435        elif value is None:
436            # None is not really representable in CSV files
437            value = ''
438        return value
439
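    # Illustrative results of the conversions above (example values):
    #
    #   True                              -> '1'
    #   u'Mathematics'                    -> 'Mathematics' (UTF-8 bytes)
    #   datetime.datetime(2014, 7, 6)     -> '2014-07-06 00:00:00#'
    #   datetime.date(2014, 7, 6)         -> '2014-07-06#'
    #   None                              -> ''
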
440    def get_csv_writer(self, filepath=None):
441        """Get a CSV dict writer instance open for writing.
442
443        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
444        :class:`csv.DictWriter` instance and outfile is the real file
445        which is written to. The latter is important when writing to
446        StringIO and can normally be ignored otherwise.
447
448        The returned file will already be filled with the header row.
449
450        Please note that if you give a filepath, the returned outfile
451        is open for writing only and you might have to close it before
452        reopening it for reading.
453        """
454        if filepath is None:
455            outfile = StringIO()
456        else:
457            outfile = open(filepath, 'wb')
[10027]458        writer = unicodecsv.DictWriter(outfile, self.fields)
[7859]459        writer.writerow(dict(zip(self.fields, self.fields))) # header
460        return writer, outfile
461
462    def write_item(self, obj, writer):
463        """Write a row extracted from `obj` into CSV file using `writer`.
464        """
465        row = {}
466        for name in self.fields:
467            value = getattr(obj, name, None)
468            value = self.mangle_value(value, name, obj)
469            row[name] = value
470        writer.writerow(row)
471        return
472
473    def close_outfile(self, filepath, outfile):
474        """Close outfile.
475
476        If filepath is None, the contents of outfile are returned.
477        """
478        outfile.seek(0)
479        if filepath is None:
480            return outfile.read()
481        outfile.close()
482        return
483
[9797]484    def get_filtered(self, site, **kw):
485        """Get datasets to export filtered by keyword arguments.
486
487        Returns an iterable.
488        """
489        raise NotImplementedError
490
[7859]491    def export(self, iterable, filepath=None):
492        """Export `iterable` as CSV file.
493
494        If `filepath` is ``None``, a raw string with CSV data should
495        be returned.
496        """
497        raise NotImplementedError
498
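    # A minimal sketch of a typical `export` implementation in a derived
    # exporter (hypothetical subclass), combining the helpers above:
    #
    #   def export(self, iterable, filepath=None):
    #       writer, outfile = self.get_csv_writer(filepath)
    #       for obj in iterable:
    #           self.write_item(obj, writer)
    #       return self.close_outfile(filepath, outfile)
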
499    def export_all(self, site, filepath=None):
500        """Export all appropriate objects in `site` into `filepath` as
501        CSV data.
502
503        If `filepath` is ``None``, a raw string with CSV data should
504        be returned.
505        """
506        raise NotImplementedError
[9217]507
[9797]508    def export_filtered(self, site, filepath=None, **kw):
509        """Export items denoted by `kw`.
[9217]510
[9797]511        If `filepath` is ``None``, a raw string with CSV data should
512        be returned.
513        """
514        data = self.get_filtered(site, **kw)
515        return self.export(data, filepath=filepath)
516
517def export_job(site, exporter_name, **kw):
[9217]518    """Export all entries delivered by exporter and store it in a temp file.
519
520    `site` gives the site to search. It will be passed to the exporter
521    and also be set as 'current site' as the function is used in
522    asynchronous jobs which run in their own threads and have no site
523    set initially. Therefore `site` must also be a valid value for use
524    with `zope.component.hooks.setSite()`.
525
526    `exporter_name` is the utility name under which the desired
527    exporter was registered with the ZCA.
528
529    The resulting CSV file will be stored in a new temporary directory
530    (using :func:`tempfile.mkdtemp`). It will be named after the
531    exporter used with `.csv` filename extension.
532
533    Returns the path to the created CSV file.
534
535    .. note:: It is the caller's responsibility to clean up the used
536              file and its parent directory.
537    """
538    setSite(site)
539    exporter = getUtility(ICSVExporter, name=exporter_name)
540    output_dir = tempfile.mkdtemp()
541    filename = '%s.csv' % exporter_name
542    output_path = os.path.join(output_dir, filename)
[9797]543    if kw == {}:
544        exporter.export_all(site, filepath=output_path)
545    else:
546        exporter.export_filtered(site, filepath=output_path, **kw)
[9217]547    return output_path
548
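# A minimal usage sketch of export_job (hypothetical exporter name and
# filter keyword):
#
#   path = export_job(site, 'faculties')                 # export everything
#   path = export_job(site, 'faculties', code='FAC1')    # filtered export
#
# Each call returns something like '/tmp/tmpXYZ/faculties.csv'; the caller
# must remove that file and its temporary directory afterwards.
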
549class AsyncExportJob(AsyncJob):
550    """An IJob that exports data to CSV files.
551
552    `AsyncExportJob` instances are regular `AsyncJob` instances with a
553    different constructor API. Instead of a callable to execute, you
554    must pass a `site` and some `exporter_name` to trigger an export.
555
556    The real work is done when an instance of this class is put into a
557    queue. See :mod:`waeup.kofa.async` to learn more about
558    asynchronous jobs.
559
560    The `exporter_name` must be the name under which an ICSVExporter
561    utility was registered with the ZCA.
562
563    The `site` must be a valid site or ``None``.
564
565    The result of an `AsyncExportJob` is the path to the generated CSV
566    file. The file will reside in a temporary directory that should be
567    removed after being used.
568    """
569    grok.implements(IExportJob)
570
[9718]571    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]572        super(AsyncExportJob, self).__init__(
[9718]573            export_job, site, exporter_name, *args, **kwargs)
[9217]574
[9816]575    @property
576    def finished(self):
577        """A job is marked `finished` if it is completed.
578
579        Please note: a finished export job does not necessarily
580        provide a result. See :meth:`failed`.
581        """
582        return self.status == zc.async.interfaces.COMPLETED
583
584    @property
585        """An export job is marked failed iff it is finished and the
586        """A report job is marked failed iff it is finished and the
587        result is None.
588
589        While a job is unfinished, the `failed` status is ``None``.
590
591        Failed jobs normally provide a `traceback` to examine reasons.
592        """
593        if not self.finished:
594            return None
595        if getattr(self, 'result', None) is None:
596            return True
597        return False
598
[9217]599class ExportJobContainer(object):
600    """A mix-in that provides functionality for asynchronous export jobs.
601    """
602    grok.implements(IExportJobContainer)
603    running_exports = PersistentList()
604
[9718]605    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
[9217]606        """Start asynchronous export job.
607
608        `exporter_name` is the name of an exporter utility to be used.
609
610        `user_id` is the ID of the user that triggers the export.
611
612        The job_id is stored along with exporter name and user id in a
613        persistent list.
614
[9718]615        The method supports additional positional and keyword
616        arguments, which are passed as-is to the respective
617        :class:`AsyncExportJob`.
618
[9217]619        Returns the job ID of the job started.
620        """
621        site = grok.getSite()
622        manager = getUtility(IJobManager)
[9718]623        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
[9217]624        job_id = manager.put(job)
625        # Make sure that the persisted list is stored in ZODB
626        self.running_exports = PersistentList(self.running_exports)
627        self.running_exports.append((job_id, exporter_name, user_id))
628        return job_id
629
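    # A minimal usage sketch (hypothetical exporter name and filter): from a
    # context that mixes in ExportJobContainer one might call
    #
    #   job_id = self.start_export_job('students', 'admin', faculty='FAC1')
    #
    # which queues an AsyncExportJob and records the tuple
    # (job_id, 'students', 'admin') in `running_exports`.
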
630    def get_running_export_jobs(self, user_id=None):
631        """Get export jobs for user with `user_id` as list of tuples.
632
633        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
634        that order. The ``<exporter_name>`` is the utility name of the
635        used exporter.
636
637        If `user_id` is ``None``, all running jobs are returned.
638        """
639        entries = []
640        to_delete = []
641        manager = getUtility(IJobManager)
642        for entry in self.running_exports:
643            if user_id is not None and entry[2] != user_id:
644                continue
645            if manager.get(entry[0]) is None:
646                to_delete.append(entry)
647                continue
648            entries.append(entry)
649        if to_delete:
650            self.running_exports = PersistentList(
651                [x for x in self.running_exports if x not in to_delete])
652        return entries
653
654    def get_export_jobs_status(self, user_id=None):
655        """Get running/completed export jobs for `user_id` as list of tuples.
656
657        Each tuple holds ``<raw status>, <status translated>,
658        <exporter title>`` in that order, where ``<status
659        translated>`` and ``<exporter title>`` are translated strings
660        representing the status of the job and the human readable
661        title of the exporter used.
662        """
663        entries = self.get_running_export_jobs(user_id)
664        result = []
665        manager = getUtility(IJobManager)
666        for entry in entries:
667            job = manager.get(entry[0])
668            if job is None:
669                continue
670            status, status_translated = JOB_STATUS_MAP[job.status]
671            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
672            result.append((status, status_translated, exporter_name))
673        return result
674
675    def delete_export_entry(self, entry):
676        """Delete the export denoted by `entry`.
677
678        Removes the given entry from the local `running_exports` list and
679        also removes the corresponding job via the local job manager.
680
681        `entry` must be a tuple ``(<job id>, <exporter name>, <user
682        id>)`` as created by :meth:`start_export_job` or returned by
683        :meth:`get_running_export_jobs`.
684        """
685        manager = getUtility(IJobManager)
686        job = manager.get(entry[0])
687        if job is not None:
688            # remove created export file
689            if isinstance(job.result, basestring):
690                if os.path.exists(os.path.dirname(job.result)):
691                    shutil.rmtree(os.path.dirname(job.result))
692        manager.remove(entry[0], self)
693        new_entries = [x for x in self.running_exports
694                       if x != entry]
695        self.running_exports = PersistentList(new_entries)
696        return
697
698    def entry_from_job_id(self, job_id):
699        """Get entry tuple for `job_id`.
700
701        Returns ``None`` if no such entry can be found.
702        """
703        for entry in self.running_exports:
704            if entry[0] == job_id:
705                return entry
706        return None
[9726]707
708class VirtualExportJobContainer(ExportJobContainer):
709    """A virtual export job container.
710
711    Virtual ExportJobContainers can be used as a mixin just like real
712    ExportJobContainers.
713
714    They retrieve and store data in the site-wide ExportJobContainer.
715
716    Functionality is currently identical to that of regular
717    ExportJobContainers, except that data is stored elsewhere.
718
719    VirtualExportJobContainers need a registered
720    IExportContainerFinder utility to find a suitable container for
721    storing data.
722    """
723    grok.implements(IExportJobContainer)
724
725    @property
726    def _site_container(self):
727        return getUtility(IExportContainerFinder)()
728
729    # The following is a simple trick. While ExportJobContainers store
730    # only one attribute in ZODB, it is sufficient to replace this
731    # attribute `running_exports` with a suitable manager to make the
732    # whole virtual container work like the original but with the data
733    # stored in the site-wide exports container. This way, virtual
734    # export containers provide the whole functionality of a regular
735    # exports container but store no data at all with themselves.
736    @property
737    def running_exports(self):
738        """Exports stored in the site-wide exports container.
739        """
740        return self._site_container.running_exports
741
742    @running_exports.setter
743    def running_exports(self, value):
744        self._site_container.running_exports = value
745
746    @running_exports.deleter
747    def running_exports(self):
748        del self._site_container.running_exports
749
[9823]750    @property
751    def logger(self):
752        return self._site_container.logger
[9726]753
754@implementer(IExportContainerFinder)
755class ExportContainerFinder(grok.GlobalUtility):
756    """Finder for local (site-wide) export container.
757    """
758
759    def __call__(self):
760        """Get the local export container.
761
762        If no site can be determined or the site provides no export
763        container, None is returned.
764        """
765        site = grok.getSite()
766        if site is None:
767            return None
768        return site.get('datacenter', None)