source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8351

Last change on this file since 8351 was 8333, checked in by uli, 13 years ago

Do not notify if objects stay unchanged.

  • Property svn:keywords set to Id
File size: 16.7 KB
## $Id: batching.py 8333 2012-05-03 13:01:03Z uli $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
[4821]25import os
[4900]26import tempfile
[4821]27import time
[7859]28from cStringIO import StringIO
[4806]29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
[8332]32from zope.event import notify
[7811]33from waeup.kofa.interfaces import (
[7860]34    IBatchProcessor, FatalCSVError, IObjectConverter,
[8300]35    ICSVExporter, IGNORE_MARKER)
[4806]36
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. Therefore these fields must exist in
        the input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

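    # Illustrative sketch (not executed; a hypothetical processor whose
    # `iface` declares a required `title` field without a default is
    # assumed):
    #
    #   >>> processor.required_fields
    #   ['title']
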
    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for duplicate fields. This cannot normally happen, as
        # the error is already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

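    # Illustrative sketch (not executed; values are made up): columns
    # are renamed according to `mapping` and ignored columns dropped.
    #
    #   >>> processor.applyMapping(
    #   ...     {'CODE': 'DEP1', 'remark': 'foo'},
    #   ...     {'CODE': 'code', 'remark': u'--IGNORE--'})
    #   {'code': 'DEP1'}
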
    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

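    # Illustrative sketch (not executed; a hypothetical 'depts.csv'
    # whose first line reads ``CODE,TITLE`` is assumed): raw CSV column
    # names are mapped to the schema field names chosen by the user.
    #
    #   >>> processor.getMapping('depts.csv', ['code', 'title'], 'create')
    #   {'CODE': 'code', 'TITLE': 'title'}
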
    def stringFromErrs(self, errors, inv_errors):
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

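    # Illustrative sketch (not executed): field errors arrive as
    # (fieldname, message) tuples, invariant errors as plain strings;
    # both are merged into one semicolon-separated line.
    #
    #   >>> processor.stringFromErrs(
    #   ...     [('title', 'Required input missing.')], ['no faculty'])
    #   'title: Required input missing.; invariant: no faculty'
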
    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields that are not declared in the interface and
            # that are not yet attributes of the existing object; we
            # thus cannot add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects, as
        # updateEntry is called when creating entries as well,
        # resulting in objectAdded plus additional objectModified
        # events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

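    # Illustrative sketch (not executed; `dept` and `site` are
    # hypothetical): only keys that already exist as attributes are
    # set, IGNORE_MARKER values are skipped, and the returned string
    # describes the changes made.
    #
    #   >>> processor.updateEntry(
    #   ...     dept, {'title': u'Physics', 'bogus': u'x'}, site)
    #   'title=Physics'
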
    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

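    # Illustrative sketch (not executed): string values from the CSV
    # row are converted to proper schema types; conversion problems are
    # collected in `errs` instead of being raised.
    #
    #   >>> errs, inv_errs, conv_dict = processor.checkConversion(
    #   ...     {'code': 'DEP1', 'title': 'Physics'}, mode='create')
    #   >>> errs, inv_errs
    #   ([], [])
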
    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()

        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # Replace empty strings with ignore-markers.
                for key, val in string_row.items():
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            row = dict(string_row.items()) # create a copy
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents exist yet. Skipping.")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                # Override all values in row, also student_ids and
                # applicant_ids which have been generated in the
                # respective __init__ methods before.
                self.updateEntry(obj, row, site)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                update_errors = self.checkUpdateRequirements(obj, row, site)
                if update_errors is not None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row, update_errors)
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

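# A minimal usage sketch for `doImport` (not executed; the utility name
# 'departmentprocessor' and the file 'depts.csv' are assumptions for
# illustration, not defined in this module):
#
#   >>> from zope.component import getUtility
#   >>> from waeup.kofa.interfaces import IBatchProcessor
#   >>> processor = getUtility(
#   ...     IBatchProcessor, name='departmentprocessor')
#   >>> num, num_warns, finished, failed = processor.doImport(
#   ...     'depts.csv', ['faculty_code', 'code', 'title'],
#   ...     mode='create', user='admin')
#
# `finished` is the path of a CSV copy of all successfully processed
# rows; `failed` is the path of a CSV file with an additional
# --ERRORS-- column, or ``None`` if every row succeeded.
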
class ExporterBase(object):
    """A base for exporters.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.
        """
        if isinstance(value, bool):
            value = value and '1' or '0'
        elif isinstance(value, unicode):
            # CSV writers like byte streams better than unicode.
            value = value.encode('utf-8')
        elif value is None:
            # None is not really representable in CSV files.
            value = ''
        return value

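    # Illustrative sketch (not executed) of the default conversions:
    #
    #   >>> exporter.mangle_value(True, 'verdict')
    #   '1'
    #   >>> exporter.mangle_value(u'B\xe4r', 'title')
    #   'B\xc3\xa4r'
    #   >>> exporter.mangle_value(None, 'title')
    #   ''
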
    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is None:
            outfile = StringIO()
        else:
            outfile = open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        writer.writerow(dict(zip(self.fields, self.fields))) # header
        return writer, outfile

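    # Illustrative sketch (not executed): without a filepath the writer
    # operates on a StringIO buffer whose contents can be fetched via
    # `close_outfile`:
    #
    #   >>> exporter = ExporterBase()
    #   >>> writer, outfile = exporter.get_csv_writer()
    #   >>> writer.writerow(
    #   ...     dict(code='C1', title='Chemistry', title_prefix='faculty'))
    #   >>> exporter.close_outfile(None, outfile)
    #   'code,title,title_prefix\r\nC1,Chemistry,faculty\r\n'
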
    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.
        """
        row = {}
        for name in self.fields:
            value = getattr(obj, name, None)
            value = self.mangle_value(value, name, obj)
            row[name] = value
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile are returned.
        """
        outfile.seek(0)
        if filepath is None:
            return outfile.read()
        outfile.close()
        return

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError