source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9707

Last change on this file since 9707 was 9706, checked in by Henrik Bettermann, 12 years ago

Rework logging of batch processing. Remove redundant text but add name of import file.

  • Property svn:keywords set to Id
File size: 24.2 KB
RevLine 
[7196]1## $Id: batching.py 9706 2012-11-21 22:37:03Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
[8380]25import datetime
[4821]26import os
[9217]27import shutil
[4900]28import tempfile
[4821]29import time
[7859]30from cStringIO import StringIO
[9217]31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
[4806]34from zope.interface import Interface
35from zope.schema import getFields
[8332]36from zope.event import notify
[9217]37from waeup.kofa.async import AsyncJob
[7811]38from waeup.kofa.interfaces import (
[9217]39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob)
[4806]42
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected
    to override the class attributes below and to implement the
    lookup/modification hooks (`parentsExist`, `entryExists`,
    `getParent`, `getEntry`, `addEntry`, `delEntry`), which all raise
    `NotImplementedError` here.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are excluded as well; they are
        handled separately (see `req`).
        """
        return [
            key for key, field in getFields(self.iface).items()
            if key not in self.location_fields
            and field.default is field.missing_value
            and field.required]

    @property
    def req(self):
        """Map each import mode to the list of columns it requires.

        'create' needs location fields plus `required_fields`;
        'update' and 'remove' only need the location fields.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted list of all location fields and fields of `iface`."""
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the usable column names; `mode` is ignored here."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are sufficient for `mode`.

        Returns ``True`` if all is well. Raises `FatalCSVError` if a
        column required for `mode` is missing or if a column name,
        other than those starting with ``--``, appears more than once.
        An unknown `mode` results in a `KeyError` from the `req` lookup.
        """
        required = self.req[mode]
        # Check for required fields...
        for field in required:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in required]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to the field names to use.
        Returns a new dict keyed by the mapped field names.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
        fieldnames.

        The first row of the file at `path` supplies the raw column
        names; `headerfields` supplies, position by position, the
        field name each column stands for. Raises `StopIteration` if
        the file contains no rows.
        """
        # Only the header row is needed, so close the file right away.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion and invariant errors into one string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend("invariant: %s" % err for err in inv_errors)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a fresh object via the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed
        on to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the entry described by ``row``.

        `doImport` treats a ``None`` result as "no such entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        `doImport` reports `KeyError` and `DuplicationError` raised
        here as a failed row instead of aborting the import.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. This base implementation has no requirements and
        always returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        Returns a string describing the fields changed. `site` and
        `filename` are not used by this base implementation.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            try:
                setattr(obj, key, value)
            except AttributeError:
                # Computed attributes can't be set.
                continue
            # Log the `code` of a value if it has one, else the value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to `logger`.

        `num` is the number of processed rows, `warnings` the number
        of failed rows and `timedelta` the processing time in seconds.
        Nothing happens if `logger` is ``None``.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # `num or 1` avoids a division by zero for empty imports.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that `row` is modified: it gets an ``--ERRORS--`` entry.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns the triple ``(errors, invariant_errors,
        converted_dict)`` delivered by the `IObjectConverter` of
        `iface`. `ignore_empty` is accepted but not evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site, filename):
        """Apply one converted `row` in `mode`; return failure text or None.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site, filename)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry"
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry"
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site, filename)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path` and processes each row according
        to `mode` ('create', 'update' or 'remove'). Successfully
        processed rows are written to a ``.finished`` file, failed
        rows (with an ``--ERRORS--`` column) to a ``.pending`` file,
        both located in a newly created temporary directory.

        If `ignore_empty` is true and `mode` is 'update', empty string
        values are replaced by `IGNORE_MARKER` before conversion.

        Returns a tuple ``(<number of rows>, <number of failed rows>,
        <path of finished file>, <path of pending file>)``. The last
        item is ``None`` (and the pending file removed) if no row
        failed.

        Raises `FatalCSVError` if the headers are not acceptable (see
        `checkHeaders`).
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0
        # All files opened below are closed in the `finally` clause so
        # that their contents are on disk before the paths are
        # returned (or the pending file is removed).
        open_files = []
        try:
            infile = open(path, 'rb')
            open_files.append(infile)
            reader = csv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()
            base, ext = os.path.splitext(os.path.basename(path))

            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            open_files.append(failed_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            os.chmod(failed_path, 0o664)
            failed_writer.writerow(dict(zip(failed_headers, failed_headers)))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            open_files.append(finished_file)
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            os.chmod(finished_path, 0o664)
            finished_writer.writerow(
                dict(zip(finished_headers, finished_headers)))

            site = grok.getSite()

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode in ('update',):
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: `row` receives the converted values
                # while `string_row` keeps the strings for the output
                # files.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    failure = self.stringFromErrs(errs, inv_errs)
                else:
                    row.update(conv_dict)
                    # The import file's basename (without extension)
                    # is handed to `updateEntry` as `filename`.
                    failure = self._processRow(row, mode, site, base)
                if failure is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, failure)
                    continue
                finished_writer.writerow(string_row)

            timedelta = time.time() - time_start
        finally:
            for open_file in open_files:
                open_file.close()

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: the pending file only holds a header.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        fieldnames = self.available_fields
        writer = csv.DictWriter(outfile, fieldnames)
        writer.writerow(dict(zip(fieldnames, fieldnames)))  # header
        outfile.seek(0)
        return outfile.read()
415
class ExporterBase(object):
    """Common machinery for CSV exporters.

    Derived classes set `fields` and `title` and implement `export`
    and `export_all`, which raise `NotImplementedError` here.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer can handle.

        Derived classes may override this hook; `name` and `context`
        are not evaluated by this implementation.
        """
        if value is None:
            # None is not really representable in CSV files
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        # Order is important here: check for datetime before date as
        # datetimes are also dates.
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # A trailing hash '#' is appended to plain dates
            # (according to the original note: to circumvent unwanted
            # automatic handling by Excel).
            return str('%s#' % value)
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        The result is a tuple ``(<writer>, <outfile>)``: a
        :class:`csv.DictWriter` and the file object it writes to. If
        `filepath` is ``None`` the file object is an in-memory
        StringIO, otherwise the file at `filepath` opened for binary
        writing.

        A file opened from `filepath` is write-only; close it before
        reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row for `obj` via `writer`.

        For each name in `fields` the attribute of `obj` (``None`` if
        missing) is passed through `mangle_value`.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With `filepath` being ``None`` the complete contents of
        `outfile` are returned; otherwise `outfile` is closed and
        ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return None
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. Must be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. Must be implemented by derived classes.
        """
        raise NotImplementedError
[9217]509
510
def export_job(site, exporter_name):
    """Export all entries delivered by exporter and store it in a temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`). It will be named after the
    exporter used with `.csv` filename extension.

    Returns the path to the created CSV file.

    If the exporter raises an exception, the temporary directory is
    removed again before the exception is re-raised, as no path is
    returned that a caller could clean up.

    .. note:: It is the callers responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    filename = '%s.csv' % exporter_name
    output_path = os.path.join(output_dir, filename)
    try:
        exporter.export_all(site, filepath=output_path)
    except Exception:
        # The caller never learns about `output_dir` in this case, so
        # do not leave it behind.
        shutil.rmtree(output_dir, ignore_errors=True)
        raise
    return output_path
539
class AsyncExportJob(AsyncJob):
    """An `AsyncJob` that runs :func:`export_job` for a given exporter.

    Unlike plain `AsyncJob` instances, which are constructed from a
    callable, an `AsyncExportJob` is constructed from a `site` and an
    `exporter_name`; the callable is always :func:`export_job`.

    `exporter_name` must be the name under which an ICSVExporter
    utility was registered with the ZCA. `site` must be a valid site
    or ``None``.

    The export itself happens when the job is put into a queue; see
    :mod:`waeup.kofa.async` for details about asynchronous jobs.

    The job's result is the path of the generated CSV file. That file
    lives in a temporary directory which should be removed after use.
    """
    grok.implements(IExportJob)

    def __init__(self, site, exporter_name):
        # Delegate to AsyncJob with `export_job` as the callable and
        # (site, exporter_name) as its arguments.
        super(AsyncExportJob, self).__init__(export_job, site, exporter_name)
565
class ExportJobContainer(object):
    """Mix-in that lets an object start and track asynchronous exports.

    Exports are tracked as tuples ``(<job id>, <exporter name>,
    <user id>)`` in `running_exports`.
    """
    grok.implements(IExportJobContainer)

    # Class-level default. The methods below never change this list in
    # place; they assign a new PersistentList to the instance instead.
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id):
        """Queue an asynchronous export and remember it.

        `exporter_name` is the name of the exporter utility to use,
        `user_id` the ID of the user triggering the export. Job id,
        exporter name and user id are recorded in `running_exports`.

        Returns the id of the started job.
        """
        site = grok.getSite()
        job_manager = getUtility(IJobManager)
        job_id = job_manager.put(AsyncExportJob(site, exporter_name))
        # Make sure that the persisted list is stored in ZODB
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """List the recorded export entries of user `user_id`.

        The result is a list of tuples ``<job_id>, <exporter_name>,
        <user_id>``, where ``<exporter_name>`` is the utility name of
        the exporter used. With `user_id` being ``None`` the entries
        of all users are returned.

        Considered entries whose job is not known to the job manager
        any more are dropped from `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        alive = []
        stale = []
        for entry in self.running_exports:
            if user_id is not None and entry[2] != user_id:
                continue
            bucket = stale if job_manager.get(entry[0]) is None else alive
            bucket.append(entry)
        if stale:
            self.running_exports = PersistentList(
                [x for x in self.running_exports if x not in stale])
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Describe the status of the export jobs of user `user_id`.

        Returns a list of tuples ``<raw status>, <status translated>,
        <exporter title>``. The first two items are looked up in
        `JOB_STATUS_MAP` by the job's status; the last one is the
        `title` of the exporter utility used.
        """
        job_manager = getUtility(IJobManager)
        result = []
        for entry in self.get_running_export_jobs(user_id):
            job = job_manager.get(entry[0])
            if job is None:
                continue
            status, status_translated = JOB_STATUS_MAP[job.status]
            exporter_title = getUtility(ICSVExporter, name=entry[1]).title
            result.append((status, status_translated, exporter_title))
        return result

    def delete_export_entry(self, entry):
        """Remove the export denoted by `entry`, including its output.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.

        If the job has a string result (the path of the export file),
        the directory containing that file is deleted. The job is
        then removed via the job manager and the entry dropped from
        `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        job = job_manager.get(entry[0])
        if job is not None and isinstance(job.result, basestring):
            # remove created export file (and its directory)
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(entry[0], self)
        self.running_exports = PersistentList(
            [x for x in self.running_exports if x != entry])

    def entry_from_job_id(self, job_id):
        """Return the first recorded entry whose job id is `job_id`.

        Returns ``None`` if no such entry can be found.
        """
        matching = (entry for entry in self.running_exports
                    if entry[0] == job_id)
        return next(matching, None)
Note: See TracBrowser for help on using the repository browser.