source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8222

Last change on this file since 8222 was 8222, checked in by uli, 13 years ago
  • Fix doImport. Still needs tests.
  • Use code in log messages.
  • Property svn:keywords set to Id
File size: 16.3 KB
RevLine 
## $Id: batching.py 8222 2012-04-19 15:39:17Z uli $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""Kofa components for batch processing.

Batch processors eat CSV files to add, update or remove large numbers
of certain kinds of objects at once.
"""
23import grok
24import csv
[4821]25import os
[4900]26import tempfile
[4821]27import time
[7859]28from cStringIO import StringIO
[4806]29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
[7811]32from waeup.kofa.interfaces import (
[7860]33    IBatchProcessor, FatalCSVError, IObjectConverter,
[8214]34    ICSVExporter, DELETION_MARKER, IGNORE_MARKER)
[4806]35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected to
    override the class attributes below and to implement the hooks that
    raise :exc:`NotImplementedError` here (``parentsExist``,
    ``entryExists``, ``getParent``, ``getEntry``, ``addEntry`` and
    ``delEntry``).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are listed separately (see ``req``)
        and are therefore skipped here.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the list of headers it requires.

        Keys are ``'create'``, ``'update'`` and ``'remove'``.
        """
        result = dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all header names this processor knows about.

        This is the union of ``location_fields`` and the field names of
        ``iface``.
        """
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the available header names.

        ``mode`` is accepted but not evaluated by this base
        implementation.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether ``headerfields`` are usable for ``mode``.

        Returns ``True`` if all headers required for ``mode`` are
        present and no (non-ignored) header appears twice.

        Raises :exc:`FatalCSVError` otherwise. An unknown ``mode``
        raises :exc:`KeyError`.
        """
        req = self.req[mode]
        # Check for required fields...
        missing = [field for field in req if field not in headerfields]
        if missing:
            # The message lists all required columns, not only the
            # missing ones.
            raise FatalCSVError(
                "Need at least columns %s for import!" %
                ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps raw CSV column names to the fieldnames
        actually used. Returns a new dict keyed by the replacement
        names; columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at ``path`` is read as raw header
        and paired by position with ``headerfields``. Columns marked
        ``--IGNORE--`` are left out, as are non-location fields when
        ``mode`` is ``'remove'``.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again right away. (Binary mode as required by the Python 2
        # csv module.)
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Turn conversion errors into a single human readable string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages. All
        parts are joined with ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed
        on to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a return value of ``None`` as 'no such
        entry'.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A :exc:`KeyError` raised here is reported by ``doImport`` as a
        failed row.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation has no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Values equal to ``IGNORE_MARKER`` and keys for which ``obj``
        has no attribute are skipped.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the ``code`` of a value if it has one, the value
            # itself otherwise.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))
        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        Emits a summary of a finished batch run to ``logger``. Does
        nothing if no ``logger`` is given. ``num`` is the number of
        processed rows, ``warnings`` the number of failed ones and
        ``timedelta`` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that ``row`` is modified in place: the error text is
        stored under the key ``--ERRORS--``.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the :class:`IObjectConverter` for ``iface``.

        ``ignore_empty`` is part of the signature but is not evaluated
        by this implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _applyRow(self, mode, row, site):
        """Perform the `mode` action for one converted `row`.

        Returns ``None`` on success or the warning to be written to
        the failed-rows file otherwise.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return ("This object already exists in the same container. "
                        "Skipping.")
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at ``path`` and, depending on ``mode``
        (``'create'``, ``'update'`` or ``'remove'``), adds, updates or
        deletes one object per row. Successfully processed rows are
        written to a ``.finished`` file, failed rows (with an
        additional ``--ERRORS--`` column) to a ``.pending`` file. Both
        files live in a freshly created temporary directory.

        If ``ignore_empty`` is true, empty values are replaced by
        ``IGNORE_MARKER`` in update mode.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)``. ``failed_path`` is ``None`` (and the file is
        removed) if no row failed.

        Raises :exc:`FatalCSVError` if ``headerfields`` do not pass
        ``checkHeaders``.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0

        # All three files are closed explicitly below, so that their
        # contents are flushed (and the pending file can be unlinked)
        # regardless of how the interpreter handles dangling handles.
        # Binary mode as required by the Python 2 csv module.
        source_file = open(path, 'rb')
        failed_file = None
        finished_file = None
        try:
            reader = csv.DictReader(source_file)

            temp_dir = tempfile.mkdtemp()

            base = os.path.basename(path)
            (base, ext) = os.path.splitext(base)
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(dict([(x, x) for x in failed_headers]))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            finished_writer.writerow(
                dict([(x, x) for x in finished_headers]))

            site = grok.getSite()

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode == 'update':
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: ``row`` receives the converted values,
                # ``string_row`` keeps the strings for the CSV output.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                warning = self._applyRow(mode, row, site)
                if warning is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, warning)
                    continue
                finished_writer.writerow(string_row)
        finally:
            for handle in (source_file, failed_file, finished_file):
                if handle is not None:
                    handle.close()

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave an empty pending file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
[7859]375
class ExporterBase(object):
    """Common foundation for CSV exporters.

    Provides the CSV plumbing (writer creation, row writing, closing);
    the actual ``export`` and ``export_all`` methods have to be
    supplied by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer can cope with.

        Booleans become ``'1'`` or ``'0'``, unicode strings are encoded
        as UTF-8 and ``None`` becomes the empty string. Anything else
        is passed through unchanged. `name` and `context` are not used
        here; they are available for derived classes.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        The result is a tuple ``(<writer>, <outfile>)``: ``<writer>``
        is a :class:`csv.DictWriter` for ``fields`` and ``<outfile>``
        the underlying file object. Without a `filepath` the latter is
        a StringIO instance, which callers need in order to get at the
        generated data.

        With a `filepath` the file is opened for writing only; close it
        before reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row for `obj` using `writer`.

        For each name in ``fields`` the attribute of `obj` with that
        name (``None`` if missing) is passed through ``mangle_value``.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        Without a `filepath` the complete contents of `outfile` are
        returned (and it is left open). Otherwise `outfile` is closed
        and ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.

        Not implemented in this base class.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.

        Not implemented in this base class.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.