source: main/waeup.ikoba/trunk/src/waeup/ikoba/utils/batching.py @ 12347

Last change on this file since 12347 was 12334, checked in by Henrik Bettermann, 10 years ago

Show contract category title on products container pages.

  • Property svn:keywords set to Id
File size: 27.8 KB
RevLine 
[7196]1## $Id: batching.py 12334 2014-12-29 06:35:45Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[11949]18"""Ikoba components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[11849]37from zope.schema.interfaces import ConstraintNotSatisfied
[8332]38from zope.event import notify
[11949]39from waeup.ikoba.async import AsyncJob
40from waeup.ikoba.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
45class BatchProcessor(grok.GlobalUtility):
46    """A processor to add, update, or remove data.
47
48    This is a non-active baseclass.
49    """
[8220]50    grok.implements(IBatchProcessor)
[4806]51    grok.context(Interface)
52    grok.baseclass()
53
54    # Name used in pages and forms...
[7933]55    name = u'Non-registered base processor'
[6259]56
[4806]57    # Internal name...
[7933]58    util_name = 'baseprocessor'
[6259]59
[4806]60    # Items for this processor need an interface with zope.schema fields.
[5009]61    iface = Interface
[6259]62
[4806]63    # The name must be the same as the util_name attribute in order to
64    # register this utility correctly.
65    grok.name(util_name)
66
67    # Headers needed to locate items...
[12178]68    location_fields = ['any_id']
[6259]69
[4806]70    # A factory with this name must be registered...
[12334]71    factory_name = 'waeup.any_factory'
[4806]72
73    @property
74    def required_fields(self):
[4829]75        """Required fields that have no default.
76
77        A list of names of field, whose value cannot be set if not
78        given during creation. Therefore these fields must exist in
79        input.
80
81        Fields with a default != missing_value do not belong to this
82        category.
83        """
[4806]84        result = []
85        for key, field in getFields(self.iface).items():
86            if key in self.location_fields:
87                continue
[4829]88            if field.default is not field.missing_value:
89                continue
[4806]90            if field.required:
91                result.append(key)
92        return result
[6259]93
[4806]94    @property
95    def req(self):
96        result = dict(
97            create = self.location_fields + self.required_fields,
98            update = self.location_fields,
99            remove = self.location_fields,
100        )
101        return result
102
103    @property
104    def available_fields(self):
105        return sorted(list(set(
106                    self.location_fields + getFields(self.iface).keys())))
[6259]107
[4806]108    def getHeaders(self, mode='create'):
109        return self.available_fields
110
111    def checkHeaders(self, headerfields, mode='create'):
112        req = self.req[mode]
113        # Check for required fields...
114        for field in req:
115            if not field in headerfields:
116                raise FatalCSVError(
117                    "Need at least columns %s for import!" %
118                    ', '.join(["'%s'" % x for x in req]))
[6828]119        # Check for double fields. Cannot happen because this error is
120        # already catched in views
[4806]121        not_ignored_fields = [x for x in headerfields
122                              if not x.startswith('--')]
123        if len(set(not_ignored_fields)) < len(not_ignored_fields):
124            raise FatalCSVError(
125                "Double headers: each column name may only appear once.")
126        return True
127
128    def applyMapping(self, row, mapping):
[4811]129        """Apply mapping to a row of CSV data.
[6824]130
[4811]131        """
[4806]132        result = dict()
133        for key, replacement in mapping.items():
[6824]134            if replacement == u'--IGNORE--':
135                # Skip ignored columns in failed and finished data files.
136                continue
[4806]137            result[replacement] = row[key]
138        return result
[6259]139
[4832]140    def getMapping(self, path, headerfields, mode):
[6824]141        """Get a mapping from CSV file headerfields to actually used fieldnames.
142
[4811]143        """
[4832]144        result = dict()
[10027]145        reader = unicodecsv.reader(open(path, 'rb'))
[4806]146        raw_header = reader.next()
[4832]147        for num, field in enumerate(headerfields):
148            if field not in self.location_fields and mode == 'remove':
[6824]149                # Skip non-location fields when removing.
150                continue
151            if field == u'--IGNORE--':
152                # Skip ignored columns in failed and finished data files.
153                continue
[4832]154            result[raw_header[num]] = field
155        return result
[4806]156
[6273]157    def stringFromErrs(self, errors, inv_errors):
158        result = []
159        for err in errors:
160            fieldname, message = err
161            result.append("%s: %s" % (fieldname, message))
162        for err in inv_errors:
163            result.append("invariant: %s" % err)
164        return '; '.join(result)
165
[4806]166    def callFactory(self, *args, **kw):
167        return createObject(self.factory_name)
168
169    def parentsExist(self, row, site):
[4811]170        """Tell whether the parent object for data in ``row`` exists.
171        """
[4806]172        raise NotImplementedError('method not implemented')
173
174    def entryExists(self, row, site):
[4811]175        """Tell whether there already exists an entry for ``row`` data.
176        """
[4806]177        raise NotImplementedError('method not implemented')
178
179    def getParent(self, row, site):
[4811]180        """Get the parent object for the entry in ``row``.
181        """
[4806]182        raise NotImplementedError('method not implemented')
[6259]183
[5009]184    def getEntry(self, row, site):
185        """Get the parent object for the entry in ``row``.
186        """
187        raise NotImplementedError('method not implemented')
[6259]188
[4806]189    def addEntry(self, obj, row, site):
[4811]190        """Add the entry given given by ``row`` data.
191        """
[4806]192        raise NotImplementedError('method not implemented')
193
194    def delEntry(self, row, site):
[4811]195        """Delete entry given by ``row`` data.
196        """
[6259]197        raise NotImplementedError('method not implemented')
[4806]198
[7950]199    def checkUpdateRequirements(self, obj, row, site):
200        """Checks requirements the object must fulfill when being updated.
[7938]201
202        This method is not used in case of deleting or adding objects.
203
[7950]204        Returns error messages as strings in case of requirement
[7938]205        problems.
[7937]206        """
[7938]207        return None
[7937]208
[9706]209    def updateEntry(self, obj, row, site, filename):
[4984]210        """Update obj to the values given in row.
[8220]211
212        Returns a string describing the fields changed.
[4984]213        """
[8220]214        changed = []
[4829]215        for key, value in row.items():
[8220]216            # Skip fields to be ignored.
217            if value == IGNORE_MARKER:
218                continue
[8304]219            # Skip fields not declared in interface and which are
220            # not yet attributes of existing objects. We can thus not
221            # add non-existing attributes here.
[8220]222            if not hasattr(obj, key):
223                continue
[9265]224            try:
225                setattr(obj, key, value)
226            except AttributeError:
227                # Computed attributes can't be set.
228                continue
[12258]229            log_value = getattr(value, 'product_id', value)
230            log_value = getattr(value, 'document_id', log_value)
[8222]231            changed.append('%s=%s' % (key, log_value))
[8332]232
[8333]233        # If any catalog is involved it must be updated.
234        #
235        # XXX: The event is also triggered when creating objects as
236        # updateEntry is called also when creating entries resulting
237        # in objectAdded and additional objectModified events.
238        if len(changed):
239            notify(grok.ObjectModifiedEvent(obj))
[8332]240
[8220]241        return ', '.join(changed)
[4821]242
[4832]243    def createLogfile(self, path, fail_path, num, warnings, mode, user,
[4885]244                      timedelta, logger=None):
245        """Write to log file.
[4821]246        """
[4885]247        if logger is None:
248            return
[9739]249        logger.info(
250            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
251            "%0.3f s (%0.4f s/item)" % (
252            path, mode, num, num - warnings, warnings,
253            timedelta, timedelta/(num or 1)))
[4821]254        return
[4877]255
256    def writeFailedRow(self, writer, row, warnings):
257        """Write a row with error messages to error CSV.
258
259        If warnings is a list of strings, they will be concatenated.
260        """
261        error_col = warnings
262        if isinstance(warnings, list):
263            error_col = ' / '.join(warnings)
264        row['--ERRORS--'] = error_col
265        writer.writerow(row)
266        return
[6259]267
[8220]268    def checkConversion(self, row, mode='ignore', ignore_empty=True):
[6847]269        """Validates all values in row.
270        """
271        converter = IObjectConverter(self.iface)
272        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8220]273            row, self.factory_name, mode=mode)
[6847]274        return errs, inv_errs, conv_dict
275
[4885]276    def doImport(self, path, headerfields, mode='create', user='Unknown',
[8220]277                 logger=None, ignore_empty=True):
[4811]278        """Perform actual import.
279        """
[4832]280        time_start = time.time()
[4806]281        self.checkHeaders(headerfields, mode)
[4832]282        mapping = self.getMapping(path, headerfields, mode)
[10027]283        reader = unicodecsv.DictReader(open(path, 'rb'))
[4889]284
[4900]285        temp_dir = tempfile.mkdtemp()
[6259]286
[6273]287        base = os.path.basename(path)
288        (base, ext) = os.path.splitext(base)
[4900]289        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
[6831]290        failed_headers = mapping.values()
[4877]291        failed_headers.append('--ERRORS--')
[10027]292        failed_writer = unicodecsv.DictWriter(open(failed_path, 'wb'),
293                                              failed_headers)
[8573]294        os.chmod(failed_path, 0664)
[6831]295        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
[4891]296
[4900]297        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
[6831]298        finished_headers = mapping.values()
[10027]299        finished_writer = unicodecsv.DictWriter(open(finished_path, 'wb'),
300                                                finished_headers)
[8905]301        os.chmod(finished_path, 0664)
[4891]302        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
[6259]303
[4806]304        num =0
[4878]305        num_warns = 0
[4806]306        site = grok.getSite()
[7859]307
[4806]308        for raw_row in reader:
309            num += 1
310            string_row = self.applyMapping(raw_row, mapping)
[8222]311            if ignore_empty and mode in ('update',):
[8220]312                # replace empty strings with ignore-markers
[8222]313                for key, val in string_row.items():
[8220]314                    if val == '':
[8222]315                        string_row[key] = IGNORE_MARKER
316            row = dict(string_row.items()) # create deep copy
[6847]317            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
[6273]318            if errs or inv_errs:
[4878]319                num_warns += 1
[6273]320                conv_warnings = self.stringFromErrs(errs, inv_errs)
321                self.writeFailedRow(
[6824]322                    failed_writer, string_row, conv_warnings)
[4821]323                continue
[6273]324            row.update(conv_dict)
[6259]325
[4806]326            if mode == 'create':
327                if not self.parentsExist(row, site):
[4878]328                    num_warns += 1
[4877]329                    self.writeFailedRow(
[6824]330                        failed_writer, string_row,
[4877]331                        "Not all parents do exist yet. Skipping")
[4806]332                    continue
333                if self.entryExists(row, site):
[4878]334                    num_warns += 1
[4877]335                    self.writeFailedRow(
[6824]336                        failed_writer, string_row,
[8330]337                        "This object already exists. Skipping.")
[4806]338                    continue
339                obj = self.callFactory()
[7273]340                # Override all values in row, also
[12250]341                # ids which have been
[7273]342                # generated in the respective __init__ methods before.
[9706]343                self.updateEntry(obj, row, site, base)
[6243]344                try:
345                    self.addEntry(obj, row, site)
[6273]346                except KeyError, error:
[6219]347                    num_warns += 1
348                    self.writeFailedRow(
[6824]349                        failed_writer, string_row,
[6273]350                        "%s Skipping." % error.message)
[8540]351                    continue
[8509]352                except DuplicationError, error:
353                    num_warns += 1
354                    self.writeFailedRow(
355                        failed_writer, string_row,
356                        "%s Skipping." % error.msg)
[6219]357                    continue
[4806]358            elif mode == 'remove':
359                if not self.entryExists(row, site):
[4878]360                    num_warns += 1
[4877]361                    self.writeFailedRow(
[6824]362                        failed_writer, string_row,
[9219]363                        "Cannot remove: no such entry")
[4806]364                    continue
365                self.delEntry(row, site)
366            elif mode == 'update':
367                obj = self.getEntry(row, site)
368                if obj is None:
[4878]369                    num_warns += 1
[4877]370                    self.writeFailedRow(
[6824]371                        failed_writer, string_row,
[9219]372                        "Cannot update: no such entry")
[4806]373                    continue
[7950]374                update_errors = self.checkUpdateRequirements(obj, row, site)
[7938]375                if update_errors is not None:
[7937]376                    num_warns += 1
377                    self.writeFailedRow(
378                        failed_writer, string_row, update_errors)
379                    continue
[11849]380                try:
381                    self.updateEntry(obj, row, site, base)
382                except ConstraintNotSatisfied, err:
383                    num_warns += 1
384                    self.writeFailedRow(
385                        failed_writer, string_row,
386                        "ConstraintNotSatisfied: %s" % err)
387                    continue
[4891]388            finished_writer.writerow(string_row)
[4821]389
[4832]390        time_end = time.time()
391        timedelta = time_end - time_start
[6259]392
[4878]393        self.createLogfile(path, failed_path, num, num_warns, mode, user,
[4885]394                           timedelta, logger=logger)
[4894]395        failed_path = os.path.abspath(failed_path)
[4878]396        if num_warns == 0:
[4821]397            del failed_writer
398            os.unlink(failed_path)
[4894]399            failed_path = None
400        return (num, num_warns,
401                os.path.abspath(finished_path), failed_path)
[7859]402
[9032]403    def get_csv_skeleton(self):
404        """Export CSV file only with a header of available fields.
405
406        A raw string with CSV data should be returned.
407        """
408        outfile = StringIO()
[10027]409        writer = unicodecsv.DictWriter(outfile, self.available_fields)
[9734]410        writer.writerow(
411            dict(zip(self.available_fields, self.available_fields))) # header
[9032]412        outfile.seek(0)
413        return outfile.read()
414
[7859]415class ExporterBase(object):
416    """A base for exporters.
417    """
418    grok.implements(ICSVExporter)
419
420    #: Fieldnames considered by this exporter
[12178]421    fields = ('any_id', 'any_title')
[7859]422
[7907]423    #: The title under which this exporter will be displayed
424    #: (if registered as a utility)
425    title = 'Override this title'
426
[7859]427    def mangle_value(self, value, name, context=None):
428        """Hook for mangling values in derived classes
429        """
430        if isinstance(value, bool):
431            value = value and '1' or '0'
432        elif isinstance(value, unicode):
433            # CSV writers like byte streams better than unicode
434            value = value.encode('utf-8')
[8380]435        elif isinstance(value, datetime.datetime):
[11737]436            #value = str(value)
437            value = str('%s#' % value) # changed 2014-07-06, see ticket #941
[8380]438        elif isinstance(value, datetime.date):
439            # Order is important here: check for date after datetime as
440            # datetimes are also dates.
441            #
442            # Append hash '#' to dates to circumvent unwanted excel automatic
443            value = str('%s#' % value)
[7859]444        elif value is None:
445            # None is not really representable in CSV files
446            value = ''
447        return value
448
449    def get_csv_writer(self, filepath=None):
450        """Get a CSV dict writer instance open for writing.
451
452        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
453        :class:`csv.DictWriter` instance and outfile is the real file
454        which is written to. The latter is important when writing to
455        StringIO and can normally be ignored otherwise.
456
457        The returned file will already be filled with the header row.
458
459        Please note that if you give a filepath, the returned outfile
460        is open for writing only and you might have to close it before
461        reopening it for reading.
462        """
463        if filepath is None:
464            outfile = StringIO()
465        else:
466            outfile = open(filepath, 'wb')
[10027]467        writer = unicodecsv.DictWriter(outfile, self.fields)
[7859]468        writer.writerow(dict(zip(self.fields, self.fields))) # header
469        return writer, outfile
470
471    def write_item(self, obj, writer):
472        """Write a row extracted from `obj` into CSV file using `writer`.
473        """
474        row = {}
475        for name in self.fields:
476            value = getattr(obj, name, None)
477            value = self.mangle_value(value, name, obj)
478            row[name] = value
479        writer.writerow(row)
480        return
481
482    def close_outfile(self, filepath, outfile):
483        """Close outfile.
484
485        If filepath is None, the contents of outfile is returned.
486        """
487        outfile.seek(0)
488        if filepath is None:
489            return outfile.read()
490        outfile.close()
491        return
492
[9797]493    def get_filtered(self, site, **kw):
494        """Get datasets to export filtered by keyword arguments.
495
496        Returns an iterable.
497        """
498        raise NotImplementedError
499
[7859]500    def export(self, iterable, filepath=None):
501        """Export `iterable` as CSV file.
502
503        If `filepath` is ``None``, a raw string with CSV data should
504        be returned.
505        """
506        raise NotImplementedError
507
508    def export_all(self, site, filepath=None):
509        """Export all appropriate objects in `site` into `filepath` as
510        CSV data.
511
512        If `filepath` is ``None``, a raw string with CSV data should
513        be returned.
514        """
515        raise NotImplementedError
[9217]516
[9797]517    def export_filtered(self, site, filepath=None, **kw):
518        """Export items denoted by `args` and `kw`.
[9217]519
[9797]520        If `filepath` is ``None``, a raw string with CSV data should
521        be returned.
522        """
523        data = self.get_filtered(site, **kw)
524        return self.export(data, filepath=filepath)
525
526def export_job(site, exporter_name, **kw):
[9217]527    """Export all entries delivered by exporter and store it in a temp file.
528
529    `site` gives the site to search. It will be passed to the exporter
530    and also be set as 'current site' as the function is used in
531    asynchronous jobs which run in their own threads and have no site
532    set initially. Therefore `site` must also be a valid value for use
533    with `zope.component.hooks.setSite()`.
534
535    `exporter_name` is the utility name under which the desired
536    exporter was registered with the ZCA.
537
538    The resulting CSV file will be stored in a new temporary directory
539    (using :func:`tempfile.mkdtemp`). It will be named after the
540    exporter used with `.csv` filename extension.
541
542    Returns the path to the created CSV file.
543
544    .. note:: It is the callers responsibility to clean up the used
545              file and its parent directory.
546    """
547    setSite(site)
548    exporter = getUtility(ICSVExporter, name=exporter_name)
549    output_dir = tempfile.mkdtemp()
550    filename = '%s.csv' % exporter_name
551    output_path = os.path.join(output_dir, filename)
[9797]552    if kw == {}:
553        exporter.export_all(site, filepath=output_path)
554    else:
555        exporter.export_filtered(site, filepath=output_path, **kw)
[9217]556    return output_path
557
558class AsyncExportJob(AsyncJob):
559    """An IJob that exports data to CSV files.
560
561    `AsyncExportJob` instances are regular `AsyncJob` instances with a
562    different constructor API. Instead of a callable to execute, you
563    must pass a `site` and some `exporter_name` to trigger an export.
564
565    The real work is done when an instance of this class is put into a
[11949]566    queue. See :mod:`waeup.ikoba.async` to learn more about
[9217]567    asynchronous jobs.
568
569    The `exporter_name` must be the name under which an ICSVExporter
570    utility was registered with the ZCA.
571
572    The `site` must be a valid site  or ``None``.
573
574    The result of an `AsyncExportJob` is the path to generated CSV
575    file. The file will reside in a temporary directory that should be
576    removed after being used.
577    """
578    grok.implements(IExportJob)
579
[9718]580    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]581        super(AsyncExportJob, self).__init__(
[9718]582            export_job, site, exporter_name, *args, **kwargs)
[9217]583
[9816]584    @property
585    def finished(self):
586        """A job is marked `finished` if it is completed.
587
588        Please note: a finished report job does not neccessarily
589        provide an IReport result. See meth:`failed`.
590        """
591        return self.status == zc.async.interfaces.COMPLETED
592
593    @property
594    def failed(self):
595        """A report job is marked failed iff it is finished and the
596        result is None.
597
598        While a job is unfinished, the `failed` status is ``None``.
599
600        Failed jobs normally provide a `traceback` to examine reasons.
601        """
602        if not self.finished:
603            return None
604        if getattr(self, 'result', None) is None:
605            return True
606        return False
607
[9217]608class ExportJobContainer(object):
609    """A mix-in that provides functionality for asynchronous export jobs.
610    """
611    grok.implements(IExportJobContainer)
612    running_exports = PersistentList()
613
[9718]614    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
[9217]615        """Start asynchronous export job.
616
617        `exporter_name` is the name of an exporter utility to be used.
618
619        `user_id` is the ID of the user that triggers the export.
620
621        The job_id is stored along with exporter name and user id in a
622        persistent list.
623
[9718]624        The method supports additional positional and keyword
625        arguments, which are passed as-is to the respective
626        :class:`AsyncExportJob`.
627
[9217]628        Returns the job ID of the job started.
629        """
630        site = grok.getSite()
631        manager = getUtility(IJobManager)
[9718]632        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
[9217]633        job_id = manager.put(job)
634        # Make sure that the persisted list is stored in ZODB
635        self.running_exports = PersistentList(self.running_exports)
636        self.running_exports.append((job_id, exporter_name, user_id))
637        return job_id
638
639    def get_running_export_jobs(self, user_id=None):
640        """Get export jobs for user with `user_id` as list of tuples.
641
642        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
643        that order. The ``<exporter_name>`` is the utility name of the
644        used exporter.
645
646        If `user_id` is ``None``, all running jobs are returned.
647        """
648        entries = []
649        to_delete = []
650        manager = getUtility(IJobManager)
651        for entry in self.running_exports:
652            if user_id is not None and entry[2] != user_id:
653                continue
654            if manager.get(entry[0]) is None:
655                to_delete.append(entry)
656                continue
657            entries.append(entry)
658        if to_delete:
659            self.running_exports = PersistentList(
660                [x for x in self.running_exports if x not in to_delete])
661        return entries
662
663    def get_export_jobs_status(self, user_id=None):
664        """Get running/completed export jobs for `user_id` as list of tuples.
665
666        Each tuple holds ``<raw status>, <status translated>,
667        <exporter title>`` in that order, where ``<status
668        translated>`` and ``<exporter title>`` are translated strings
669        representing the status of the job and the human readable
670        title of the exporter used.
671        """
672        entries = self.get_running_export_jobs(user_id)
673        result = []
674        manager = getUtility(IJobManager)
675        for entry in entries:
676            job = manager.get(entry[0])
677            if job is None:
678                continue
679            status, status_translated = JOB_STATUS_MAP[job.status]
680            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
681            result.append((status, status_translated, exporter_name))
682        return result
683
684    def delete_export_entry(self, entry):
685        """Delete the export denoted by `entry`.
686
687        Removes given entry from the local `running_exports` list and also
688        removes the regarding job via the local job manager.
689
690        `entry` must be a tuple ``(<job id>, <exporter name>, <user
691        id>)`` as created by :meth:`start_export_job` or returned by
692        :meth:`get_running_export_jobs`.
693        """
694        manager = getUtility(IJobManager)
695        job = manager.get(entry[0])
696        if job is not None:
697            # remove created export file
698            if isinstance(job.result, basestring):
699                if os.path.exists(os.path.dirname(job.result)):
700                    shutil.rmtree(os.path.dirname(job.result))
701        manager.remove(entry[0], self)
702        new_entries = [x for x in self.running_exports
703                       if x != entry]
704        self.running_exports = PersistentList(new_entries)
705        return
706
707    def entry_from_job_id(self, job_id):
708        """Get entry tuple for `job_id`.
709
710        Returns ``None`` if no such entry can be found.
711        """
712        for entry in self.running_exports:
713            if entry[0] == job_id:
714                return entry
715        return None
[9726]716
717class VirtualExportJobContainer(ExportJobContainer):
718    """A virtual export job container.
719
720    Virtual ExportJobContainers can be used as a mixin just like real
721    ExportJobContainer.
722
723    They retrieve and store data in the site-wide ExportJobContainer.
724
725    Functionality is currently entirely as for regular
726    ExportJobContainers, except that data is stored elsewhere.
727
728    VirtualExportJobContainers need a registered
729    IExportContainerFinder utility to find a suitable container for
730    storing data.
731    """
732    grok.implements(IExportJobContainer)
733
734    @property
735    def _site_container(self):
736        return getUtility(IExportContainerFinder)()
737
738    # The following is a simple trick. While ExportJobContainers store
739    # only one attribute in ZODB, it is sufficient to replace this
740    # attribute `running_exports` with a suitable manager to make the
741    # whole virtual container work like the original but with the data
742    # stored in the site-wide exports container. This way, virtual
743    # export containers provide the whole functionality of a regular
744    # exports container but store no data at all with themselves.
745    @property
746    def running_exports(self):
747        """Exports stored in the site-wide exports container.
748        """
749        return self._site_container.running_exports
750
751    @running_exports.setter
752    def running_exports(self, value):
753        self._site_container.running_exports = value
754
755    @running_exports.deleter
756    def running_exports(self):
757        del self._site_container.running_exports
758
[9823]759    @property
760    def logger(self):
761        return self._site_container.logger
[9726]762
763@implementer(IExportContainerFinder)
764class ExportContainerFinder(grok.GlobalUtility):
765    """Finder for local (site-wide) export container.
766    """
767
768    def __call__(self):
769        """Get the local export container-
770
771        If no site can be determined or the site provides no export
772        container, None is returned.
773        """
774        site = grok.getSite()
775        if site is None:
776            return None
777        return site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.