source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9736

Last change on this file since 9736 was 9734, checked in by uli, 12 years ago

Try to make clearer where we head to regarding filtered exports.

  • Property svn:keywords set to Id
File size: 26.4 KB
Line 
1## $Id: batching.py 9734 2012-11-28 01:14:33Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface, implementer
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob, IExportContainerFinder)
42
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. The hooks that locate, add and
    delete entries (``parentsExist``, ``entryExists``, ``getParent``,
    ``getEntry``, ``addEntry``, ``delEntry``) raise
    ``NotImplementedError`` here and are meant to be overridden.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are handled separately and are
        therefore never part of the result.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the list of headers it requires.

        Keys are ``'create'``, ``'update'`` and ``'remove'``.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of all header names this
        processor knows: location fields plus the fields of ``iface``.
        """
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Get the header names usable in CSV files.

        ``mode`` is accepted but not evaluated by this base
        implementation; ``available_fields`` is returned in any case.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether ``headerfields`` is usable for ``mode``.

        Raises ``FatalCSVError`` if a field required for ``mode`` is
        missing or if a non-ignored header (one not starting with
        ``--``) appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps raw CSV column names to the field names to
        use. Returns a new dict keyed by the replacement names; columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
        fieldnames.

        The first line of the file at ``path`` is read as raw header;
        the n-th raw header is mapped to the n-th entry of
        ``headerfields``. Ignored columns and, in ``remove`` mode, all
        non-location fields are left out.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again instead of leaving that to the garbage collector.
        with open(path, 'rb') as infile:
            raw_header = next(csv.reader(infile))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Turn conversion errors into one human readable string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant errors. All messages
        are joined with ``'; '``.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend(["invariant: %s" % err for err in inv_errors])
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed
        on by this base implementation.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a return value of ``None`` as 'no such
        entry'.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation has no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns a string describing the fields changed. ``site`` and
        ``filename`` are not used by this base implementation.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            # Log the ``code`` of a value if it has one, the value
            # itself otherwise.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to ``logger``.

        Does nothing if ``logger`` is ``None``. ``num`` is the number
        of processed rows, ``warnings`` the number of failed ones and
        ``timedelta`` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'FAILED' if warnings > 0 else 'OK'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that ``row`` is modified in place: the messages are stored
        under the key ``--ERRORS--`` before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the ``IObjectConverter`` for ``iface``. ``ignore_empty`` is
        accepted but not evaluated by this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _importRow(self, string_row, mode, site, base):
        """Process a single, already mapped row in the given ``mode``.

        Returns ``None`` if the row was processed successfully and the
        warning to be written to the failed-rows file otherwise.
        """
        # Shallow copy: ``string_row`` must keep the raw string values
        # for the result files while ``row`` gets the converted ones.
        row = dict(string_row)
        errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
        if errs or inv_errs:
            return self.stringFromErrs(errs, inv_errs)
        row.update(conv_dict)

        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site, base)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry"
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry"
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site, base)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at ``path`` and processes each row according
        to ``mode`` (``'create'``, ``'update'`` or ``'remove'``).
        Successfully processed rows are written to a ``.finished`` file,
        failed rows (with an additional ``--ERRORS--`` column) to a
        ``.pending`` file, both located in a fresh temporary directory.

        If ``ignore_empty`` is true, empty strings are replaced by the
        ignore marker in ``update`` mode.

        Returns a tuple ``(<num of rows>, <num of failed rows>,
        <path of finished file>, <path of failed file>)``. The failed
        file is removed and its path set to ``None`` if no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0
        infile = open(path, 'rb')
        failed_file = finished_file = None
        try:
            reader = csv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()
            (base, ext) = os.path.splitext(os.path.basename(path))

            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            os.chmod(failed_path, 0o664)
            failed_writer.writerow(dict(zip(failed_headers, failed_headers)))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            os.chmod(finished_path, 0o664)
            finished_writer.writerow(
                dict(zip(finished_headers, finished_headers)))

            site = grok.getSite()

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode == 'update':
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                warning = self._importRow(string_row, mode, site, base)
                if warning is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, warning)
                    continue
                finished_writer.writerow(string_row)
        finally:
            # Close all files explicitly so that the result files are
            # completely written before their paths are handed out and
            # before the failed-rows file is possibly removed.
            for handle in (finished_file, failed_file, infile):
                if handle is not None:
                    handle.close()

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: no need to keep an empty failed-rows file.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        fieldnames = self.available_fields
        writer = csv.DictWriter(outfile, fieldnames)
        writer.writerow(dict(zip(fieldnames, fieldnames)))  # header
        outfile.seek(0)
        return outfile.read()
416
class ExporterBase(object):
    """Common base class for CSV exporters.

    Provides value mangling and CSV writer handling; the actual
    ``export``/``export_all`` methods must be supplied by subclasses.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something writable to a CSV file.

        Derived classes may override this hook. `name` and `context`
        are not evaluated by this base implementation.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers prefer byte strings over unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # This check must come after the datetime check, because
            # each datetime is also a date.
            #
            # Dates get a trailing hash '#' to circumvent unwanted
            # automatic handling by Excel (the original remark is
            # truncated; presumably automatic date conversion).
            return str('%s#' % value)
        if value is None:
            # CSV files provide no real representation of None
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer which is ready for writing rows.

        The result is a tuple (<writer>, <outfile>), where ``<writer>``
        is a :class:`csv.DictWriter` instance and ``<outfile>`` the
        file object really written to. The latter is needed when
        writing to StringIO and can normally be ignored otherwise.

        The header row is already written when the writer is returned.

        If a filepath is given, the returned outfile is open for
        writing only; you might have to close it before reopening it
        for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Extract a row from `obj` and write it using `writer`.

        Attributes missing on `obj` are handled like ``None`` values.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to outfile.

        With a `filepath` of None the contents of outfile are
        returned, otherwise outfile is closed.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return None
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV file.

        With a `filepath` of ``None`` a raw string with CSV data
        should be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` as CSV data into
        `filepath`.

        With a `filepath` of ``None`` a raw string with CSV data
        should be returned.
        """
        raise NotImplementedError
510
511
def export_job(site, exporter_name):
    """Run the named exporter on `site` and store the result in a
    temporary CSV file.

    `site` is the site to export from. It is handed over to the
    exporter and is also made the 'current site': this function runs
    in asynchronous jobs, which live in their own threads and have no
    site set initially. `site` must therefore be a valid argument for
    `zope.component.hooks.setSite()`.

    `exporter_name` is the name under which the exporter utility to
    use was registered with the ZCA.

    The CSV file is created in a new temporary directory (made by
    :func:`tempfile.mkdtemp`) and named after the exporter, with
    `.csv` as filename extension.

    Returns the path to the created CSV file.

    .. note:: Removing the file and its parent directory after use is
              up to the caller.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_path = os.path.join(
        tempfile.mkdtemp(), '%s.csv' % exporter_name)
    exporter.export_all(site, filepath=output_path)
    return output_path
540
class AsyncExportJob(AsyncJob):
    """An IJob for exporting data to CSV files.

    Instances are regular `AsyncJob` instances; only the constructor
    API differs. You do not pass a callable to execute but a `site`
    and an `exporter_name`; the callable executed is always
    :func:`export_job`.

    The real work is done when an instance of this class is put into a
    queue. See :mod:`waeup.kofa.async` to learn more about
    asynchronous jobs.

    `exporter_name` must be the name under which an ICSVExporter
    utility was registered with the ZCA.

    `site` must be a valid site or ``None``.

    The result of an `AsyncExportJob` is the path to the generated CSV
    file. This file resides in a temporary directory, which should be
    removed after use.
    """
    grok.implements(IExportJob)

    def __init__(self, site, exporter_name, *args, **kwargs):
        # Prepend the fixed callable and its two main arguments; all
        # further arguments are passed through unchanged.
        job_args = (export_job, site, exporter_name) + args
        super(AsyncExportJob, self).__init__(*job_args, **kwargs)
566
class ExportJobContainer(object):
    """A mix-in providing the handling of asynchronous export jobs.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
        """Start an asynchronous export job.

        `exporter_name` names the exporter utility to use.

        `user_id` is the ID of the user triggering the export.

        Job id, exporter name and user id are stored as a tuple in a
        persistent list.

        Additional positional and keyword arguments are supported and
        passed as-is to the respective :class:`AsyncExportJob`.

        Returns the job ID of the started job.
        """
        current_site = grok.getSite()
        job_manager = getUtility(IJobManager)
        job_id = job_manager.put(
            AsyncExportJob(current_site, exporter_name, *args, **kwargs))
        # Store an own list on the instance, so that the list is really
        # persisted in ZODB
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Get the export jobs of user `user_id` as list of tuples.

        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order, where ``<exporter_name>`` is the utility name of
        the exporter used.

        With a `user_id` of ``None`` the jobs of all users are
        returned.

        Considered entries whose job is unknown to the job manager
        are removed from the stored list.
        """
        job_manager = getUtility(IJobManager)
        known, stale = [], []
        for entry in self.running_exports:
            foreign = user_id is not None and entry[2] != user_id
            if foreign:
                continue
            if job_manager.get(entry[0]) is None:
                stale.append(entry)
            else:
                known.append(entry)
        if stale:
            remaining = [x for x in self.running_exports if x not in stale]
            self.running_exports = PersistentList(remaining)
        return known

    def get_export_jobs_status(self, user_id=None):
        """Get the status of export jobs of `user_id` as list of tuples.

        Each tuple holds ``<raw status>, <status translated>,
        <exporter title>`` in that order. ``<status translated>`` and
        ``<exporter title>`` are translated strings telling the status
        of the job and the human readable title of the exporter used.
        """
        job_manager = getUtility(IJobManager)
        statuses = []
        for job_id, exporter_name in [
                (x[0], x[1]) for x in self.get_running_export_jobs(user_id)]:
            job = job_manager.get(job_id)
            if job is None:
                continue
            raw_status, translated = JOB_STATUS_MAP[job.status]
            exporter_title = getUtility(
                ICSVExporter, name=exporter_name).title
            statuses.append((raw_status, translated, exporter_title))
        return statuses

    def delete_export_entry(self, entry):
        """Delete the export given by `entry`.

        The entry is removed from the local `running_exports` list and
        the respective job is removed via the local job manager. An
        export file already created is removed together with its
        directory.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        job_manager = getUtility(IJobManager)
        job_id = entry[0]
        job = job_manager.get(job_id)
        if job is not None and isinstance(job.result, basestring):
            # the result is the path of the export file: remove the
            # directory it was created in
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(job_id, self)
        self.running_exports = PersistentList(
            [x for x in self.running_exports if x != entry])
        return

    def entry_from_job_id(self, job_id):
        """Get the entry tuple stored for `job_id`.

        If there is no such entry, ``None`` is returned.
        """
        matching = (x for x in self.running_exports if x[0] == job_id)
        return next(matching, None)
675
class VirtualExportJobContainer(ExportJobContainer):
    """An export job container that stores no data itself.

    Like real ExportJobContainers, virtual ones can be used as a
    mixin.

    All data is retrieved from and stored in the site-wide
    ExportJobContainer.

    Apart from the place where data is stored, the functionality
    currently equals that of regular ExportJobContainers.

    A registered IExportContainerFinder utility is required; it is
    used to find the container that stores the data.
    """
    grok.implements(IExportJobContainer)

    @property
    def _site_container(self):
        # The finder utility is a callable delivering the container.
        finder = getUtility(IExportContainerFinder)
        return finder()

    # A simple trick: ExportJobContainers keep only one attribute in
    # ZODB, `running_exports`. Replacing this attribute with a
    # property that delegates to the site-wide exports container is
    # therefore enough to make the virtual container behave like the
    # original one, while all data ends up in the site-wide
    # container. Virtual export containers thus offer the complete
    # functionality of regular ones without storing anything
    # themselves.
    @property
    def running_exports(self):
        """The exports stored in the site-wide exports container.
        """
        site_container = self._site_container
        return site_container.running_exports

    @running_exports.setter
    def running_exports(self, value):
        site_container = self._site_container
        site_container.running_exports = value

    @running_exports.deleter
    def running_exports(self):
        site_container = self._site_container
        del site_container.running_exports
717
718
@implementer(IExportContainerFinder)
class ExportContainerFinder(grok.GlobalUtility):
    """A utility to find the local (site-wide) export container.
    """

    def __call__(self):
        """Get the local export container.

        The container is looked up under the name ``datacenter`` in
        the current site. ``None`` is returned if there is no current
        site or if the site has no such item.
        """
        current_site = grok.getSite()
        if current_site is not None:
            return current_site.get('datacenter', None)
        return None
Note: See TracBrowser for help on using the repository browser.