source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 7165

Last change on this file since 7165 was 7137, checked in by Henrik Bettermann, 13 years ago

Set value Id for property svn:keywords in all Python files.

  • Property svn:keywords set to Id
File size: 11.3 KB
RevLine 
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
[4870]7import copy
[4806]8import csv
[4821]9import os
10import sys
[4900]11import tempfile
[4821]12import time
[4806]13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
[5005]16from waeup.sirp.interfaces import (
[6276]17    IBatchProcessor, FatalCSVError, DuplicationError, IObjectConverter)
[4806]18
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    Batch processors eat CSV files: each row is located via
    ``location_fields``, converted through an ``IObjectConverter`` for
    ``iface``, and then applied in one of three modes ('create',
    'update', 'remove').

    This is a non-active baseclass.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base importer'

    # Internal name...
    util_name = 'baseimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            # A usable default exists, so the column may be omitted.
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the header fields required in input.

        'create' needs the location fields plus every required schema
        field without a default; 'update' and 'remove' only need the
        location fields to find the object.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted names of all columns accepted in input files.

        The union of the location fields and the fields of ``iface``.
        """
        result = []  # NOTE(review): unused local, kept as-is.
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the list of acceptable headers.

        NOTE(review): ``mode`` is currently ignored; all available
        fields are returned regardless of mode.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check headers of an input file for a given mode.

        Raises ``FatalCSVError`` if a required column is missing or a
        non-ignored column name appears more than once; returns True
        otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                # The message lists the complete set of required
                # columns, not only the missing one.
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict with the row's values re-keyed through
        ``mapping``; keys mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Reads only the first (header) line of the file at ``path`` and
        pairs each raw header with the corresponding entry of
        ``headerfields`` by position.
        """
        # NOTE(review): the file handle opened here is never explicitly
        # closed — presumably relies on refcounting; verify.
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into one message.

        ``errors`` is a list of ``(fieldname, message)`` pairs,
        ``inv_errors`` a list of strings; the result is a single
        '; '-separated string.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new (empty) instance via the registered factory.

        NOTE(review): ``args``/``kw`` are accepted but not forwarded
        to ``createObject``.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            # Only set attributes the object already has; other row
            # keys are silently skipped.
            if hasattr(obj, key):
                setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import run to ``logger``.

        Does nothing if ``logger`` is None. ``num`` is the total row
        count, ``warnings`` the number of failed rows (a value > 0
        marks the run as FAILED), ``timedelta`` the elapsed seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # Guard against division by zero when no rows were processed.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated;
        the message ends up in the ``--ERRORS--`` column.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Delegates to the ``IObjectConverter`` for ``iface`` and
        returns its ``(errs, inv_errs, conv_dict)`` triple.
        NOTE(review): ``mode`` is accepted but unused here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name)
        return errs, inv_errs, conv_dict

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        ``path`` is the CSV file to process, ``headerfields`` the
        fieldnames to use for its columns (checked via
        ``checkHeaders``), ``mode`` one of 'create', 'update' or
        'remove', ``user`` the name recorded in the log, and
        ``logger`` an optional logger passed to ``createLogfile``.

        Raises ``FatalCSVError`` on bad headers. Returns a tuple
        ``(num, num_warns, finished_path, failed_path)``: row count,
        failure count, and absolute paths (in a fresh temporary
        directory) of the CSV files holding successfully processed
        and failed rows; ``failed_path`` is None if all rows
        succeeded.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        # NOTE(review): file handles opened inline below are never
        # explicitly closed — presumably relies on refcounting; verify.
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        # Failed rows go to <base>.pending<ext> with an extra
        # --ERRORS-- column appended.
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        # Successful rows go to <base>.finished<ext>.
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num =0
        num_warns = 0
        site = grok.getSite()
       
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            # Shallow copy; keeps the raw string values for the
            # finished/failed files while `row` gets converted values.
            row = dict(string_row.items()) # create deep copy
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            # Overlay converted (typed) values over the raw strings.
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists in the same container. Skipping.")
                    continue
                obj = self.callFactory()
                self.updateEntry(obj, row, site)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Drop the writer — presumably so its file handle is
            # released (refcounting) before the unlink below; verify.
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
Note: See TracBrowser for help on using the repository browser.