source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9859

Last change on this file since 9859 was 9823, checked in by Henrik Bettermann, 12 years ago

Use logger of datacenter for VirtualExportJobContainers.

  • Property svn:keywords set to Id
File size: 27.3 KB
RevLine 
[7196]1## $Id: batching.py 9823 2012-12-22 09:42:58Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
[8380]25import datetime
[4821]26import os
[9217]27import shutil
[4900]28import tempfile
[4821]29import time
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[8332]37from zope.event import notify
[9217]38from waeup.kofa.async import AsyncJob
[7811]39from waeup.kofa.interfaces import (
[9217]40    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
41    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]42    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]43
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Subclasses become importers for a
    concrete content type by overriding `iface`, `location_fields`,
    `factory_name` and the ``*Entry`` hook methods below.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                # Location fields are required anyway; don't list twice.
                continue
            if field.default is not field.missing_value:
                # A usable default exists, so input may omit the field.
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map import mode -> minimal set of required header fields."""
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """All header names this processor understands, sorted."""
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the headers available for `mode` (currently all modes
        get the same list)."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Validate CSV header fields for `mode`.

        Raises :exc:`FatalCSVError` if required columns are missing or
        a column name appears twice. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already catched in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict with keys renamed according to `mapping`;
        columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Reads the raw header row from the file at `path` and pairs it
        positionally with `headerfields`.
        """
        result = dict()
        # NOTE(review): the file handle opened here is never explicitly
        # closed; CPython refcounting closes it when `reader` goes away.
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into one message string."""
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new (empty) content object via the registered factory.

        `args` and `kw` are accepted but currently not passed on.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.

        `filename` is accepted for the benefit of overriding
        subclasses; it is unused here.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            # Log the object's `code` if it has one, else the raw value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a one-line import summary to `logger`.

        Does nothing when `logger` is None. `fail_path` and `user` are
        currently not used in the message.
        """
        if logger is None:
            return
        logger.info(
            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
            "%0.3f s (%0.4f s/item)" % (
            path, mode, num, num - warnings, warnings,
            timedelta, timedelta/(num or 1)))
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns ``(errs, inv_errs, conv_dict)`` from the registered
        :class:`IObjectConverter`. NOTE(review): the `ignore_empty`
        parameter is accepted but not forwarded to the converter.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads CSV data from `path`, processes each row according to
        `mode` ('create', 'update' or 'remove') and writes successful
        rows to a ``.finished`` CSV and failed rows (with an extra
        ``--ERRORS--`` column) to a ``.pending`` CSV in a fresh temp
        directory.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``
        where `failed_path` is ``None`` if no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        # NOTE(review): input/output file handles in this method are
        # never explicitly closed; cleanup relies on refcounting.
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        os.chmod(failed_path, 0664)
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        os.chmod(finished_path, 0664)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num =0
        num_warns = 0
        site = grok.getSite()

        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # replace empty strings with ignore-markers
                for key, val in string_row.items():
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            row = dict(string_row.items()) # create a copy (shallow; values are strings)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            # Merge converted (typed) values over the raw strings.
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                # Override all values in row, also
                # student_ids and applicant_ids which have been
                # generated in the respective __init__ methods before.
                self.updateEntry(obj, row, site, base)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
                except DuplicationError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.msg)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry")
                    continue
                update_errors = self.checkUpdateRequirements(obj, row, site)
                if update_errors is not None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row, update_errors)
                    continue
                self.updateEntry(obj, row, site, base)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Drop the writer first so the underlying file handle is
            # released (via refcounting) before unlinking the empty file.
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = csv.DictWriter(outfile, self.available_fields)
        writer.writerow(
            dict(zip(self.available_fields, self.available_fields))) # header
        outfile.seek(0)
        return outfile.read()
405
class ExporterBase(object):
    """Common plumbing for CSV exporters.

    Provides value mangling and CSV writer handling; concrete
    exporters set `fields` and `title` and implement the export
    methods.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer copes with.

        Hook for mangling values in derived classes.
        """
        # Branch order matters: datetimes are also dates, so the
        # datetime check must come before the date check.
        if isinstance(value, bool):
            return value and '1' or '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # Append hash '#' to dates to circumvent unwanted excel
            # automatic formatting.
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict(zip(self.fields, self.fields))
        writer.writerow(header)  # header row
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile is returned.
        """
        outfile.seek(0)
        if filepath is None:
            return outfile.read()
        outfile.close()
        return

    def get_filtered(self, site, **kw):
        """Get datasets to export filtered by keyword arguments.

        Returns an iterable.
        """
        raise NotImplementedError

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_filtered(self, site, filepath=None, **kw):
        """Export the items selected by `kw` via :meth:`get_filtered`.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        data = self.get_filtered(site, **kw)
        return self.export(data, filepath=filepath)
515
def export_job(site, exporter_name, **kw):
    """Run an exporter and store its output in a temporary file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`). It will be named after the
    exporter used with `.csv` filename extension.

    Returns the path to the created CSV file.

    .. note:: It is the callers responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_path = os.path.join(
        tempfile.mkdtemp(), '%s.csv' % exporter_name)
    if kw:
        # Keyword args select a subset of the data.
        exporter.export_filtered(site, filepath=output_path, **kw)
    else:
        exporter.export_all(site, filepath=output_path)
    return output_path
547
548class AsyncExportJob(AsyncJob):
549    """An IJob that exports data to CSV files.
550
551    `AsyncExportJob` instances are regular `AsyncJob` instances with a
552    different constructor API. Instead of a callable to execute, you
553    must pass a `site` and some `exporter_name` to trigger an export.
554
555    The real work is done when an instance of this class is put into a
556    queue. See :mod:`waeup.kofa.async` to learn more about
557    asynchronous jobs.
558
559    The `exporter_name` must be the name under which an ICSVExporter
560    utility was registered with the ZCA.
561
562    The `site` must be a valid site  or ``None``.
563
564    The result of an `AsyncExportJob` is the path to generated CSV
565    file. The file will reside in a temporary directory that should be
566    removed after being used.
567    """
568    grok.implements(IExportJob)
569
[9718]570    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]571        super(AsyncExportJob, self).__init__(
[9718]572            export_job, site, exporter_name, *args, **kwargs)
[9217]573
[9816]574    @property
575    def finished(self):
576        """A job is marked `finished` if it is completed.
577
578        Please note: a finished report job does not neccessarily
579        provide an IReport result. See meth:`failed`.
580        """
581        return self.status == zc.async.interfaces.COMPLETED
582
583    @property
584    def failed(self):
585        """A report job is marked failed iff it is finished and the
586        result is None.
587
588        While a job is unfinished, the `failed` status is ``None``.
589
590        Failed jobs normally provide a `traceback` to examine reasons.
591        """
592        if not self.finished:
593            return None
594        if getattr(self, 'result', None) is None:
595            return True
596        return False
597
class ExportJobContainer(object):
    """A mix-in that provides functionality for asynchronous export jobs.
    """
    grok.implements(IExportJobContainer)
    # Class-level default; each instance replaces it with its own
    # PersistentList on first write (see start_export_job) so the
    # change is persisted by ZODB.
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Start asynchronous export job.

        `exporter_name` is the name of an exporter utility to be used.

        `user_id` is the ID of the user that triggers the export.

        The job_id is stored along with exporter name and user id in a
        persistent list.

        The method supports additional positional and keyword
        arguments, which are passed as-is to the respective
        :class:`AsyncExportJob`.

        Returns the job ID of the job started.
        """
        site = grok.getSite()
        manager = getUtility(IJobManager)
        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
        job_id = manager.put(job)
        # Make sure that the persisted list is stored in ZODB
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Get export jobs for user with `user_id` as list of tuples.

        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order. The ``<exporter_name>`` is the utility name of the
        used exporter.

        If `user_id` is ``None``, all running jobs are returned.

        Entries whose job is unknown to the job manager are pruned
        from `running_exports` as a side effect.
        """
        entries = []
        to_delete = []
        manager = getUtility(IJobManager)
        for entry in self.running_exports:
            if user_id is not None and entry[2] != user_id:
                continue
            if manager.get(entry[0]) is None:
                # Job vanished from the manager; schedule for pruning.
                to_delete.append(entry)
                continue
            entries.append(entry)
        if to_delete:
            # Reassign a fresh PersistentList so ZODB notices the change.
            self.running_exports = PersistentList(
                [x for x in self.running_exports if x not in to_delete])
        return entries

    def get_export_jobs_status(self, user_id=None):
        """Get running/completed export jobs for `user_id` as list of tuples.

        Each tuple holds ``<raw status>, <status translated>,
        <exporter title>`` in that order, where ``<status
        translated>`` and ``<exporter title>`` are translated strings
        representing the status of the job and the human readable
        title of the exporter used.
        """
        entries = self.get_running_export_jobs(user_id)
        result = []
        manager = getUtility(IJobManager)
        for entry in entries:
            job = manager.get(entry[0])
            if job is None:
                continue
            status, status_translated = JOB_STATUS_MAP[job.status]
            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
            result.append((status, status_translated, exporter_name))
        return result

    def delete_export_entry(self, entry):
        """Delete the export denoted by `entry`.

        Removes given entry from the local `running_exports` list and also
        removes the regarding job via the local job manager.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        manager = getUtility(IJobManager)
        job = manager.get(entry[0])
        if job is not None:
            # remove created export file
            if isinstance(job.result, basestring):
                if os.path.exists(os.path.dirname(job.result)):
                    shutil.rmtree(os.path.dirname(job.result))
        manager.remove(entry[0], self)
        new_entries = [x for x in self.running_exports
                       if x != entry]
        self.running_exports = PersistentList(new_entries)
        return

    def entry_from_job_id(self, job_id):
        """Get entry tuple for `job_id`.

        Returns ``None`` if no such entry can be found.
        """
        for entry in self.running_exports:
            if entry[0] == job_id:
                return entry
        return None
[9726]706
class VirtualExportJobContainer(ExportJobContainer):
    """A virtual export job container.

    Virtual ExportJobContainers can be used as a mixin just like real
    ExportJobContainer.

    They retrieve and store data in the site-wide ExportJobContainer.

    Functionality is currently entirely as for regular
    ExportJobContainers, except that data is stored elsewhere.

    VirtualExportJobContainers need a registered
    IExportContainerFinder utility to find a suitable container for
    storing data.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        # Look up the finder utility and ask it for the real container.
        finder = getUtility(IExportContainerFinder)
        return finder()

    # The following is a simple trick. While ExportJobContainers store
    # only one attribute in ZODB, it is sufficient to replace this
    # attribute `running_exports` with a suitable manager to make the
    # whole virtual container work like the original but with the data
    # stored in the site-wide exports container. This way, virtual
    # export containers provide the whole functionality of a regular
    # exports container but store no data at all with themselves.
    @property
    def running_exports(self):
        """Exports stored in the site-wide exports container.
        """
        return self._site_container.running_exports

    @running_exports.setter
    def running_exports(self, value):
        self._site_container.running_exports = value

    @running_exports.deleter
    def running_exports(self):
        del self._site_container.running_exports

    @property
    def logger(self):
        # Delegate logging to the site-wide container as well.
        return self._site_container.logger
[9726]752
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Finder for the local (site-wide) export container.
    """

    def __call__(self):
        """Get the local export container.

        If no site can be determined or the site provides no export
        container, None is returned.
        """
        site = grok.getSite()
        if site is not None:
            return site.get('datacenter', None)
        return None
Note: See TracBrowser for help on using the repository browser.