source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 6274

Last change on this file since 6274 was 6273, checked in by uli, 14 years ago

Finally make the new converter work. API-wise it is as good as the old one (it can import everything the old one could),
but design-wise it might be much more powerful. Basically it can handle/convert all content types for which one can
create an Add- or EditForm successfully. In other words: if you manage to write an edit form for some content type,
then you can also create an importer for that content type. Some fine-tuning is still needed (for dates, bool data,
etc.), but the main things work.
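
For illustration, a minimal sketch of what a concrete importer built on this design might look like. Everything here is hypothetical: ``IFaculty``, ``waeup.Faculty`` and the container layout stand in for whatever schema, factory and site structure an actual deployment defines.

import grok
from waeup.sirp.interfaces import IBatchProcessor
# Hypothetical schema interface; an actual project defines its own.
from waeup.sirp.university.interfaces import IFaculty
from waeup.sirp.utils.batching import BatchProcessor

class FacultyImporter(BatchProcessor):
    """Sketch of an importer for faculty objects (assumed schema)."""
    grok.provides(IBatchProcessor)
    grok.name('facultyimporter')
    name = u'Faculty importer'
    util_name = 'facultyimporter'
    iface = IFaculty                # assumed zope.schema interface
    location_fields = ['code']
    factory_name = 'waeup.Faculty'  # assumed registered factory

    def parentsExist(self, row, site):
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        return row['code'] in site['faculties'].keys()

    def getParent(self, row, site):
        return site['faculties']

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        return site['faculties'][row['code']]

    def addEntry(self, obj, row, site):
        self.getParent(row, site)[row['code']] = obj

    def delEntry(self, row, site):
        del site['faculties'][row['code']]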

File size: 11.9 KB
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
[4870]7import copy
[4806]8import csv
[4821]9import os
10import sys
[4900]11import tempfile
[4821]12import time
[4806]13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
[5005]16from waeup.sirp.interfaces import (
[6273]17    IBatchProcessor, ISchemaTypeConverter, FatalCSVError, DuplicationError,
18    IObjectConverter)
[4806]19
20class BatchProcessor(grok.GlobalUtility):
21    """A processor to add, update, or remove data.
22
23    This is a non-active baseclass.
24    """
[4831]25    grok.provides(IBatchProcessor)
[4806]26    grok.context(Interface)
27    grok.baseclass()
28
29    # Name used in pages and forms...
[5009]30    name = u'Non-registered base importer'
[6259]31
[4806]32    # Internal name...
[5009]33    util_name = 'baseimporter'
[6259]34
[4806]35    # Items for this processor need an interface with zope.schema fields.
[5009]36    iface = Interface
[6259]37
[4806]38    # The name must be the same as the util_name attribute in order to
39    # register this utility correctly.
40    grok.name(util_name)
41
42    # Headers needed to locate items...
43    location_fields = ['code', 'faculty_code']
[6259]44
[4806]45    # A factory with this name must be registered...
46    factory_name = 'waeup.Department'
47
48    @property
49    def required_fields(self):
[4829]50        """Required fields that have no default.
51
52        A list of names of field, whose value cannot be set if not
53        given during creation. Therefore these fields must exist in
54        input.
55
56        Fields with a default != missing_value do not belong to this
57        category.
58        """
[4806]59        result = []
60        for key, field in getFields(self.iface).items():
61            if key in self.location_fields:
62                continue
[4829]63            if field.default is not field.missing_value:
64                continue
[4806]65            if field.required:
66                result.append(key)
67        return result
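
    # Illustration for required_fields: for a schema with a required
    # 'title' field that has no default, and with 'code' listed in
    # location_fields, the result would be ['title'].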

    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))
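
    # Illustration for available_fields: with location_fields
    # ['code', 'faculty_code'] and a schema providing 'title' and
    # 'code', the sorted union is ['code', 'faculty_code', 'title'].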

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for duplicate fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True
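
    # Illustration for checkHeaders: with mode='create' and
    # req['create'] == ['code', 'faculty_code', 'title'], a header
    # list missing 'title' raises FatalCSVError; so does any column
    # name that appears twice.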

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result
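
    # Illustration for applyMapping:
    #   applyMapping({'faculty': 'F1'}, {'faculty': 'faculty_code'})
    # returns {'faculty_code': 'F1'}.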

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result
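
    # Illustration for getMapping: in 'remove' mode a raw header
    # ['code', 'title'] yields {'code': 'code', 'title': '--IGNORE--'},
    # since only location fields matter for removal.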

    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.
        """
        if '--IGNORE--' in row.keys():
            del row['--IGNORE--']
        warnings = []
        for key, value in row.items():
            converter = converter_dict.get(key, None)
            if converter:
                try:
                    row.update({key: converter.fromString(value)})
                except Exception:
                    msg = "conversion error: field %s: %s %r" % (
                        key, sys.exc_info()[0], sys.exc_info()[1])
                    warnings.append(msg)
        return (row, warnings)
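
    # Illustration for convertToTypes: given a converter for an Int
    # field 'semester', convertToTypes({'semester': '1'}, converters)
    # returns ({'semester': 1}, []); a failing conversion instead
    # appends a warning string and leaves the row value untouched.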

    def stringFromErrs(self, errors, inv_errors):
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)
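
    # Illustration for stringFromErrs (hypothetical messages):
    #   stringFromErrs([('title', 'required')], ['end before start'])
    # returns "title: required; invariant: end before start".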

    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether an entry for ``row`` data already exists.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return
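
    # Illustration for writeFailedRow: passing warnings as
    # ['bad date', 'unknown code'] stores 'bad date / unknown code'
    # in the row's --ERRORS-- column before the row is written.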

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.keys()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        first_row = mapping.items()
        first_row.append(("--ERRORS--", "--ERRORS--"))
        failed_writer.writerow(dict(first_row))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = [x for x in mapping.values()]
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x, x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()
        converter = IObjectConverter(self.iface)
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            row = dict(string_row.items()) # work on a shallow copy
            errs, inv_errs, conv_dict = converter.fromStringDict(
                string_row, self.factory_name)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, raw_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Not all parents exist yet. Skipping.")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "This object already exists in the same container. Skipping.")
                    continue
                obj = self.callFactory()
                for key, value in row.items():
                    setattr(obj, key, value)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
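
For orientation, a minimal usage sketch of the ``doImport`` API above. This is an illustration, not part of the file: the utility name, CSV path, header list and logger name are all made up.

import logging
from zope.component import getUtility
from waeup.sirp.interfaces import IBatchProcessor

# Look up a concrete importer registered under a (hypothetical) name.
importer = getUtility(IBatchProcessor, name='facultyimporter')
# Import a (hypothetical) CSV file in create mode.
num, num_warns, finished_path, failed_path = importer.doImport(
    'faculties.csv', ['code', 'title'], mode='create',
    user='admin', logger=logging.getLogger('waeup.sirp.import'))
# failed_path is None if all rows succeeded; otherwise it names a CSV
# of rejected rows with an extra --ERRORS-- column.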