source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 6998

Last change on this file since 6998 was 6847, checked in by Henrik Bettermann, 13 years ago

Define conversion checking method which can be customized by derived processors.

File size: 11.3 KB
RevLine 
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
[4870]7import copy
[4806]8import csv
[4821]9import os
10import sys
[4900]11import tempfile
[4821]12import time
[4806]13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
[5005]16from waeup.sirp.interfaces import (
[6276]17    IBatchProcessor, FatalCSVError, DuplicationError, IObjectConverter)
[4806]18
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors override the
    class attributes below (``name``, ``util_name``, ``iface``,
    ``location_fields``, ``factory_name``) and implement the
    ``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry`` hooks, which raise
    ``NotImplementedError`` here.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base importer'

    # Internal name...
    util_name = 'baseimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    # NOTE(review): the argument is evaluated right here, so derived
    # processors presumably need their own ``grok.name(...)`` call.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are excluded as well; they are
        required separately (see ``req``).
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the list of column names it requires."""
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of location fields plus all fields of ``iface``."""
        return sorted(
            set(self.location_fields) | set(getFields(self.iface)))

    def getHeaders(self, mode='create'):
        """Return the column names this processor accepts.

        ``mode`` is currently ignored; all available fields are returned.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Validate ``headerfields`` for ``mode``.

        Raises ``FatalCSVError`` if a column required for ``mode`` is
        missing or if a non-ignored column name (one not starting with
        ``--``) appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict keyed by the mapped field names. Columns mapped
        to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Keys are the raw header names from the first line of the CSV file
        at ``path``; values are the entries of ``headerfields`` at the same
        position. Columns mapped to ``--IGNORE--`` are left out, and in
        'remove' mode so are all non-location fields.
        """
        # Only the header line is needed; close the file right away.
        with open(path, 'rb') as source_file:
            raw_header = next(csv.reader(source_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion errors into one ``'; '``-separated string.

        ``errors`` is a sequence of ``(fieldname, message)`` pairs,
        ``inv_errors`` a sequence of invariant error messages.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend(["invariant: %s" % err for err in inv_errors])
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object via the factory named ``factory_name``.

        Any arguments are ignored.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the existing entry for ``row`` data (or ``None``).
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            # Only set attributes the object already has; other keys
            # in the row are silently skipped.
            if hasattr(obj, key):
                setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a batch run to ``logger``.

        ``num`` is the number of processed rows, ``warnings`` the number
        of failed ones and ``timedelta`` the elapsed time in seconds.
        Nothing is logged if ``logger`` is ``None``.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that ``row`` is modified in place: the message is stored
        under the ``--ERRORS--`` key.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns the ``(errors, invariant_errors, converted_values)``
        triple produced by the ``IObjectConverter`` adapter for
        ``iface``. ``mode`` is unused here; it is passed so that derived
        processors can customize conversion per mode.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name)
        return errs, inv_errs, conv_dict

    def _importRow(self, string_row, mode, site):
        """Process one mapped CSV row; return an error message or ``None``.

        ``string_row`` holds the raw values keyed by field name. The
        converted values are used for the actual processing, while the
        caller writes ``string_row`` to the finished or failed file.
        """
        # Shallow copy taken before conversion so that ``string_row``
        # keeps the original values for the output files.
        row = dict(string_row.items())
        errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
        if errs or inv_errs:
            return self.stringFromErrs(errs, inv_errs)
        row.update(conv_dict)

        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return ("This object already exists in the same container. "
                        "Skipping.")
            obj = self.callFactory()
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                # ``BaseException.message`` is deprecated since Python 2.6;
                # reproduce its value: the sole argument, otherwise ''.
                if len(error.args) == 1:
                    message = error.args[0]
                else:
                    message = ''
                return "%s Skipping." % (message,)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path``, maps its columns via
        ``headerfields`` and processes every data row according to
        ``mode`` ('create', 'update' or 'remove').

        Successfully processed rows are written to ``<base>.finished<ext>``,
        failed rows (plus a ``--ERRORS--`` column) to
        ``<base>.pending<ext>``. Both files are created in a fresh
        temporary directory, which is not removed here.

        Returns ``(num, num_warns, finished_path, failed_path)``. These are
        the number of data rows read, the number of failed rows and the
        absolute path of the finished file. ``failed_path`` is the absolute
        path of the pending file, or ``None`` when no row failed (the file
        is deleted in that case).
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()
        base, ext = os.path.splitext(os.path.basename(path))
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        # Independent lists: the failed file gets an extra error column.
        failed_headers = list(mapping.values()) + ['--ERRORS--']
        finished_headers = list(mapping.values())

        num = 0
        num_warns = 0
        site = grok.getSite()

        # ``with`` guarantees all three files are flushed and closed before
        # the result files are reported or (for the pending file) deleted.
        with open(path, 'rb') as source_file:
            with open(failed_path, 'wb') as failed_file:
                with open(finished_path, 'wb') as finished_file:
                    reader = csv.DictReader(source_file)
                    failed_writer = csv.DictWriter(
                        failed_file, failed_headers)
                    failed_writer.writerow(
                        dict((x, x) for x in failed_headers))
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict((x, x) for x in finished_headers))

                    for raw_row in reader:
                        num += 1
                        string_row = self.applyMapping(raw_row, mapping)
                        error = self._importRow(string_row, mode, site)
                        if error is None:
                            finished_writer.writerow(string_row)
                        else:
                            num_warns += 1
                            self.writeFailedRow(
                                failed_writer, string_row, error)

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # The pending file is already closed, so it can be removed
            # safely on every platform.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
Note: See TracBrowser for help on using the repository browser.