source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 6830

Last change on this file since 6830 was 6828, checked in by Henrik Bettermann, 14 years ago

Catch double headers already in import step 2. Do not show table if double headers occur.

Create warning messages in views and use flash method to render warnings.

File size: 11.1 KB
[4806]1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
import grok
import copy
import csv
import os
import sys
import tempfile
import time
from zope.component import createObject
from zope.interface import Interface
from zope.schema import getFields
from waeup.sirp.interfaces import (
    IBatchProcessor, FatalCSVError, DuplicationError, IObjectConverter)

class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base importer'

    # Internal name...
    util_name = 'baseimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'
    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. Therefore these fields must exist in
        the input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

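    # For illustration: with the default location_fields above and an
    # iface requiring, say, a 'title' field with no default (an assumed
    # example, not part of the base iface), ``req`` would evaluate to
    #
    #   {'create': ['code', 'faculty_code', 'title'],
    #    'update': ['code', 'faculty_code'],
    #    'remove': ['code', 'faculty_code']}
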
    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

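    # Example (header names invented): applying the mapping
    # {'COURSE': 'code', 'TITLE': '--IGNORE--'} to the row
    # {'COURSE': 'M101', 'TITLE': 'Maths'} returns {'code': 'M101'}.
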
    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

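    # Example (header names invented): for a CSV file whose raw header
    # row reads CODE,FACULTY and headerfields ['code', 'faculty_code']
    # in 'create' mode, getMapping returns
    # {'CODE': 'code', 'FACULTY': 'faculty_code'}.
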
    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into a single string.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

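    # A successful run logged above might render like this (values
    # invented for illustration):
    #
    #   admin: Batch processing finished: OK
    #   admin: Source: /tmp/departments.csv
    #   admin: Mode: create
    #   admin: User: admin
    #   admin: Processing time: 1.234 s (0.0123 s/item)
    #   admin: Processed: 100 lines (100 successful/ 0 failed)
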
    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

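        # Two result files are written per run: <base>.pending<ext>
        # collects rows that failed (with an extra --ERRORS-- column),
        # <base>.finished<ext> collects successfully processed rows.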
        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.keys()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        first_row = mapping.items()
        first_row.append(("--ERRORS--", "--ERRORS--"))
        failed_writer.writerow(dict(first_row))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = [x for x in mapping.values()]
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()
        converter = IObjectConverter(self.iface)
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            row = dict(string_row.items()) # create a shallow copy
            errs, inv_errs, conv_dict = converter.fromStringDict(
                string_row, self.factory_name)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents exist yet. Skipping.")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists in the same container. "
                        "Skipping.")
                    continue
                obj = self.callFactory()
                for key, value in row.items():
                    setattr(obj, key, value)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
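
# A minimal sketch of a concrete processor, assuming a hypothetical
# IFaculty schema interface, a registered 'waeup.Faculty' factory and a
# site with a 'faculties' container; none of the names below belong to
# this module.

class FacultyProcessor(BatchProcessor):
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'facultyimporter'
    grok.name(util_name)
    name = u'Faculty importer'
    iface = IFaculty                    # hypothetical schema interface
    location_fields = ['code']
    factory_name = 'waeup.Faculty'      # hypothetical factory name

    def parentsExist(self, row, site):
        # Faculties live directly in the site's 'faculties' container.
        return 'faculties' in site.keys()

    def entryExists(self, row, site):
        return row['code'] in site['faculties'].keys()

    def getParent(self, row, site):
        return site['faculties']

    def getEntry(self, row, site):
        if not self.entryExists(row, site):
            return None
        return site['faculties'][row['code']]

    def addEntry(self, obj, row, site):
        self.getParent(row, site)[row['code']] = obj

    def delEntry(self, row, site):
        del self.getParent(row, site)[row['code']]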