source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 6273

Last change on this file since 6273 was 6273, checked in by uli, 13 years ago

Finally make the new converter work. API-wise it is as good as the old one (it can import everything the old one could),
but design-wise it might be much more powerful. Basically it can handle/convert all content types for which one can
successfully create an AddForm or EditForm. In other words: if you manage to write an edit form for some content type,
then you can also create an importer for that content type. Some fine-tuning is still needed (for dates, bool data, etc.), but
the main things work.
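
A minimal sketch of that converter idea, as used further down in this file
(the schema interface IDepartment and the row values here are assumed for
illustration only):

    from waeup.sirp.interfaces import IObjectConverter
    converter = IObjectConverter(IDepartment)  # any iface with an edit form
    errs, inv_errs, data = converter.fromStringDict(
        {'code': 'FAC1', 'title': 'Physics'}, 'waeup.Department')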

File size: 11.9 KB
1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
6import grok
7import copy
8import csv
9import os
10import sys
11import tempfile
12import time
13from zope.component import createObject
14from zope.interface import Interface
15from zope.schema import getFields
16from waeup.sirp.interfaces import (
17    IBatchProcessor, ISchemaTypeConverter, FatalCSVError, DuplicationError,
18    IObjectConverter)
19
20class BatchProcessor(grok.GlobalUtility):
21    """A processor to add, update, or remove data.
22
23    This is a non-active baseclass.
24    """
25    grok.provides(IBatchProcessor)
26    grok.context(Interface)
27    grok.baseclass()
28
29    # Name used in pages and forms...
30    name = u'Non-registered base importer'
31
32    # Internal name...
33    util_name = 'baseimporter'
34
35    # Items for this processor need an interface with zope.schema fields.
36    iface = Interface
37
38    # The name must be the same as the util_name attribute in order to
39    # register this utility correctly.
40    grok.name(util_name)
41
42    # Headers needed to locate items...
43    location_fields = ['code', 'faculty_code']
44
45    # A factory with this name must be registered...
46    factory_name = 'waeup.Department'
47
    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. Therefore these fields must exist in
        the input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

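    # Illustrative example (not in the original file): for a hypothetical
    # schema interface
    #
    #   class IDept(Interface):
    #       code = schema.TextLine(required=True)    # a location field
    #       title = schema.TextLine(required=True)   # required, no default
    #       kind = schema.TextLine(default=u'dept')  # has a default
    #
    # and location_fields = ['code'], required_fields evaluates to
    # ['title']: 'code' is skipped as a location field and 'kind' is
    # skipped because its default differs from the missing value.
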
    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

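    # Illustrative example (not in the original file): with the default
    # location_fields ['code', 'faculty_code'], a header row lacking
    # 'faculty_code' fails even in 'update' mode:
    #
    #   >>> proc.checkHeaders(['code', 'title'], mode='update')
    #   Traceback (most recent call last):
    #   ...
    #   FatalCSVError: Need at least columns 'code', 'faculty_code' for import!
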
    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            result[replacement] = row[key]
        return result

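    # Illustrative example (not in the original file): the keys of a raw
    # CSV row are renamed to the internal field names:
    #
    #   >>> proc.applyMapping(
    #   ...     {'CODE': 'FAC1', 'TITLE': 'Physics'},
    #   ...     {'CODE': 'code', 'TITLE': 'title'})
    #   {'code': 'FAC1', 'title': 'Physics'}
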
    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used
           fieldnames.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Ignore non-location fields when removing...
                field = '--IGNORE--'
            result[raw_header[num]] = field
        return result

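    # Illustrative example (not in the original file): for a CSV file
    # whose raw header row reads 'code,faculty_code,title' and mode
    # 'remove', the non-location field is mapped away:
    #
    #   {'code': 'code', 'faculty_code': 'faculty_code',
    #    'title': '--IGNORE--'}
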
    def getFieldConverters(self, fieldnames):
        """Get converters for fieldnames.
        """
        result = dict()
        for key, field in getFields(self.iface).items():
            if key not in fieldnames:
                continue
            converter = ISchemaTypeConverter(field)
            result[key] = converter
        return result

    def convertToTypes(self, row, converter_dict):
        """Convert values in given row to destination type.
        """
        if '--IGNORE--' in row.keys():
            del row['--IGNORE--']
        warnings = []
        for key, value in row.items():
            converter = converter_dict.get(key, None)
            if converter:
                try:
                    row[key] = converter.fromString(value)
                except Exception:
                    msg = "conversion error: field %s: %s %r" % (
                        key, sys.exc_info()[0], sys.exc_info()[1])
                    warnings.append(msg)
        return (row, warnings)

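    # Illustrative example (not in the original file): given a converter
    # dict holding an integer converter for 'level', the row
    # {'level': '100'} comes back as {'level': 100} with no warnings; a
    # non-convertible value such as 'abc' is left unchanged and a
    # "conversion error" message is appended to the warnings instead.
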
    def stringFromErrs(self, errors, inv_errors):
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

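    # Illustrative example (not in the original file; the messages are
    # made up):
    #
    #   >>> proc.stringFromErrs(
    #   ...     [('title', 'Required input is missing.')],
    #   ...     ['Code and title must differ.'])
    #   'title: Required input is missing.; invariant: Code and title must differ.'
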
    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

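    # Illustrative example (not in the original file): a raw row
    # {'code': 'FAC1'} written with warnings
    # ['no such parent', 'duplicate code'] ends up in the error CSV with
    # the joined messages in its '--ERRORS--' column:
    #
    #   FAC1,no such parent / duplicate code
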
    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        converters = self.getFieldConverters(headerfields)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.keys()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        first_row = mapping.items()
        first_row.append(("--ERRORS--", "--ERRORS--"))
        failed_writer.writerow(dict(first_row))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = [x for x in mapping.values()]
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()
        converter = IObjectConverter(self.iface)
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            row = dict(string_row.items()) # create a shallow copy
            errs, inv_errs, conv_dict = converter.fromStringDict(
                string_row, self.factory_name)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, raw_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Not all parents exist yet. Skipping.")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "This object already exists in the same container. Skipping.")
                    continue
                obj = self.callFactory()
                for key, value in row.items():
                    setattr(obj, key, value)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, raw_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
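
# Illustrative usage sketch (not part of the original file): a concrete
# subclass registers itself as a named utility via grok.name; the utility
# name 'departmentimporter' and the CSV columns below are made up.
#
#   from zope.component import getUtility
#   from waeup.sirp.interfaces import IBatchProcessor
#
#   proc = getUtility(IBatchProcessor, name='departmentimporter')
#   num, num_warns, finished_path, failed_path = proc.doImport(
#       'departments.csv', ['faculty_code', 'code', 'title'],
#       mode='create', user='admin')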