source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8511

Last change on this file since 8511 was 8509, checked in by Henrik Bettermann, 13 years ago

Emergency fix. Completely untested.

  • Property svn:keywords set to Id
File size: 17.3 KB
Line 
1## $Id: batching.py 8509 2012-05-24 15:22:10Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import tempfile
28import time
29from cStringIO import StringIO
30from zope.component import createObject
31from zope.interface import Interface
32from zope.schema import getFields
33from zope.event import notify
34from waeup.kofa.interfaces import (
35    IBatchProcessor, FatalCSVError, IObjectConverter,
36    ICSVExporter, IGNORE_MARKER, DuplicationError)
37
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected to
    override the class attributes below and to implement the hooks that
    raise :exc:`NotImplementedError` here (``parentsExist``,
    ``entryExists``, ``getParent``, ``getEntry``, ``addEntry`` and
    ``delEntry``).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here; they are
        handled separately (see :attr:`req`).
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map each import mode to the list of headers it requires.

        ``create`` needs the location fields plus all required fields;
        ``update`` and ``remove`` only need the location fields.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of location and interface fields."""
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the headers usable for import.

        `mode` is accepted for interface compatibility but not
        evaluated here; the result is always :attr:`available_fields`.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are sufficient and unambiguous.

        Raises :exc:`FatalCSVError` if a header required for `mode` is
        missing or if a non-ignored header (one not starting with
        ``--``) appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to the field names to use.
        Returns a new dict keyed by the replacement names; columns
        mapped to ``--IGNORE--`` are dropped. Raises :exc:`KeyError`
        if `row` lacks a column named in `mapping`.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` is read as raw header and
        matched position by position against `headerfields`. Columns
        marked ``--IGNORE--`` are left out, and so are non-location
        columns when `mode` is ``'remove'``.
        """
        csv_file = open(path, 'rb')
        try:
            # Only the header line is needed; close the file right away
            # instead of leaving that to garbage collection.
            raw_header = next(csv.reader(csv_file))
        finally:
            csv_file.close()
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion and invariant errors into one string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages. All
        entries are joined with ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a fresh object via the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed
        on to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a ``None`` result as "no such entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems, ``None`` otherwise.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the ``code`` of a value if it has one, else the value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import run to `logger`.

        Does nothing if `logger` is ``None``. `num` is the number of
        processed rows, `warnings` the number of failed ones and
        `timedelta` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that `row` is modified in place: the message is stored
        under the ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the :class:`IObjectConverter` for `iface`. `ignore_empty` is
        accepted but not evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site):
        """Create, remove or update the entry described by `row`.

        Returns ``None`` on success or the warning to store with the
        failed row otherwise.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, processes each row according to
        `mode` (``'create'``, ``'update'`` or ``'remove'``) and writes
        two result files into a fresh temporary directory: one with
        the successfully processed rows (``<base>.finished<ext>``) and
        one with the failed rows plus an ``--ERRORS--`` column
        (``<base>.pending<ext>``).

        If `ignore_empty` is true, empty strings are replaced by the
        ignore marker in ``'update'`` mode.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)``. ``failed_path`` is ``None`` (and the file is
        removed) if no row failed.

        Raises :exc:`FatalCSVError` if the headers are insufficient.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = list(mapping.values())
        failed_headers = finished_headers + ['--ERRORS--']

        num = 0
        num_warns = 0
        site = grok.getSite()

        # All files are closed explicitly (also on errors), so results
        # are flushed to disk before the paths are handed out and the
        # pending file can be removed safely below.
        opened = []
        try:
            source = open(path, 'rb')
            opened.append(source)
            failed_file = open(failed_path, 'wb')
            opened.append(failed_file)
            finished_file = open(finished_path, 'wb')
            opened.append(finished_file)

            reader = csv.DictReader(source)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(dict(zip(failed_headers, failed_headers)))
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            finished_writer.writerow(
                dict(zip(finished_headers, finished_headers)))

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode in ('update',):
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Work on a (shallow) copy: ``string_row`` keeps the
                # raw strings for the result files, ``row`` receives
                # the converted values.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    warning = self.stringFromErrs(errs, inv_errs)
                else:
                    row.update(conv_dict)
                    warning = self._processRow(row, mode, site)
                if warning is not None:
                    # A failed row must never reach the finished file:
                    # writeFailedRow() adds an '--ERRORS--' key the
                    # finished writer does not know about.
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, warning)
                    continue
                finished_writer.writerow(string_row)
        finally:
            for handle in opened:
                handle.close()

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave an empty pending file around.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
392
class ExporterBase(object):
    """Common base class for CSV exporters.

    Subclasses are expected to set `fields` and `title` and to
    implement :meth:`export` and :meth:`export_all`.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something suitable for a CSV cell.

        Derived classes may override this hook; `name` and `context`
        are not evaluated by this base implementation.

        Booleans become ``'1'``/``'0'``, unicode strings are encoded
        as UTF-8, datetimes are stringified, dates are stringified
        with a trailing ``#`` and ``None`` becomes the empty string.
        Any other value is returned unchanged.
        """
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            # Must be tested before plain dates, because every
            # datetime is a date as well.
            return str(value)
        if isinstance(value, datetime.date):
            # The appended hash '#' is meant to keep Excel from
            # automatically reinterpreting the value.
            return str('%s#' % value)
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer that is ready for data rows.

        Returns a tuple ``(<writer>, <outfile>)``: a
        :class:`csv.DictWriter` for `fields` and the file object it
        writes to. Without `filepath` the file object is a StringIO
        buffer, which is why it is returned as well; otherwise it is
        the file at `filepath`.

        The header row has already been written when this method
        returns.

        A file opened via `filepath` is open for writing only; close
        it before reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        header_row = dict((name, name) for name in self.fields)
        writer = csv.DictWriter(outfile, self.fields)
        writer.writerow(header_row)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row with the `fields` of `obj` using `writer`.

        Attributes missing on `obj` are treated as ``None``; every
        value is passed through :meth:`mangle_value` first.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        If `filepath` is ``None`` the complete contents of `outfile`
        are returned (and `outfile` is left open). Otherwise `outfile`
        is closed and ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. To be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. To be implemented by derived classes.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.