source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8466

Last change on this file since 8466 was 8380, checked in by uli, 13 years ago

Add hash by default when exporting dates.

  • Property svn:keywords set to Id
File size: 17.1 KB
RevLine 
[7196]1## $Id: batching.py 8380 2012-05-07 10:15:21Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
[8380]25import datetime
[4821]26import os
[4900]27import tempfile
[4821]28import time
[7859]29from cStringIO import StringIO
[4806]30from zope.component import createObject
31from zope.interface import Interface
32from zope.schema import getFields
[8332]33from zope.event import notify
[7811]34from waeup.kofa.interfaces import (
[7860]35    IBatchProcessor, FatalCSVError, IObjectConverter,
[8300]36    ICSVExporter, IGNORE_MARKER)
[4806]37
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Derived processors set the class
    attributes below and implement the ``parentsExist``, ``entryExists``,
    ``getParent``, ``getEntry``, ``addEntry`` and ``delEntry`` hooks,
    which raise :exc:`NotImplementedError` here.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here.
        """
        return [
            key for key, field in getFields(self.iface).items()
            if key not in self.location_fields
            and field.default is field.missing_value
            and field.required]

    @property
    def req(self):
        """Dict mapping each import mode to the headers it requires.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted list of location fields plus all fields of ``iface``.
        """
        # list() keeps the concatenation valid when keys() is a view.
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the headers this processor can handle.

        ``mode`` is accepted but not evaluated here.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that ``headerfields`` are usable for ``mode``.

        Returns ``True`` on success. Raises :exc:`FatalCSVError` if a
        header required for ``mode`` is missing or if a header that does
        not start with ``--`` appears more than once. A ``mode`` that is
        not a key of ``self.req`` raises :exc:`KeyError`.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict holding, for each ``key -> replacement`` pair
        in ``mapping``, the value ``row[key]`` stored under
        ``replacement``. Columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
        fieldnames.

        The first line of the CSV file at ``path`` supplies the raw
        column names; ``headerfields`` supplies, position by position,
        the names to use instead. Columns named ``--IGNORE--`` are
        left out, and so are non-location columns in ``remove`` mode.
        """
        # Only the header line is needed; close the file right away
        # instead of leaving the handle to the garbage collector.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion and invariant errors into one string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend("invariant: %s" % err for err in inv_errors)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object with the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed on.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a return value of ``None`` as "no such
        entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A :exc:`KeyError` raised here is reported by ``doImport`` as a
        failed row instead of aborting the import.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. This base implementation always returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the ``code`` of a value if it has one, else the value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to ``logger``.

        Does nothing if ``logger`` is ``None``. ``num`` is the number of
        processed rows, ``warnings`` the number of failed ones and
        ``timedelta`` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty imports.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that ``row`` is modified: it gets an ``--ERRORS--`` entry.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns the tuple ``(errors, invariant_errors, converted_dict)``
        delivered by the :class:`IObjectConverter` for ``iface``.
        ``ignore_empty`` is accepted but not evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site):
        """Create, remove or update the entry described by ``row``.

        Returns ``None`` on success or the message to be written into
        the failed-rows file.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                # Same value the deprecated ``error.message`` attribute
                # delivered: the sole argument, or '' otherwise.
                message = ''
                if len(error.args) == 1:
                    message = error.args[0]
                return "%s Skipping." % message
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def _importRows(self, reader, mapping, mode, site, ignore_empty,
                    failed_writer, finished_writer):
        """Process all rows of ``reader`` and sort them into the
        failed or finished CSV file.

        Returns the tuple ``(<number of rows>, <number of failed rows>)``.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # replace empty strings with ignore-markers
                for key, val in list(string_row.items()):
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            # Shallow copy: converted values go into ``row`` while
            # ``string_row`` is what gets written to the result files.
            row = dict(string_row)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                error = self.stringFromErrs(errs, inv_errs)
            else:
                row.update(conv_dict)
                error = self._processRow(row, mode, site)
            if error is not None:
                num_warns += 1
                self.writeFailedRow(failed_writer, string_row, error)
                continue
            finished_writer.writerow(string_row)
        return num, num_warns

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at ``path``, whose columns are renamed to
        ``headerfields``, and creates, updates or removes one entry per
        row according to ``mode``. With ``ignore_empty`` set, empty
        values are replaced by the ignore marker in ``update`` mode.

        Processed rows are written to a ``.finished`` file, rows that
        could not be processed to a ``.pending`` file with an additional
        ``--ERRORS--`` column. Both live in a freshly created temporary
        directory which is not removed here.

        Returns the tuple ``(<number of rows>, <number of failed rows>,
        <path of finished file>, <path of pending file>)``. The last
        item is ``None`` if no row failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()
        (base, ext) = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))

        # list() keeps this working where values() delivers a view.
        finished_headers = list(mapping.values())
        failed_headers = finished_headers + ['--ERRORS--']

        site = grok.getSite()

        # All three files are closed (and thus flushed) when the blocks
        # are left, also if processing a row raises an exception.
        with open(path, 'rb') as in_file:
            with open(failed_path, 'wb') as failed_file:
                with open(finished_path, 'wb') as finished_file:
                    reader = csv.DictReader(in_file)
                    failed_writer = csv.DictWriter(
                        failed_file, failed_headers)
                    failed_writer.writerow(
                        dict((x, x) for x in failed_headers))
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict((x, x) for x in finished_headers))
                    num, num_warns = self._importRows(
                        reader, mapping, mode, site, ignore_empty,
                        failed_writer, finished_writer)

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: the pending file holds the header only.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
[7859]387
class ExporterBase(object):
    """A base for exporters.

    Provides the CSV plumbing (writer creation, value mangling, row
    writing, closing) while leaving ``export`` and ``export_all`` to
    derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes

        Turns ``value`` into something a CSV writer can handle. Values
        of types not handled below are returned unchanged. ``name`` and
        ``context`` are not evaluated in this base implementation.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # Order is important here: check for date after datetime as
            # datetimes are also dates.
            #
            # Append hash '#' to dates to circumvent unwanted excel
            # automatic conversion.
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is not None:
            outfile = open(filepath, 'wb')
        else:
            outfile = StringIO()
        writer = csv.DictWriter(outfile, self.fields)
        # The header row maps each fieldname onto itself.
        header = dict((fieldname, fieldname) for fieldname in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        Attributes missing on `obj` are treated as ``None`` before
        being passed through ``mangle_value``.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile is returned
        and outfile is left open.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.

        Must be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.

        Must be implemented by derived classes.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.