source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9730

Last change on this file since 9730 was 9726, checked in by uli, 12 years ago

Add VirtualExportJobContainers for storing export jobs in a central exports container.

  • Property svn:keywords set to Id
File size: 26.4 KB
Line 
1## $Id: batching.py 9726 2012-11-26 23:27:46Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface, implementer
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob, IExportContainerFinder)
42
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected
    to override the class attributes below and to implement the
    methods that raise :exc:`NotImplementedError` here
    (``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry``).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here; they are
        handled separately (see :attr:`req`).
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the column names that mode needs.

        Keys are ``'create'``, ``'update'`` and ``'remove'``; values
        are lists of field names.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all location fields and interface field names.
        """
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Get the header names usable for imports.

        The base implementation returns :attr:`available_fields`
        regardless of `mode`.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are usable for an import in `mode`.

        Raises :exc:`FatalCSVError` if a column required for `mode`
        is missing or if a column name (other than those starting
        with ``--``) appears more than once. Returns ``True``
        otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to the field names
        actually used. Returns a new dict keyed by the mapped field
        names; columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` is read to get the raw
        column names; `headerfields` gives, position by position, the
        field name each raw column should be mapped to. Columns
        marked ``--IGNORE--`` are left out, as are non-location
        columns when `mode` is ``'remove'``.
        """
        result = dict()
        # Only the header line is needed; close the file right away
        # instead of leaving that to the garbage collector.
        with open(path, 'rb') as source:
            raw_header = next(csv.reader(source))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Turn conversion errors into one human readable string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages. All
        messages are joined with ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed
        on by this base implementation.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a return value of ``None`` as 'no
        such entry' when updating.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        :meth:`doImport` catches :exc:`KeyError` and
        :exc:`DuplicationError` raised here and reports the row as
        failed.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation has no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            # Log the ``code`` of a value if it has one, the value
            # itself otherwise.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to `logger`.

        `num` is the number of processed rows, `warnings` the number
        of failed ones and `timedelta` the processing time in
        seconds. Nothing is written if `logger` is ``None``.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that `row` is modified in place: the message is stored
        under the ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered
        by the :class:`IObjectConverter` for :attr:`iface`.

        `ignore_empty` is not used by this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _handleRow(self, row, mode, site, filename):
        """Create, remove or update the entry described by `row`.

        Returns ``None`` on success or the message to store with the
        row in the file of failed rows.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site, filename)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry"
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry"
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site, filename)
        return None

    def _importRows(self, reader, mapping, mode, site, filename,
                    ignore_empty, failed_writer, finished_writer):
        """Process all rows of `reader` and sort them into result files.

        Rows processed successfully are written to `finished_writer`,
        all others (with an error message) to `failed_writer`.

        Returns a tuple ``(<rows processed>, <rows failed>)``.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # replace empty strings with ignore-markers
                for key, val in list(string_row.items()):
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            # Shallow copy: ``row`` receives the converted values while
            # ``string_row`` keeps the strings written to result files.
            row = dict(string_row)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            problem = self._handleRow(row, mode, site, filename)
            if problem is not None:
                num_warns += 1
                self.writeFailedRow(failed_writer, string_row, problem)
                continue
            finished_writer.writerow(string_row)
        return num, num_warns

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, interpreting its columns as
        given by `headerfields`, and creates, updates or removes
        entries depending on `mode` (``'create'``, ``'update'`` or
        ``'remove'``). With `ignore_empty` set, empty values are
        ignored in update mode.

        Two CSV files are written into a new temporary directory:
        ``<name>.finished<ext>`` with all rows processed successfully
        and ``<name>.pending<ext>`` with all failed rows and an
        additional ``--ERRORS--`` column. The latter is removed again
        if no row failed. The temporary directory itself is not
        removed here.

        Returns a tuple ``(<rows processed>, <rows failed>, <path of
        finished file>, <path of failed file or None>)``.

        Raises :exc:`FatalCSVError` if `headerfields` are not
        sufficient for `mode` (see :meth:`checkHeaders`).
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        base, ext = os.path.splitext(os.path.basename(path))

        # All files are closed explicitly (via ``with``) so that the
        # result files are completely written before they are
        # unlinked or handed over to the caller.
        with open(path, 'rb') as source:
            reader = csv.DictReader(source)

            temp_dir = tempfile.mkdtemp()
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))

            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            finished_headers = list(mapping.values())

            with open(failed_path, 'wb') as failed_file:
                os.chmod(failed_path, 0o664)
                failed_writer = csv.DictWriter(failed_file, failed_headers)
                failed_writer.writerow(
                    dict([(x, x) for x in failed_headers]))

                with open(finished_path, 'wb') as finished_file:
                    os.chmod(finished_path, 0o664)
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict([(x, x) for x in finished_headers]))

                    site = grok.getSite()
                    num, num_warns = self._importRows(
                        reader, mapping, mode, site, base, ignore_empty,
                        failed_writer, finished_writer)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave a header-only file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        writer = csv.DictWriter(outfile, self.available_fields)
        writer.writerow(dict(zip(self.available_fields, self.available_fields))) # header
        outfile.seek(0)
        return outfile.read()
415
class ExporterBase(object):
    """Common groundwork for CSV exporters.

    Derived classes are expected to set `fields` and `title` and to
    implement :meth:`export` and :meth:`export_all`.
    """
    grok.implements(ICSVExporter)

    #: Names of the attributes written as CSV columns
    fields = ('code', 'title', 'title_prefix')

    #: Human readable title of the exporter
    #: (shown if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer can cope with.

        Derived classes can hook in here. `name` and `context` are
        not evaluated by this base implementation. Values of types
        not handled below are returned unchanged.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers prefer byte strings over unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # This test must come after the datetime test because
            # each datetime is a date as well.
            #
            # A trailing hash '#' keeps Excel from auto-converting
            # the value.
            return str('%s#' % value)
        if value is None:
            # CSV has no real notion of None
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        The result is a tuple ``(<writer>, <outfile>)``: a
        :class:`csv.DictWriter` and the file object it writes to. The
        file object is an in-memory ``StringIO`` if no `filepath` was
        given; otherwise it is the file at `filepath`, opened for
        (binary) writing only. In the latter case the file might have
        to be closed before it can be reopened for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row with the `fields` of `obj` using `writer`.

        Attributes missing on `obj` are handled like ``None`` values.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        Without a `filepath` the complete contents of `outfile` are
        returned. Otherwise `outfile` is closed and nothing is
        returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV.

        Implementations should return a raw string with CSV data if
        `filepath` is ``None``.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all suitable objects found in `site` as CSV data to
        `filepath`.

        Implementations should return a raw string with CSV data if
        `filepath` is ``None``.
        """
        raise NotImplementedError
509
510
def export_job(site, exporter_name):
    """Run the exporter named `exporter_name` and write a temp CSV file.

    `exporter_name` is the name under which the wanted exporter is
    registered as :class:`ICSVExporter` utility.

    `site` is handed over to the exporter and also made the 'current
    site'. The latter is needed because this function is run by
    asynchronous jobs, which live in their own threads and start
    without any site set. `site` must therefore be usable with
    `zope.component.hooks.setSite()`.

    The CSV file is named ``<exporter_name>.csv`` and put into a
    directory freshly created by :func:`tempfile.mkdtemp`.

    Returns the path of the CSV file written.

    .. note:: Removing the file and the directory containing it is
              up to the caller.
    """
    # Must happen first: the utility lookup below depends on the site.
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    csv_path = os.path.join(tempfile.mkdtemp(), '%s.csv' % exporter_name)
    exporter.export_all(site, filepath=csv_path)
    return csv_path
539
class AsyncExportJob(AsyncJob):
    """An IJob exporting data to a CSV file.

    Instances are ordinary `AsyncJob` instances; only the constructor
    differs. It takes no callable but a `site` and an `exporter_name`
    and schedules :func:`export_job` with these values.

    `exporter_name` has to be the name of a registered ICSVExporter
    utility, `site` a valid site or ``None``.

    Nothing is exported before the job is put into a queue. See
    :mod:`waeup.kofa.async` for details about asynchronous jobs.

    The job result is the path of the CSV file created. This file
    lives in a temporary directory, which should be removed when it
    is not needed any more.
    """
    grok.implements(IExportJob)

    def __init__(self, site, exporter_name, *args, **kwargs):
        # Additional arguments are passed through to AsyncJob as-is.
        super(AsyncExportJob, self).__init__(
            export_job, site, exporter_name, *args, **kwargs)
565
class ExportJobContainer(object):
    """Mix-in adding support for asynchronous export jobs.

    Export jobs are tracked in `running_exports`, a list of tuples
    ``(<job id>, <exporter name>, <user id>)``.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Create and enqueue an asynchronous export job.

        `exporter_name` names the exporter utility to use, `user_id`
        is the ID of the user starting the export. Both are stored
        together with the ID of the new job in a persistent list.

        Further positional and keyword arguments are handed over
        unchanged to :class:`AsyncExportJob`.

        Returns the ID of the job created.
        """
        site = grok.getSite()
        job_id = getUtility(IJobManager).put(
            AsyncExportJob(site, exporter_name, *args, **kwargs))
        # Assign a fresh list to this instance so that the entries
        # are really stored in ZODB
        entries = PersistentList(self.running_exports)
        self.running_exports = entries
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """List the export job entries of user `user_id`.

        The result is a list of tuples ``(<job_id>, <exporter_name>,
        <user_id>)`` with ``<exporter_name>`` being the utility name
        of the exporter used. With `user_id` set to ``None``, the
        entries of all users are returned.

        Inspected entries whose job is not known to the job manager
        any more are not returned and also dropped from the stored
        list.
        """
        manager = getUtility(IJobManager)
        alive = []
        stale = []
        for entry in self.running_exports:
            if user_id is None or entry[2] == user_id:
                if manager.get(entry[0]) is None:
                    stale.append(entry)
                else:
                    alive.append(entry)
        if stale:
            remaining = [x for x in self.running_exports if x not in stale]
            self.running_exports = PersistentList(remaining)
        return alive

    def get_export_jobs_status(self, user_id=None):
        """List status infos about the export jobs of user `user_id`.

        The result is a list of tuples ``(<raw status>, <status
        translated>, <exporter title>)``. ``<status translated>`` and
        ``<exporter title>`` are translated strings telling the job
        status and the human readable title of the exporter used.
        """
        manager = getUtility(IJobManager)
        result = []
        for job_id, exporter_name in [
                (x[0], x[1]) for x in self.get_running_export_jobs(user_id)]:
            job = manager.get(job_id)
            if job is None:
                continue
            status, status_translated = JOB_STATUS_MAP[job.status]
            exporter_title = getUtility(
                ICSVExporter, name=exporter_name).title
            result.append((status, status_translated, exporter_title))
        return result

    def delete_export_entry(self, entry):
        """Remove the export given by `entry`.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` like the ones stored by :meth:`start_export_job` or
        delivered by :meth:`get_running_export_jobs`.

        The entry is dropped from the `running_exports` list and the
        respective job is removed via the job manager. If the job
        result is a string, the directory containing that path (i.e.
        the export file created) is deleted as well.
        """
        job_id = entry[0]
        manager = getUtility(IJobManager)
        job = manager.get(job_id)
        if job is not None and isinstance(job.result, basestring):
            # get rid of the export file created
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        manager.remove(job_id, self)
        self.running_exports = PersistentList(
            [x for x in self.running_exports if x != entry])
        return

    def entry_from_job_id(self, job_id):
        """Look up the entry tuple stored for `job_id`.

        ``None`` is returned if there is no such entry.
        """
        candidates = (
            entry for entry in self.running_exports if entry[0] == job_id)
        return next(candidates, None)
674
class VirtualExportJobContainer(ExportJobContainer):
    """An export job container without storage of its own.

    Like a real ExportJobContainer it can be used as a mixin. All
    data, however, is read from and written to the container
    delivered by the registered IExportContainerFinder utility, which
    therefore must be available.

    Apart from the place where data is stored, the functionality is
    currently the same as for regular ExportJobContainers.
    """
    grok.implements(IExportJobContainer)

    def _find_site_container(self):
        # Ask the finder utility for the container holding the data.
        return getUtility(IExportContainerFinder)()

    _site_container = property(_find_site_container)

    # ExportJobContainers keep a single attribute in ZODB:
    # `running_exports`. Redirecting this one attribute to the
    # container found above is therefore enough to get the complete
    # behaviour of a regular exports container while no data at all is
    # stored with the virtual container itself.
    def _get_running_exports(self):
        return self._site_container.running_exports

    def _set_running_exports(self, value):
        self._site_container.running_exports = value

    def _del_running_exports(self):
        del self._site_container.running_exports

    running_exports = property(
        _get_running_exports, _set_running_exports, _del_running_exports,
        """Exports stored in the site-wide exports container.
        """)
716
717
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """Utility that looks up the local (site-wide) export container.
    """

    def __call__(self):
        """Return the export container of the current site.

        The container is looked up under the name ``datacenter`` in
        the site. ``None`` is returned if there is no current site or
        the site holds no such item.
        """
        site = grok.getSite()
        return None if site is None else site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.