source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 7974

Last change on this file since 7974 was 7950, checked in by Henrik Bettermann, 13 years ago

checkUpdateRequirements is even better than checkConstraints.

  • Property svn:keywords set to Id
File size: 15.6 KB
Line 
1## $Id: batching.py 7950 2012-03-22 07:00:06Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import os
26import tempfile
27import time
28from cStringIO import StringIO
29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
32from waeup.kofa.interfaces import (
33    IBatchProcessor, FatalCSVError, IObjectConverter,
34    ICSVExporter)
35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected to
    override the class attributes below and to implement the hooks that
    raise :exc:`NotImplementedError` here (``parentsExist``,
    ``entryExists``, ``getParent``, ``getEntry``, ``addEntry`` and
    ``delEntry``).
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category, and neither do the location fields.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the list of columns it requires.

        Only ``create`` needs the required schema fields in addition to
        the location fields; ``update`` and ``remove`` just need to
        locate the entry.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all location field and schema field names."""
        # Wrap keys() in list() so the concatenation does not depend on
        # keys() returning a real list.
        schema_fields = list(getFields(self.iface).keys())
        return sorted(set(self.location_fields + schema_fields))

    def getHeaders(self, mode='create'):
        """Return the headers available for import (``mode`` is unused)."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that ``headerfields`` are usable for an import in ``mode``.

        Returns ``True`` if all columns required for ``mode`` are
        present and no non-ignored column name appears twice.

        Raises :exc:`FatalCSVError` otherwise. An unknown ``mode``
        raises :exc:`KeyError`.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps raw CSV column names to the field names used
        by the processor. Returns a new dict keyed by field name;
        columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first row of the CSV file at ``path`` supplies the raw
        column names; ``headerfields`` supplies, position by position,
        the field name each column is to be imported as.
        """
        result = dict()
        # Only the header row is needed; close the file right away
        # instead of leaving the handle to the garbage collector.
        with open(path, 'rb') as infile:
            raw_header = next(csv.reader(infile))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion and invariant errors into one message string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed on.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a return value of ``None`` as "no such
        entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A :exc:`KeyError` raised here is reported by ``doImport`` as a
        failed row instead of aborting the import.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems, ``None`` otherwise. This base implementation always
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Only keys for which ``obj`` already has an attribute are set;
        all other keys in ``row`` are silently skipped.
        """
        for key, value in row.items():
            if hasattr(obj, key):
                setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to ``logger``.

        Does nothing if ``logger`` is ``None``. ``num`` is the number
        of processed rows, ``warnings`` the number of failed rows and
        ``timedelta`` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that ``row`` is modified in place: it gets an additional
        ``--ERRORS--`` key.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the :class:`IObjectConverter` of ``iface``. ``mode`` is not
        used by this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name)
        return errs, inv_errs, conv_dict

    def _applyRow(self, row, mode, site):
        """Perform the create/remove/update action for one converted row.

        Returns ``None`` on success or the warning to be written to the
        failed-rows file. Rows with an unhandled ``mode`` count as
        successful without any action.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return ("This object already exists in the same "
                        "container. Skipping.")
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def _importRows(self, reader, mapping, mode, site,
                    failed_writer, finished_writer):
        """Process all rows of ``reader`` and write the result files.

        Returns a tuple ``(num, num_warns)``: the number of rows read
        and the number of rows written to ``failed_writer``.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            # Shallow copy: ``row`` receives the converted values while
            # ``string_row`` keeps the raw strings for the result files.
            row = dict(string_row)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            warning = self._applyRow(row, mode, site)
            if warning is not None:
                num_warns += 1
                self.writeFailedRow(failed_writer, string_row, warning)
                continue
            finished_writer.writerow(string_row)
        return num, num_warns

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path``, whose columns are to be
        imported as ``headerfields``, and creates, updates or removes
        one entry per row depending on ``mode``.

        Successfully processed rows are written to a ``.finished`` CSV
        file and failed rows, together with an ``--ERRORS--`` column,
        to a ``.pending`` CSV file. Both live in a freshly created
        temporary directory.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)``. ``failed_path`` is ``None`` if no row failed;
        the pending file is removed in that case.

        Raises :exc:`FatalCSVError` if ``headerfields`` are not
        suitable for ``mode``.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = list(mapping.values())
        failed_headers.append('--ERRORS--')

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = list(mapping.values())

        # All three files are closed (and thereby flushed) when the
        # blocks are left, also in case of an exception. The result
        # files are therefore complete before their paths are returned,
        # and the pending file is closed before it may get unlinked.
        with open(path, 'rb') as infile:
            with open(failed_path, 'wb') as failed_file:
                with open(finished_path, 'wb') as finished_file:
                    reader = csv.DictReader(infile)
                    failed_writer = csv.DictWriter(
                        failed_file, failed_headers)
                    failed_writer.writerow(
                        dict([(x, x) for x in failed_headers]))
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict([(x, x) for x in finished_headers]))
                    site = grok.getSite()
                    num, num_warns = self._importRows(
                        reader, mapping, mode, site,
                        failed_writer, finished_writer)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave an empty pending file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
360
class ExporterBase(object):
    """Common base class for CSV exporters.

    Provides helpers to open a CSV writer, to write single objects as
    rows and to close the output again. ``export`` and ``export_all``
    have to be implemented by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something writable to a CSV file.

        Booleans become ``'1'`` or ``'0'``, unicode strings are encoded
        as UTF-8 byte strings and ``None`` becomes the empty string.
        Any other value is returned unchanged.

        `name` and `context` are not used here; they are available for
        derived classes overriding this hook.
        """
        if value is None:
            # None is not really representable in CSV files
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        return value

    def get_csv_writer(self, filepath=None):
        """Return a tuple ``(<writer>, <outfile>)`` ready for writing rows.

        ``<writer>`` is a :class:`csv.DictWriter` for the columns in
        ``fields`` and ``<outfile>`` is the file object it writes to.
        Without a `filepath` this is an in-memory ``StringIO`` buffer,
        otherwise the file at `filepath` opened in mode ``'wb'``.

        The header row is already written when this method returns.

        A file opened via `filepath` is open for writing only; close it
        before reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header_row = dict((name, name) for name in self.fields)
        writer.writerow(header_row)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row for `obj` using `writer`.

        For each name in ``fields`` the attribute of that name is read
        from `obj` (``None`` if missing) and passed through
        ``mangle_value`` before being written.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        If `filepath` is ``None`` the complete contents of `outfile`
        are returned and `outfile` is left open. Otherwise `outfile` is
        closed and ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV data.

        With `filepath` set to ``None`` implementations should return
        the CSV data as a raw string.

        Not implemented in this base class.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` as CSV data into
        `filepath`.

        With `filepath` set to ``None`` implementations should return
        the CSV data as a raw string.

        Not implemented in this base class.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.