source: main/waeup.sirp/trunk/src/waeup/sirp/utils/batching.py @ 6827

Last change on this file since 6827 was 6824, checked in by Henrik Bettermann, 13 years ago

Skip ignored columns in failed and finished data files.

In the finished data file we now clearly see which fields have been imported. In the pending data file (failed data file) ignored columns are omitted.

We could think about saving the original import file elsewhere.

File size: 11.1 KB
1"""WAeUP components for batch processing.
2
3Batch processors eat CSV files to add, update or remove large numbers
4of certain kinds of objects at once.
5"""
import grok
import copy
import csv
import os
import sys
import tempfile
import time
from zope.component import createObject
from zope.interface import Interface
from zope.schema import getFields
from waeup.sirp.interfaces import (
    IBatchProcessor, FatalCSVError, DuplicationError, IObjectConverter)

class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active base class.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base importer'

    # Internal name...
    util_name = 'baseimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose values cannot be set if not
        given during creation. These fields must therefore exist in
        the input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

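    # Illustrative sketch (not part of the original file): given a
    # hypothetical interface like
    #
    #   class IDept(Interface):
    #       code = schema.TextLine(required=True)         # location field
    #       title = schema.TextLine(required=True)        # no default
    #       rank = schema.Int(required=True, default=1)   # has a default
    #
    # ``required_fields`` would return ['title']: ``code`` is skipped as
    # a location field, ``rank`` because its default differs from the
    # missing value.
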
    @property
    def req(self):
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for duplicate fields...
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

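    # Illustrative sketch (not part of the original file): in 'create'
    # mode a header like ['code', 'faculty_code'] passes for the base
    # processor (no extra required schema fields), while ['code', 'code']
    # raises FatalCSVError("Double headers: ..."). Columns prefixed with
    # '--' (e.g. u'--IGNORE--') may appear several times, as they are
    # excluded from the duplicate check.
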
    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

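    # Illustrative sketch (not part of the original file):
    #
    #   >>> mapping = {'CODE': 'code', 'remark': u'--IGNORE--'}
    #   >>> processor.applyMapping(
    #   ...     {'CODE': 'CS', 'remark': 'irrelevant'}, mapping)
    #   {'code': 'CS'}
    #
    # The ignored column is dropped entirely, which is why it no longer
    # shows up in pending and finished data files.
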
    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

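    # Illustrative sketch (not part of the original file): for a CSV
    # file whose raw header row reads 'CODE,TITLE' and headerfields
    # ['code', u'--IGNORE--'], the result is {'CODE': 'code'}. In
    # 'remove' mode non-location fields are dropped as well, so only
    # the columns needed to locate objects survive.
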
    def stringFromErrs(self, errors, inv_errors):
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

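    # Illustrative sketch (not part of the original file):
    #
    #   >>> proc.stringFromErrs(
    #   ...     [('title', 'Required input is missing.')], ['no deputy'])
    #   'title: Required input is missing.; invariant: no deputy'
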
    def callFactory(self, *args, **kw):
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update ``obj`` to the values given in ``row``.
        """
        for key, value in row.items():
            setattr(obj, key, value)
        return

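    # Illustrative sketch (not part of the original file): with
    # row = {'title': u'Physics'}, updateEntry(dept, row, site) simply
    # does setattr(dept, 'title', u'Physics'); attributes not named in
    # the row are left untouched.
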
    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

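    # Illustrative sketch (not part of the original file): passing
    # warnings=['title: Required input is missing.', 'no parent'] adds
    # an '--ERRORS--' value of
    # 'title: Required input is missing. / no parent' to the row before
    # it is written to the pending (failed) CSV file.
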
    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.keys()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        first_row = mapping.items()
        first_row.append(("--ERRORS--", "--ERRORS--"))
        failed_writer.writerow(dict(first_row))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = [x for x in mapping.values()]
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num = 0
        num_warns = 0
        site = grok.getSite()
        converter = IObjectConverter(self.iface)
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            row = dict(string_row.items()) # create a shallow copy
            errs, inv_errs, conv_dict = converter.fromStringDict(
                string_row, self.factory_name)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents exist yet. Skipping.")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists in the same container. Skipping.")
                    continue
                obj = self.callFactory()
                for key, value in row.items():
                    setattr(obj, key, value)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
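
# Illustrative usage sketch (not part of the original file). A concrete
# processor (here a hypothetical 'departmentimporter' utility derived
# from BatchProcessor) would be looked up and driven like this:
#
#   from zope.component import getUtility
#   from waeup.sirp.interfaces import IBatchProcessor
#
#   processor = getUtility(IBatchProcessor, name='departmentimporter')
#   num, num_warns, finished, failed = processor.doImport(
#       'depts.csv', ['code', 'faculty_code', 'title'],
#       mode='create', user='admin', logger=my_logger)
#
# 'finished' points to a CSV file of successfully imported rows,
# 'failed' to a pending file with an '--ERRORS--' column (or None if
# all rows were imported).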