source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/batching.py @ 9091

Last change on this file since 9091 was 9091, checked in by uli, 12 years ago

Add docs.

  • Property svn:keywords set to Id
File size: 23.1 KB
## $Id: batching.py 9091 2012-08-08 14:07:26Z uli $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""Kofa components for batch processing.

Batch processors eat CSV files to add, update or remove large numbers
of certain kinds of objects at once.
"""
import grok
import csv
import datetime
import os
import shutil
import tempfile
import time
from cStringIO import StringIO
from persistent.list import PersistentList
from zope.component import createObject, getUtility
from zope.component.hooks import setSite
from zope.interface import Interface
from zope.schema import getFields
from zope.event import notify
from waeup.kofa.async import AsyncJob
from waeup.kofa.interfaces import (
    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP)

class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active base class.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if they
        are not given during creation. These fields must therefore
        exist in the input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Should not happen because this
        # error is already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result
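
    # A minimal usage sketch (header and field names hypothetical):
    # row keys are renamed according to `mapping`, and ignored columns
    # are dropped. Any concrete subclass behaves the same.
    #
    #   >>> proc = BatchProcessor()
    #   >>> proc.applyMapping(
    #   ...     {'Code': 'FAC1', 'Remark': 'foo'},
    #   ...     {'Code': 'code', 'Remark': u'--IGNORE--'})
    #   {'code': 'FAC1'}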

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result
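
    # Sketch: assuming a CSV file ``depts.csv`` whose first line reads
    # ``Code,Remark``, raw header names are mapped to the fieldnames
    # actually used, with ignored columns dropped:
    #
    #   >>> proc.getMapping('depts.csv', ['code', u'--IGNORE--'], 'create')
    #   {'Code': 'code'}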

    def stringFromErrs(self, errors, inv_errors):
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Check requirements the object must fulfill when being updated.

        This method is not used when deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in the interface and which are
            # not yet attributes of existing objects. Non-existing
            # attributes thus cannot be added here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects, as
        # updateEntry is called when creating entries too, resulting
        # in objectAdded plus additional objectModified events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return
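
    # Sketch (``writer`` hypothetical): a list of warnings is joined
    # into a single '--ERRORS--' column before the row is written:
    #
    #   >>> proc.writeFailedRow(
    #   ...     writer, {'code': 'X'}, ['no parent', 'duplicate'])
    #   # writes the row extended by '--ERRORS--': 'no parent / duplicate'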

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        os.chmod(failed_path, 0664)
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()

        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # Replace empty strings with ignore-markers.
                for key, val in string_row.items():
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            row = dict(string_row.items()) # create a copy
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                # Override all values in row, including student_ids
                # and applicant_ids which have been generated in the
                # respective __init__ methods before.
                self.updateEntry(obj, row, site)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
                except DuplicationError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.msg)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                update_errors = self.checkUpdateRequirements(obj, row, site)
                if update_errors is not None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row, update_errors)
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
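
    # Usage sketch (file name and logger hypothetical): a concrete
    # processor imports a CSV file and reports counts plus the paths
    # of the finished/failed data files:
    #
    #   >>> num, num_warns, finished, failed = proc.doImport(
    #   ...     'depts.csv', ['code', 'faculty_code', 'title'],
    #   ...     mode='create', user='admin', logger=my_logger)
    #   >>> failed is None   # None means no row failed
    #   True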

class ExporterBase(object):
    """A base for exporters.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.
        """
        if isinstance(value, bool):
            value = value and '1' or '0'
        elif isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            value = value.encode('utf-8')
        elif isinstance(value, datetime.datetime):
            value = str(value)
        elif isinstance(value, datetime.date):
            # Order is important here: check for date after datetime as
            # datetimes are also dates.
            #
            # Append hash '#' to dates to circumvent unwanted automatic
            # date conversion in Excel.
            value = str('%s#' % value)
        elif value is None:
            # None is not really representable in CSV files
            value = ''
        return value
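
    # Sketch of the conversions applied:
    #
    #   >>> exporter = ExporterBase()
    #   >>> exporter.mangle_value(True, 'blocked')
    #   '1'
    #   >>> exporter.mangle_value(datetime.date(2012, 8, 8), 'end_date')
    #   '2012-08-08#'
    #   >>> exporter.mangle_value(None, 'title')
    #   ''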

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is None:
            outfile = StringIO()
        else:
            outfile = open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        writer.writerow(dict(zip(self.fields, self.fields))) # header
        return writer, outfile
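
    # Sketch: with no filepath, the writer targets a StringIO that
    # already contains the header row:
    #
    #   >>> writer, outfile = exporter.get_csv_writer()
    #   >>> writer.writerow(
    #   ...     dict(code='A', title='Arts', title_prefix='faculty'))
    #   >>> outfile.getvalue().splitlines()
    #   ['code,title,title_prefix', 'A,Arts,faculty']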

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.
        """
        row = {}
        for name in self.fields:
            value = getattr(obj, name, None)
            value = self.mangle_value(value, name, obj)
            row[name] = value
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile are returned.
        """
        outfile.seek(0)
        if filepath is None:
            return outfile.read()
        outfile.close()
        return

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError


def export_job(site, exporter_name):
    """Export all entries delivered by exporter and store them in a
    temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`).

    Returns the path to the created CSV file.

    .. note:: It is the caller's responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    output_path = os.path.join(output_dir, 'export.csv')
    exporter.export_all(site, filepath=output_path)
    return output_path
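
# Usage sketch (exporter name hypothetical): run an export directly
# and clean up afterwards, as the note above demands:
#
#   >>> path = export_job(site, 'departments')
#   >>> data = open(path, 'rb').read()
#   >>> shutil.rmtree(os.path.dirname(path))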

class AsyncExportJob(AsyncJob):
    """An IJob that exports data to CSV files.

    `AsyncExportJob` instances are regular `AsyncJob` instances with a
    different constructor API. Instead of a callable to execute, you
    must pass a `site` and some `exporter_name` to trigger an export.

    The real work is done when an instance of this class is put into a
    queue. See :mod:`waeup.kofa.async` to learn more about
    asynchronous jobs.

    The `exporter_name` must be the name under which an ICSVExporter
    utility was registered with the ZCA.

    The `site` must be a valid site or ``None``.

    The result of an `AsyncExportJob` is the path to the generated CSV
    file. The file will reside in a temporary directory that should be
    removed after being used.
    """
    def __init__(self, site, exporter_name):
        super(AsyncExportJob, self).__init__(
            export_job, site, exporter_name)

class ExportJobContainer(object):
    """A mix-in that provides functionality for asynchronous export jobs.
    """
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id):
        """Start asynchronous export job.

        `exporter_name` is the name of an exporter utility to be used.

        `user_id` is the ID of the user that triggers the export.

        The job_id is stored along with exporter name and user id in a
        persistent list.

        Returns the job ID of the job started.
        """
        site = grok.getSite()
        manager = getUtility(IJobManager)
        job = AsyncExportJob(site, exporter_name)
        job_id = manager.put(job)
        self.running_exports.append((job_id, exporter_name, user_id))
        # Make sure that the persisted list is stored in ZODB
        self.running_exports = PersistentList(self.running_exports)
        return job_id
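
    # Sketch (``container`` is an ExportJobContainer; exporter name and
    # user id hypothetical): starting a job registers it locally:
    #
    #   >>> job_id = container.start_export_job('departments', 'bob')
    #   >>> (job_id, 'departments', 'bob') in container.running_exports
    #   True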

    def get_running_export_jobs(self, user_id=None):
        """Get export jobs for user with `user_id` as list of tuples.

        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order. The ``<exporter_name>`` is the utility name of the
        used exporter.

        If `user_id` is ``None``, all running jobs are returned.
        """
        entries = []
        to_delete = []
        manager = getUtility(IJobManager)
        for entry in self.running_exports:
            if user_id is not None and entry[2] != user_id:
                continue
            if manager.get(entry[0]) is None:
                to_delete.append(entry)
                continue
            entries.append(entry)
        if to_delete:
            self.running_exports = PersistentList(
                [x for x in self.running_exports if x not in to_delete])
        return entries

    def get_export_jobs_status(self, user_id=None):
        """Get running/completed export jobs for `user_id` as list of tuples.

        Each tuple holds ``<raw status>, <status translated>,
        <exporter title>`` in that order, where ``<status
        translated>`` and ``<exporter title>`` are translated strings
        representing the status of the job and the human readable
        title of the exporter used.
        """
        entries = self.get_running_export_jobs(user_id)
        result = []
        manager = getUtility(IJobManager)
        for entry in entries:
            job = manager.get(entry[0])
            if job is None:
                continue
            status, status_translated = JOB_STATUS_MAP[job.status]
            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
            result.append((status, status_translated, exporter_name))
        return result

    def delete_export_entry(self, entry):
        """Delete the export denoted by `entry`.

        Removes `entry` from the local `running_exports` list and also
        removes the corresponding job via the local job manager.

        `entry` is a tuple ``(<job id>, <exporter name>, <user id>)``
        as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        manager = getUtility(IJobManager)
        job = manager.get(entry[0])
        if job is not None:
            # remove created export file
            if isinstance(job.result, basestring):
                if os.path.exists(os.path.dirname(job.result)):
                    shutil.rmtree(os.path.dirname(job.result))
        manager.remove(entry[0], self)
        new_entries = [x for x in self.running_exports
                       if x != entry]
        self.running_exports = PersistentList(new_entries)
        return