source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 6837

Last change on this file since 6837 was 6835, checked in by Henrik Bettermann, 13 years ago

Move comment to proper place.

File size: 11.1 KB
RevLine 
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
[4870]7import copy
[4806]8import csv
[4821]9import os
10import sys
[4900]11import tempfile
[4821]12import time
[4806]13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
[5005]16from waeup.sirp.interfaces import (
[6276]17    IBatchProcessor, FatalCSVError, DuplicationError, IObjectConverter)
[4806]18
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass.  Concrete importers subclass it,
    set the class attributes below and implement the abstract
    ``parentsExist``/``entryExists``/``getParent``/``getEntry``/
    ``addEntry``/``delEntry`` methods.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base importer'

    # Internal name...
    util_name = 'baseimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'
46
47    @property
48    def required_fields(self):
[4829]49        """Required fields that have no default.
50
51        A list of names of field, whose value cannot be set if not
52        given during creation. Therefore these fields must exist in
53        input.
54
55        Fields with a default != missing_value do not belong to this
56        category.
57        """
[4806]58        result = []
59        for key, field in getFields(self.iface).items():
60            if key in self.location_fields:
61                continue
[4829]62            if field.default is not field.missing_value:
63                continue
[4806]64            if field.required:
65                result.append(key)
66        return result
[6259]67
[4806]68    @property
69    def req(self):
70        result = dict(
71            create = self.location_fields + self.required_fields,
72            update = self.location_fields,
73            remove = self.location_fields,
74        )
75        return result
76
77    @property
78    def available_fields(self):
79        result = []
80        return sorted(list(set(
81                    self.location_fields + getFields(self.iface).keys())))
[6259]82
    def getHeaders(self, mode='create'):
        """Return the list of CSV column names this processor supports.

        `mode` is accepted for interface compatibility but currently
        ignored: all available fields are returned for every mode.
        """
        return self.available_fields
85
86    def checkHeaders(self, headerfields, mode='create'):
87        req = self.req[mode]
88        # Check for required fields...
89        for field in req:
90            if not field in headerfields:
91                raise FatalCSVError(
92                    "Need at least columns %s for import!" %
93                    ', '.join(["'%s'" % x for x in req]))
[6828]94        # Check for double fields. Cannot happen because this error is
95        # already catched in views
[4806]96        not_ignored_fields = [x for x in headerfields
97                              if not x.startswith('--')]
98        if len(set(not_ignored_fields)) < len(not_ignored_fields):
99            raise FatalCSVError(
100                "Double headers: each column name may only appear once.")
101        return True
102
103    def applyMapping(self, row, mapping):
[4811]104        """Apply mapping to a row of CSV data.
[6824]105
[4811]106        """
[4806]107        result = dict()
108        for key, replacement in mapping.items():
[6824]109            if replacement == u'--IGNORE--':
110                # Skip ignored columns in failed and finished data files.
111                continue
[4806]112            result[replacement] = row[key]
113        return result
[6259]114
[4832]115    def getMapping(self, path, headerfields, mode):
[6824]116        """Get a mapping from CSV file headerfields to actually used fieldnames.
117
[4811]118        """
[4832]119        result = dict()
[4806]120        reader = csv.reader(open(path, 'rb'))
121        raw_header = reader.next()
[4832]122        for num, field in enumerate(headerfields):
123            if field not in self.location_fields and mode == 'remove':
[6824]124                # Skip non-location fields when removing.
125                continue
126            if field == u'--IGNORE--':
127                # Skip ignored columns in failed and finished data files.
128                continue
[4832]129            result[raw_header[num]] = field
130        return result
[4806]131
[6273]132    def stringFromErrs(self, errors, inv_errors):
133        result = []
134        for err in errors:
135            fieldname, message = err
136            result.append("%s: %s" % (fieldname, message))
137        for err in inv_errors:
138            result.append("invariant: %s" % err)
139        return '; '.join(result)
140
    def callFactory(self, *args, **kw):
        """Create a new instance via the registered factory.

        Note: ``*args`` and ``**kw`` are accepted for flexibility in
        subclasses but are ignored here — the factory named by
        `factory_name` is called without arguments.
        """
        return createObject(self.factory_name)
143
    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.

        Abstract; subclasses must implement this.
        """
        raise NotImplementedError('method not implemented')
148
    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.

        Abstract; subclasses must implement this.
        """
        raise NotImplementedError('method not implemented')
153
    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.

        Abstract; subclasses must implement this.
        """
        raise NotImplementedError('method not implemented')
[6259]158
    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        (The previous docstring said "parent object" — a copy-paste
        slip from :meth:`getParent`; this method locates the entry
        itself.)  Abstract; subclasses must implement this.
        """
        raise NotImplementedError('method not implemented')
[6259]163
    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        Abstract; subclasses must implement this.
        """
        raise NotImplementedError('method not implemented')
168
    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.

        Abstract; subclasses must implement this.
        """
        raise NotImplementedError('method not implemented')
[4806]173
174    def updateEntry(self, obj, row, site):
[4984]175        """Update obj to the values given in row.
176        """
[4829]177        for key, value in row.items():
[6835]178            # Skip location and catalog fields used to locate the object
[6833]179            if hasattr(obj, key):
180                setattr(obj, key, value)
[4829]181        return
[4821]182
[4832]183    def createLogfile(self, path, fail_path, num, warnings, mode, user,
[4885]184                      timedelta, logger=None):
185        """Write to log file.
[4821]186        """
[4885]187        if logger is None:
188            return
189        status = 'OK'
190        if warnings > 0:
191            status = 'FAILED'
192        logger.info("-" * 20)
193        logger.info("%s: Batch processing finished: %s" % (user, status))
194        logger.info("%s: Source: %s" % (user, path))
195        logger.info("%s: Mode: %s" % (user, mode))
196        logger.info("%s: User: %s" % (user, user))
197        if warnings > 0:
[4900]198            logger.info("%s: Failed datasets: %s" % (
199                    user, os.path.basename(fail_path)))
[4885]200        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
201                user, timedelta, timedelta/(num or 1)))
202        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
203                user, num, num - warnings, warnings
[4821]204                ))
[4885]205        logger.info("-" * 20)
[4821]206        return
[4877]207
208    def writeFailedRow(self, writer, row, warnings):
209        """Write a row with error messages to error CSV.
210
211        If warnings is a list of strings, they will be concatenated.
212        """
213        error_col = warnings
214        if isinstance(warnings, list):
215            error_col = ' / '.join(warnings)
216        row['--ERRORS--'] = error_col
217        writer.writerow(row)
218        return
[6259]219
[4885]220    def doImport(self, path, headerfields, mode='create', user='Unknown',
221                 logger=None):
[4811]222        """Perform actual import.
223        """
[4832]224        time_start = time.time()
[4806]225        self.checkHeaders(headerfields, mode)
[4832]226        mapping = self.getMapping(path, headerfields, mode)
[4806]227        reader = csv.DictReader(open(path, 'rb'))
[4889]228
[4900]229        temp_dir = tempfile.mkdtemp()
[6259]230
[6273]231        base = os.path.basename(path)
232        (base, ext) = os.path.splitext(base)
[4900]233        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
[6831]234        failed_headers = mapping.values()
[4877]235        failed_headers.append('--ERRORS--')
[4821]236        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
237                                       failed_headers)
[6831]238        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
[4891]239
[4900]240        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
[6831]241        finished_headers = mapping.values()
[4891]242        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
243                                         finished_headers)
244        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
[6259]245
[4806]246        num =0
[4878]247        num_warns = 0
[4806]248        site = grok.getSite()
[6273]249        converter = IObjectConverter(self.iface)
[4806]250        for raw_row in reader:
251            num += 1
252            string_row = self.applyMapping(raw_row, mapping)
[6273]253            row = dict(string_row.items()) # create deep copy
254            errs, inv_errs, conv_dict =  converter.fromStringDict(
255                string_row, self.factory_name)
256            if errs or inv_errs:
[4878]257                num_warns += 1
[6273]258                conv_warnings = self.stringFromErrs(errs, inv_errs)
259                self.writeFailedRow(
[6824]260                    failed_writer, string_row, conv_warnings)
[4821]261                continue
[6273]262            row.update(conv_dict)
[6259]263
[4806]264            if mode == 'create':
265                if not self.parentsExist(row, site):
[4878]266                    num_warns += 1
[4877]267                    self.writeFailedRow(
[6824]268                        failed_writer, string_row,
[4877]269                        "Not all parents do exist yet. Skipping")
[4806]270                    continue
271                if self.entryExists(row, site):
[4878]272                    num_warns += 1
[4877]273                    self.writeFailedRow(
[6824]274                        failed_writer, string_row,
[6219]275                        "This object already exists in the same container. Skipping.")
[4806]276                    continue
277                obj = self.callFactory()
[6833]278                self.updateEntry(obj, row, site)
[6243]279                try:
280                    self.addEntry(obj, row, site)
[6273]281                except KeyError, error:
[6219]282                    num_warns += 1
283                    self.writeFailedRow(
[6824]284                        failed_writer, string_row,
[6273]285                        "%s Skipping." % error.message)
[6219]286                    continue
[4806]287            elif mode == 'remove':
288                if not self.entryExists(row, site):
[4878]289                    num_warns += 1
[4877]290                    self.writeFailedRow(
[6824]291                        failed_writer, string_row,
[4877]292                        "Cannot remove: no such entry.")
[4806]293                    continue
294                self.delEntry(row, site)
295            elif mode == 'update':
296                obj = self.getEntry(row, site)
297                if obj is None:
[4878]298                    num_warns += 1
[4877]299                    self.writeFailedRow(
[6824]300                        failed_writer, string_row,
[4877]301                        "Cannot update: no such entry.")
[4806]302                    continue
303                self.updateEntry(obj, row, site)
[4891]304            finished_writer.writerow(string_row)
[4821]305
[4832]306        time_end = time.time()
307        timedelta = time_end - time_start
[6259]308
[4878]309        self.createLogfile(path, failed_path, num, num_warns, mode, user,
[4885]310                           timedelta, logger=logger)
[4894]311        failed_path = os.path.abspath(failed_path)
[4878]312        if num_warns == 0:
[4821]313            del failed_writer
314            os.unlink(failed_path)
[4894]315            failed_path = None
316        return (num, num_warns,
317                os.path.abspath(finished_path), failed_path)
Note: See TracBrowser for help on using the repository browser.