source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8550

Last change on this file since 8550 was 8540, checked in by Henrik Bettermann, 13 years ago

Fix emergency fix. Due to this bug I discovered that generate_applicant_id generates duplicate keys.

  • Property svn:keywords set to Id
File size: 17.4 KB
Line 
1## $Id: batching.py 8540 2012-05-29 06:22:11Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import tempfile
28import time
29from cStringIO import StringIO
30from zope.component import createObject
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.event import notify
34from waeup.kofa.interfaces import (
35    IBatchProcessor, FatalCSVError, IObjectConverter,
36    ICSVExporter, IGNORE_MARKER, DuplicationError)
37
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Subclasses must provide the
    lookup/manipulation hooks (:meth:`parentsExist`, :meth:`entryExists`,
    :meth:`getParent`, :meth:`getEntry`, :meth:`addEntry`,
    :meth:`delEntry`) and set `iface`, `location_fields`, and
    `factory_name` for the kind of object they process.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                # Location fields are required anyway and handled
                # separately via the `req` property.
                continue
            if field.default is not field.missing_value:
                # A real default exists; the column may be omitted.
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the column names it requires.

        'create' needs the location fields plus all no-default required
        fields; 'update' and 'remove' only need the location fields.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted, de-duplicated list of all columns this processor
        understands: location fields plus the schema fields of `iface`.
        """
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the column names supported for `mode`.

        Currently returns :attr:`available_fields` regardless of mode.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Validate the CSV header row for `mode`.

        Raises :exc:`FatalCSVError` if a required column is missing or
        a (non-ignored) column name appears more than once. Returns
        ``True`` on success.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict with the keys of `row` renamed according to
        `mapping`; columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The raw header row of the file at `path` is paired positionally
        with `headerfields`.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into a single
        semicolon-separated message string.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new instance via the factory registered under
        `factory_name`.

        NOTE(review): `args` and `kw` are accepted but not forwarded to
        the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the object's code if the value has one (e.g. a
            # certificate), else the plain value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if len(changed):
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import run to `logger`.

        Does nothing when no `logger` is given. `num` is the number of
        processed rows, `warnings` the number of failed rows,
        `timedelta` the elapsed processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # Guard against division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns the ``(errs, inv_errs, conv_dict)`` tuple produced by
        the :class:`IObjectConverter` adapter for `iface`.

        NOTE(review): `ignore_empty` is accepted but not used here;
        empty values are handled by the caller (:meth:`doImport`).
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        `path` is the CSV file to process, `headerfields` the column
        names to use, `mode` one of ``'create'``, ``'update'`` or
        ``'remove'``. `user` is only used for log messages; `logger`
        receives the run summary. With `ignore_empty` set, empty cells
        are treated as "leave unchanged" in update mode.

        Failed rows go to a ``.pending`` CSV, successful ones to a
        ``.finished`` CSV, both in a fresh temp directory. Returns a
        tuple ``(num, num_warns, finished_path, failed_path)`` where
        `failed_path` is ``None`` if every row succeeded.

        NOTE(review): the input and output files opened here are never
        explicitly closed; this relies on CPython closing them on GC.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num =0
        num_warns = 0
        site = grok.getSite()

        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # replace empty strings with ignore-markers
                for key, val in string_row.items():
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            row = dict(string_row.items()) # create a copy (shallow; values are strings)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                # Override all values in row, also
                # student_ids and applicant_ids which have been
                # generated in the respective __init__ methods before.
                self.updateEntry(obj, row, site)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
                except DuplicationError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.msg)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                update_errors = self.checkUpdateRequirements(obj, row, site)
                if update_errors is not None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row, update_errors)
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Drop the writer so the pending file can be removed; no
            # failures means no pending file is reported back.
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
393
class ExporterBase(object):
    """Common base class for CSV exporters.
    """
    grok.implements(ICSVExporter)

    #: Names of the attributes this exporter writes out, in column order.
    fields = ('code', 'title', 'title_prefix')

    #: Human-readable title shown wherever the exporter is listed
    #: (if registered as a utility).
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn a single `value` into something a CSV writer accepts.

        Subclasses may override this hook to customize conversion.
        `name` is the field name, `context` the object the value was
        read from.
        """
        if isinstance(value, bool):
            # Booleans become '1' / '0'.
            return value and '1' or '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # The datetime test above must run first: datetimes are
            # dates too. The trailing hash '#' circumvents unwanted
            # automatic date conversion in Excel.
            return '%s#' % value
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        if filepath is None:
            sink = StringIO()
        else:
            sink = open(filepath, 'wb')
        writer = csv.DictWriter(sink, self.fields)
        header = dict(zip(self.fields, self.fields))
        writer.writerow(header)
        return writer, sink

    def write_item(self, obj, writer):
        """Extract one row from `obj` and emit it via `writer`.

        Each field value is passed through :meth:`mangle_value` first.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.