source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 12811

Last change on this file since 12811 was 12811, checked in by Henrik Bettermann, 10 years ago

Take also whitespaces and ignore marker into consideration.

  • Property svn:keywords set to Id
File size: 28.8 KB
RevLine 
[7196]1## $Id: batching.py 12811 2015-03-23 11:06:26Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
[8380]24import datetime
[4821]25import os
[9217]26import shutil
[4900]27import tempfile
[4821]28import time
[10027]29import unicodecsv
[9816]30import zc.async.interfaces
[7859]31from cStringIO import StringIO
[9217]32from persistent.list import PersistentList
33from zope.component import createObject, getUtility
34from zope.component.hooks import setSite
[9726]35from zope.interface import Interface, implementer
[4806]36from zope.schema import getFields
[11849]37from zope.schema.interfaces import ConstraintNotSatisfied
[8332]38from zope.event import notify
[9217]39from waeup.kofa.async import AsyncJob
[7811]40from waeup.kofa.interfaces import (
[9217]41    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
42    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
[9726]43    IExportJobContainer, IExportJob, IExportContainerFinder)
[4806]44
45class BatchProcessor(grok.GlobalUtility):
46    """A processor to add, update, or remove data.
47
48    This is a non-active baseclass.
49    """
[8220]50    grok.implements(IBatchProcessor)
[4806]51    grok.context(Interface)
52    grok.baseclass()
53
54    # Name used in pages and forms...
[7933]55    name = u'Non-registered base processor'
[6259]56
[4806]57    # Internal name...
[7933]58    util_name = 'baseprocessor'
[6259]59
[4806]60    # Items for this processor need an interface with zope.schema fields.
[5009]61    iface = Interface
[6259]62
[4806]63    # The name must be the same as the util_name attribute in order to
64    # register this utility correctly.
65    grok.name(util_name)
66
67    # Headers needed to locate items...
68    location_fields = ['code', 'faculty_code']
[6259]69
[4806]70    # A factory with this name must be registered...
71    factory_name = 'waeup.Department'
72
73    @property
74    def required_fields(self):
[4829]75        """Required fields that have no default.
76
77        A list of names of field, whose value cannot be set if not
78        given during creation. Therefore these fields must exist in
79        input.
80
81        Fields with a default != missing_value do not belong to this
82        category.
83        """
[4806]84        result = []
85        for key, field in getFields(self.iface).items():
86            if key in self.location_fields:
87                continue
[4829]88            if field.default is not field.missing_value:
89                continue
[4806]90            if field.required:
91                result.append(key)
92        return result
[6259]93
[4806]94    @property
95    def req(self):
96        result = dict(
97            create = self.location_fields + self.required_fields,
98            update = self.location_fields,
99            remove = self.location_fields,
100        )
101        return result
102
103    @property
104    def available_fields(self):
105        return sorted(list(set(
106                    self.location_fields + getFields(self.iface).keys())))
[6259]107
[4806]108    def getHeaders(self, mode='create'):
109        return self.available_fields
110
111    def checkHeaders(self, headerfields, mode='create'):
112        req = self.req[mode]
113        # Check for required fields...
114        for field in req:
115            if not field in headerfields:
116                raise FatalCSVError(
117                    "Need at least columns %s for import!" %
118                    ', '.join(["'%s'" % x for x in req]))
[6828]119        # Check for double fields. Cannot happen because this error is
120        # already catched in views
[4806]121        not_ignored_fields = [x for x in headerfields
122                              if not x.startswith('--')]
123        if len(set(not_ignored_fields)) < len(not_ignored_fields):
124            raise FatalCSVError(
125                "Double headers: each column name may only appear once.")
126        return True
127
128    def applyMapping(self, row, mapping):
[4811]129        """Apply mapping to a row of CSV data.
[6824]130
[4811]131        """
[4806]132        result = dict()
133        for key, replacement in mapping.items():
[6824]134            if replacement == u'--IGNORE--':
135                # Skip ignored columns in failed and finished data files.
136                continue
[4806]137            result[replacement] = row[key]
138        return result
[6259]139
[4832]140    def getMapping(self, path, headerfields, mode):
[6824]141        """Get a mapping from CSV file headerfields to actually used fieldnames.
142
[4811]143        """
[4832]144        result = dict()
[10027]145        reader = unicodecsv.reader(open(path, 'rb'))
[4806]146        raw_header = reader.next()
[4832]147        for num, field in enumerate(headerfields):
148            if field not in self.location_fields and mode == 'remove':
[6824]149                # Skip non-location fields when removing.
150                continue
151            if field == u'--IGNORE--':
152                # Skip ignored columns in failed and finished data files.
153                continue
[4832]154            result[raw_header[num]] = field
155        return result
[4806]156
[6273]157    def stringFromErrs(self, errors, inv_errors):
158        result = []
159        for err in errors:
160            fieldname, message = err
161            result.append("%s: %s" % (fieldname, message))
162        for err in inv_errors:
163            result.append("invariant: %s" % err)
164        return '; '.join(result)
165
[4806]166    def callFactory(self, *args, **kw):
167        return createObject(self.factory_name)
168
169    def parentsExist(self, row, site):
[4811]170        """Tell whether the parent object for data in ``row`` exists.
171        """
[4806]172        raise NotImplementedError('method not implemented')
173
174    def entryExists(self, row, site):
[4811]175        """Tell whether there already exists an entry for ``row`` data.
176        """
[4806]177        raise NotImplementedError('method not implemented')
178
179    def getParent(self, row, site):
[4811]180        """Get the parent object for the entry in ``row``.
181        """
[4806]182        raise NotImplementedError('method not implemented')
[6259]183
[5009]184    def getEntry(self, row, site):
[12513]185        """Get the object for the entry in ``row``.
[5009]186        """
187        raise NotImplementedError('method not implemented')
[6259]188
[4806]189    def addEntry(self, obj, row, site):
[4811]190        """Add the entry given given by ``row`` data.
191        """
[4806]192        raise NotImplementedError('method not implemented')
193
194    def delEntry(self, row, site):
[4811]195        """Delete entry given by ``row`` data.
196        """
[6259]197        raise NotImplementedError('method not implemented')
[4806]198
[7950]199    def checkUpdateRequirements(self, obj, row, site):
200        """Checks requirements the object must fulfill when being updated.
[7938]201
202        This method is not used in case of deleting or adding objects.
203
[7950]204        Returns error messages as strings in case of requirement
[7938]205        problems.
[7937]206        """
[7938]207        return None
[7937]208
[9706]209    def updateEntry(self, obj, row, site, filename):
[4984]210        """Update obj to the values given in row.
[8220]211
212        Returns a string describing the fields changed.
[4984]213        """
[8220]214        changed = []
[4829]215        for key, value in row.items():
[8220]216            # Skip fields to be ignored.
217            if value == IGNORE_MARKER:
218                continue
[8304]219            # Skip fields not declared in interface and which are
220            # not yet attributes of existing objects. We can thus not
221            # add non-existing attributes here.
[8220]222            if not hasattr(obj, key):
223                continue
[9265]224            try:
225                setattr(obj, key, value)
226            except AttributeError:
227                # Computed attributes can't be set.
228                continue
[8222]229            log_value = getattr(value, 'code', value)
230            changed.append('%s=%s' % (key, log_value))
[8332]231
[8333]232        # If any catalog is involved it must be updated.
233        #
234        # XXX: The event is also triggered when creating objects as
235        # updateEntry is called also when creating entries resulting
236        # in objectAdded and additional objectModified events.
237        if len(changed):
238            notify(grok.ObjectModifiedEvent(obj))
[8332]239
[8220]240        return ', '.join(changed)
[4821]241
[4832]242    def createLogfile(self, path, fail_path, num, warnings, mode, user,
[4885]243                      timedelta, logger=None):
244        """Write to log file.
[4821]245        """
[4885]246        if logger is None:
247            return
[9739]248        logger.info(
249            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
250            "%0.3f s (%0.4f s/item)" % (
251            path, mode, num, num - warnings, warnings,
252            timedelta, timedelta/(num or 1)))
[4821]253        return
[4877]254
255    def writeFailedRow(self, writer, row, warnings):
256        """Write a row with error messages to error CSV.
257
258        If warnings is a list of strings, they will be concatenated.
259        """
260        error_col = warnings
261        if isinstance(warnings, list):
262            error_col = ' / '.join(warnings)
263        row['--ERRORS--'] = error_col
264        writer.writerow(row)
265        return
[6259]266
[8220]267    def checkConversion(self, row, mode='ignore', ignore_empty=True):
[6847]268        """Validates all values in row.
269        """
270        converter = IObjectConverter(self.iface)
271        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8220]272            row, self.factory_name, mode=mode)
[6847]273        return errs, inv_errs, conv_dict
274
[12810]275
276    def emptyRow(self, row):
277        """Detect empty rows.
278        """
279        for value in row.values():
[12811]280            if value.strip() and not value in (None, IGNORE_MARKER):
[12810]281                return False
282        return True
283
[4885]284    def doImport(self, path, headerfields, mode='create', user='Unknown',
[8220]285                 logger=None, ignore_empty=True):
[4811]286        """Perform actual import.
287        """
[4832]288        time_start = time.time()
[4806]289        self.checkHeaders(headerfields, mode)
[4832]290        mapping = self.getMapping(path, headerfields, mode)
[10027]291        reader = unicodecsv.DictReader(open(path, 'rb'))
[4889]292
[4900]293        temp_dir = tempfile.mkdtemp()
[6259]294
[6273]295        base = os.path.basename(path)
296        (base, ext) = os.path.splitext(base)
[4900]297        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
[6831]298        failed_headers = mapping.values()
[4877]299        failed_headers.append('--ERRORS--')
[10027]300        failed_writer = unicodecsv.DictWriter(open(failed_path, 'wb'),
301                                              failed_headers)
[8573]302        os.chmod(failed_path, 0664)
[6831]303        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
[4891]304
[4900]305        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
[6831]306        finished_headers = mapping.values()
[10027]307        finished_writer = unicodecsv.DictWriter(open(finished_path, 'wb'),
308                                                finished_headers)
[8905]309        os.chmod(finished_path, 0664)
[4891]310        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
[6259]311
[4806]312        num =0
[4878]313        num_warns = 0
[4806]314        site = grok.getSite()
[7859]315
[4806]316        for raw_row in reader:
317            num += 1
[12810]318            # Skip row if empty
319            if self.emptyRow(raw_row):
320                continue
[4806]321            string_row = self.applyMapping(raw_row, mapping)
[8222]322            if ignore_empty and mode in ('update',):
[12810]323                # Replace empty strings with ignore-markers
[8222]324                for key, val in string_row.items():
[8220]325                    if val == '':
[8222]326                        string_row[key] = IGNORE_MARKER
327            row = dict(string_row.items()) # create deep copy
[6847]328            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
[6273]329            if errs or inv_errs:
[4878]330                num_warns += 1
[6273]331                conv_warnings = self.stringFromErrs(errs, inv_errs)
332                self.writeFailedRow(
[6824]333                    failed_writer, string_row, conv_warnings)
[4821]334                continue
[6273]335            row.update(conv_dict)
[6259]336
[4806]337            if mode == 'create':
338                if not self.parentsExist(row, site):
[4878]339                    num_warns += 1
[4877]340                    self.writeFailedRow(
[6824]341                        failed_writer, string_row,
[4877]342                        "Not all parents do exist yet. Skipping")
[4806]343                    continue
344                if self.entryExists(row, site):
[4878]345                    num_warns += 1
[4877]346                    self.writeFailedRow(
[6824]347                        failed_writer, string_row,
[8330]348                        "This object already exists. Skipping.")
[4806]349                    continue
350                obj = self.callFactory()
[7273]351                # Override all values in row, also
352                # student_ids and applicant_ids which have been
353                # generated in the respective __init__ methods before.
[9706]354                self.updateEntry(obj, row, site, base)
[6243]355                try:
356                    self.addEntry(obj, row, site)
[6273]357                except KeyError, error:
[6219]358                    num_warns += 1
359                    self.writeFailedRow(
[6824]360                        failed_writer, string_row,
[6273]361                        "%s Skipping." % error.message)
[8540]362                    continue
[8509]363                except DuplicationError, error:
364                    num_warns += 1
365                    self.writeFailedRow(
366                        failed_writer, string_row,
367                        "%s Skipping." % error.msg)
[6219]368                    continue
[4806]369            elif mode == 'remove':
370                if not self.entryExists(row, site):
[4878]371                    num_warns += 1
[4877]372                    self.writeFailedRow(
[6824]373                        failed_writer, string_row,
[9219]374                        "Cannot remove: no such entry")
[4806]375                    continue
376                self.delEntry(row, site)
377            elif mode == 'update':
378                obj = self.getEntry(row, site)
379                if obj is None:
[4878]380                    num_warns += 1
[4877]381                    self.writeFailedRow(
[6824]382                        failed_writer, string_row,
[9219]383                        "Cannot update: no such entry")
[4806]384                    continue
[7950]385                update_errors = self.checkUpdateRequirements(obj, row, site)
[7938]386                if update_errors is not None:
[7937]387                    num_warns += 1
388                    self.writeFailedRow(
389                        failed_writer, string_row, update_errors)
390                    continue
[11849]391                try:
392                    self.updateEntry(obj, row, site, base)
393                except ConstraintNotSatisfied, err:
394                    num_warns += 1
395                    self.writeFailedRow(
396                        failed_writer, string_row,
397                        "ConstraintNotSatisfied: %s" % err)
398                    continue
[4891]399            finished_writer.writerow(string_row)
[4821]400
[4832]401        time_end = time.time()
402        timedelta = time_end - time_start
[6259]403
[4878]404        self.createLogfile(path, failed_path, num, num_warns, mode, user,
[4885]405                           timedelta, logger=logger)
[4894]406        failed_path = os.path.abspath(failed_path)
[4878]407        if num_warns == 0:
[4821]408            del failed_writer
409            os.unlink(failed_path)
[4894]410            failed_path = None
411        return (num, num_warns,
412                os.path.abspath(finished_path), failed_path)
[7859]413
[9032]414    def get_csv_skeleton(self):
415        """Export CSV file only with a header of available fields.
416
417        A raw string with CSV data should be returned.
418        """
419        outfile = StringIO()
[10027]420        writer = unicodecsv.DictWriter(outfile, self.available_fields)
[9734]421        writer.writerow(
422            dict(zip(self.available_fields, self.available_fields))) # header
[9032]423        outfile.seek(0)
424        return outfile.read()
425
[7859]426class ExporterBase(object):
427    """A base for exporters.
428    """
429    grok.implements(ICSVExporter)
430
431    #: Fieldnames considered by this exporter
432    fields = ('code', 'title', 'title_prefix')
433
[7907]434    #: The title under which this exporter will be displayed
435    #: (if registered as a utility)
436    title = 'Override this title'
437
[7859]438    def mangle_value(self, value, name, context=None):
439        """Hook for mangling values in derived classes
440        """
441        if isinstance(value, bool):
442            value = value and '1' or '0'
443        elif isinstance(value, unicode):
444            # CSV writers like byte streams better than unicode
445            value = value.encode('utf-8')
[8380]446        elif isinstance(value, datetime.datetime):
[11737]447            #value = str(value)
448            value = str('%s#' % value) # changed 2014-07-06, see ticket #941
[8380]449        elif isinstance(value, datetime.date):
450            # Order is important here: check for date after datetime as
451            # datetimes are also dates.
452            #
453            # Append hash '#' to dates to circumvent unwanted excel automatic
454            value = str('%s#' % value)
[7859]455        elif value is None:
456            # None is not really representable in CSV files
457            value = ''
458        return value
459
460    def get_csv_writer(self, filepath=None):
461        """Get a CSV dict writer instance open for writing.
462
463        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
464        :class:`csv.DictWriter` instance and outfile is the real file
465        which is written to. The latter is important when writing to
466        StringIO and can normally be ignored otherwise.
467
468        The returned file will already be filled with the header row.
469
470        Please note that if you give a filepath, the returned outfile
471        is open for writing only and you might have to close it before
472        reopening it for reading.
473        """
474        if filepath is None:
475            outfile = StringIO()
476        else:
477            outfile = open(filepath, 'wb')
[10027]478        writer = unicodecsv.DictWriter(outfile, self.fields)
[7859]479        writer.writerow(dict(zip(self.fields, self.fields))) # header
480        return writer, outfile
481
482    def write_item(self, obj, writer):
483        """Write a row extracted from `obj` into CSV file using `writer`.
484        """
485        row = {}
486        for name in self.fields:
487            value = getattr(obj, name, None)
488            value = self.mangle_value(value, name, obj)
489            row[name] = value
490        writer.writerow(row)
491        return
492
493    def close_outfile(self, filepath, outfile):
494        """Close outfile.
495
496        If filepath is None, the contents of outfile is returned.
497        """
498        outfile.seek(0)
499        if filepath is None:
500            return outfile.read()
501        outfile.close()
502        return
503
[9797]504    def get_filtered(self, site, **kw):
505        """Get datasets to export filtered by keyword arguments.
506
507        Returns an iterable.
508        """
509        raise NotImplementedError
510
[12516]511    def get_selected(self, site, selected):
512        """Get datasets to export for selected items
513        specified by a list of identifiers.
514
515        Returns an iterable.
516        """
517        raise NotImplementedError
518
[7859]519    def export(self, iterable, filepath=None):
520        """Export `iterable` as CSV file.
521
522        If `filepath` is ``None``, a raw string with CSV data should
523        be returned.
524        """
525        raise NotImplementedError
526
527    def export_all(self, site, filepath=None):
528        """Export all appropriate objects in `site` into `filepath` as
529        CSV data.
530
531        If `filepath` is ``None``, a raw string with CSV data should
532        be returned.
533        """
534        raise NotImplementedError
[9217]535
[9797]536    def export_filtered(self, site, filepath=None, **kw):
537        """Export items denoted by `args` and `kw`.
[9217]538
[9797]539        If `filepath` is ``None``, a raw string with CSV data should
540        be returned.
541        """
542        data = self.get_filtered(site, **kw)
543        return self.export(data, filepath=filepath)
544
[12516]545    def export_selected(self, site, filepath=None, **kw):
546        """Export those items specified by a list of identifiers
547        called `selected`.
548
549        If `filepath` is ``None``, a raw string with CSV data should
550        be returned.
551        """
552        selected = kw.get('selected', [])
553        data = self.get_selected(site, selected)
554        return self.export(data, filepath=filepath)
555
[9797]556def export_job(site, exporter_name, **kw):
[9217]557    """Export all entries delivered by exporter and store it in a temp file.
558
559    `site` gives the site to search. It will be passed to the exporter
560    and also be set as 'current site' as the function is used in
561    asynchronous jobs which run in their own threads and have no site
562    set initially. Therefore `site` must also be a valid value for use
563    with `zope.component.hooks.setSite()`.
564
565    `exporter_name` is the utility name under which the desired
566    exporter was registered with the ZCA.
567
568    The resulting CSV file will be stored in a new temporary directory
569    (using :func:`tempfile.mkdtemp`). It will be named after the
570    exporter used with `.csv` filename extension.
571
572    Returns the path to the created CSV file.
573
574    .. note:: It is the callers responsibility to clean up the used
575              file and its parent directory.
576    """
577    setSite(site)
578    exporter = getUtility(ICSVExporter, name=exporter_name)
579    output_dir = tempfile.mkdtemp()
580    filename = '%s.csv' % exporter_name
581    output_path = os.path.join(output_dir, filename)
[9797]582    if kw == {}:
583        exporter.export_all(site, filepath=output_path)
[12516]584    elif kw.has_key('selected'):
585        exporter.export_selected(site, filepath=output_path, **kw)
[9797]586    else:
587        exporter.export_filtered(site, filepath=output_path, **kw)
[9217]588    return output_path
589
590class AsyncExportJob(AsyncJob):
591    """An IJob that exports data to CSV files.
592
593    `AsyncExportJob` instances are regular `AsyncJob` instances with a
594    different constructor API. Instead of a callable to execute, you
595    must pass a `site` and some `exporter_name` to trigger an export.
596
597    The real work is done when an instance of this class is put into a
598    queue. See :mod:`waeup.kofa.async` to learn more about
599    asynchronous jobs.
600
601    The `exporter_name` must be the name under which an ICSVExporter
602    utility was registered with the ZCA.
603
604    The `site` must be a valid site  or ``None``.
605
606    The result of an `AsyncExportJob` is the path to generated CSV
607    file. The file will reside in a temporary directory that should be
608    removed after being used.
609    """
610    grok.implements(IExportJob)
611
[9718]612    def __init__(self, site, exporter_name, *args, **kwargs):
[9217]613        super(AsyncExportJob, self).__init__(
[9718]614            export_job, site, exporter_name, *args, **kwargs)
[9217]615
[9816]616    @property
617    def finished(self):
618        """A job is marked `finished` if it is completed.
619
620        Please note: a finished report job does not neccessarily
621        provide an IReport result. See meth:`failed`.
622        """
623        return self.status == zc.async.interfaces.COMPLETED
624
625    @property
626    def failed(self):
627        """A report job is marked failed iff it is finished and the
628        result is None.
629
630        While a job is unfinished, the `failed` status is ``None``.
631
632        Failed jobs normally provide a `traceback` to examine reasons.
633        """
634        if not self.finished:
635            return None
636        if getattr(self, 'result', None) is None:
637            return True
638        return False
639
[9217]640class ExportJobContainer(object):
641    """A mix-in that provides functionality for asynchronous export jobs.
642    """
643    grok.implements(IExportJobContainer)
644    running_exports = PersistentList()
645
[9718]646    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
[9217]647        """Start asynchronous export job.
648
649        `exporter_name` is the name of an exporter utility to be used.
650
651        `user_id` is the ID of the user that triggers the export.
652
653        The job_id is stored along with exporter name and user id in a
654        persistent list.
655
[9718]656        The method supports additional positional and keyword
657        arguments, which are passed as-is to the respective
658        :class:`AsyncExportJob`.
659
[9217]660        Returns the job ID of the job started.
661        """
662        site = grok.getSite()
663        manager = getUtility(IJobManager)
[9718]664        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
[9217]665        job_id = manager.put(job)
666        # Make sure that the persisted list is stored in ZODB
667        self.running_exports = PersistentList(self.running_exports)
668        self.running_exports.append((job_id, exporter_name, user_id))
669        return job_id
670
671    def get_running_export_jobs(self, user_id=None):
672        """Get export jobs for user with `user_id` as list of tuples.
673
674        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
675        that order. The ``<exporter_name>`` is the utility name of the
676        used exporter.
677
678        If `user_id` is ``None``, all running jobs are returned.
679        """
680        entries = []
681        to_delete = []
682        manager = getUtility(IJobManager)
683        for entry in self.running_exports:
684            if user_id is not None and entry[2] != user_id:
685                continue
686            if manager.get(entry[0]) is None:
687                to_delete.append(entry)
688                continue
689            entries.append(entry)
690        if to_delete:
691            self.running_exports = PersistentList(
692                [x for x in self.running_exports if x not in to_delete])
693        return entries
694
695    def get_export_jobs_status(self, user_id=None):
696        """Get running/completed export jobs for `user_id` as list of tuples.
697
698        Each tuple holds ``<raw status>, <status translated>,
699        <exporter title>`` in that order, where ``<status
700        translated>`` and ``<exporter title>`` are translated strings
701        representing the status of the job and the human readable
702        title of the exporter used.
703        """
704        entries = self.get_running_export_jobs(user_id)
705        result = []
706        manager = getUtility(IJobManager)
707        for entry in entries:
708            job = manager.get(entry[0])
709            if job is None:
710                continue
711            status, status_translated = JOB_STATUS_MAP[job.status]
712            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
713            result.append((status, status_translated, exporter_name))
714        return result
715
716    def delete_export_entry(self, entry):
717        """Delete the export denoted by `entry`.
718
719        Removes given entry from the local `running_exports` list and also
720        removes the regarding job via the local job manager.
721
722        `entry` must be a tuple ``(<job id>, <exporter name>, <user
723        id>)`` as created by :meth:`start_export_job` or returned by
724        :meth:`get_running_export_jobs`.
725        """
726        manager = getUtility(IJobManager)
727        job = manager.get(entry[0])
728        if job is not None:
729            # remove created export file
730            if isinstance(job.result, basestring):
731                if os.path.exists(os.path.dirname(job.result)):
732                    shutil.rmtree(os.path.dirname(job.result))
733        manager.remove(entry[0], self)
734        new_entries = [x for x in self.running_exports
735                       if x != entry]
736        self.running_exports = PersistentList(new_entries)
737        return
738
739    def entry_from_job_id(self, job_id):
740        """Get entry tuple for `job_id`.
741
742        Returns ``None`` if no such entry can be found.
743        """
744        for entry in self.running_exports:
745            if entry[0] == job_id:
746                return entry
747        return None
[9726]748
749class VirtualExportJobContainer(ExportJobContainer):
750    """A virtual export job container.
751
752    Virtual ExportJobContainers can be used as a mixin just like real
753    ExportJobContainer.
754
755    They retrieve and store data in the site-wide ExportJobContainer.
756
757    Functionality is currently entirely as for regular
758    ExportJobContainers, except that data is stored elsewhere.
759
760    VirtualExportJobContainers need a registered
761    IExportContainerFinder utility to find a suitable container for
762    storing data.
763    """
764    grok.implements(IExportJobContainer)
765
766    @property
767    def _site_container(self):
768        return getUtility(IExportContainerFinder)()
769
770    # The following is a simple trick. While ExportJobContainers store
771    # only one attribute in ZODB, it is sufficient to replace this
772    # attribute `running_exports` with a suitable manager to make the
773    # whole virtual container work like the original but with the data
774    # stored in the site-wide exports container. This way, virtual
775    # export containers provide the whole functionality of a regular
776    # exports container but store no data at all with themselves.
777    @property
778    def running_exports(self):
779        """Exports stored in the site-wide exports container.
780        """
781        return self._site_container.running_exports
782
783    @running_exports.setter
784    def running_exports(self, value):
785        self._site_container.running_exports = value
786
787    @running_exports.deleter
788    def running_exports(self):
789        del self._site_container.running_exports
790
[9823]791    @property
792    def logger(self):
793        return self._site_container.logger
[9726]794
795@implementer(IExportContainerFinder)
796class ExportContainerFinder(grok.GlobalUtility):
797    """Finder for local (site-wide) export container.
798    """
799
800    def __call__(self):
801        """Get the local export container-
802
803        If no site can be determined or the site provides no export
804        container, None is returned.
805        """
806        site = grok.getSite()
807        if site is None:
808            return None
809        return site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.