source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8332

Last change on this file since 8332 was 8332, checked in by Henrik Bettermann, 12 years ago

ObjectModifiedEvent must be fired in updateEntry to ensure that catalogs involved are updated after setting attributes.

  • Property svn:keywords set to Id
File size: 16.5 KB
Line 
1## $Id: batching.py 8332 2012-05-03 11:41:27Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import os
26import tempfile
27import time
28from cStringIO import StringIO
29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
32from zope.event import notify
33from waeup.kofa.interfaces import (
34    IBatchProcessor, FatalCSVError, IObjectConverter,
35    ICSVExporter, IGNORE_MARKER)
36
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass.  Subclasses set `iface`,
    `factory_name` and `location_fields` and implement the
    entry-handling hooks (`parentsExist`, `entryExists`, `getParent`,
    `getEntry`, `addEntry`, `delEntry`) for their object type.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Map import mode to the header fields required for it.

        `create` needs the location fields plus all required fields
        without defaults; `update` and `remove` only need the location
        fields to find the object.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted, duplicate-free union of location fields and iface fields."""
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return headers usable for `mode` (currently all available fields)."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Validate CSV header fields for the given import mode.

        Raises `FatalCSVError` if a required column is missing or a
        non-ignored column name appears twice.  Returns True otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict whose keys are the replacement field names;
        columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Reads the raw header row of the file at `path` and pairs each
        raw column name positionally with the corresponding entry in
        `headerfields`.
        """
        result = dict()
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into one ';'-separated string."""
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new (empty) instance via the registered `factory_name` factory."""
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Values equal to `IGNORE_MARKER` and keys that are not already
        attributes of `obj` are skipped.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # For logging prefer an object's `code` attribute if it has one.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated
        notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        Emits a summary (status, source, mode, user, timing, counts)
        via `logger`; a no-op when `logger` is None.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # Guard against division by zero when no lines were processed.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Delegates to an `IObjectConverter` adapter for `self.iface`
        and returns its (errors, invariant_errors, converted_dict)
        tuple.  NOTE: `ignore_empty` is accepted but currently unused
        in this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, processes each row according to
        `mode` ('create', 'update' or 'remove'), and writes successful
        rows to a ``.finished`` CSV and failed rows (with an
        ``--ERRORS--`` column) to a ``.pending`` CSV in a fresh
        temporary directory.

        Returns a tuple
        ``(num_rows, num_warnings, finished_path, failed_path)`` where
        `failed_path` is None if all rows succeeded.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        # NOTE(review): the reader/writer file objects opened below are
        # never explicitly closed; this presumably relies on CPython
        # refcounting to flush and close them -- confirm.
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        # Write the header row manually (Python 2 csv has no writeheader()
        # before 2.7).
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num =0
        num_warns = 0
        site = grok.getSite()

        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            if ignore_empty and mode in ('update',):
                # replace empty strings with ignore-markers
                for key, val in string_row.items():
                    if val == '':
                        string_row[key] = IGNORE_MARKER
            row = dict(string_row.items()) # create deep copy
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists. Skipping.")
                    continue
                obj = self.callFactory()
                # Override all values in row, also
                # student_ids and applicant_ids which have been
                # generated in the respective __init__ methods before.
                self.updateEntry(obj, row, site)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                update_errors = self.checkUpdateRequirements(obj, row, site)
                if update_errors is not None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row, update_errors)
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Drop the writer so its underlying file object is released
            # (and thus flushed/closed on CPython) before unlinking.
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
381
class ExporterBase(object):
    """A base for exporters.

    Subclasses define `fields` and `title` and implement `export` /
    `export_all`; the helpers here handle CSV writer setup, value
    mangling, row writing and output finalization.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.

        Booleans become '1'/'0', unicode is UTF-8 encoded, and None
        becomes the empty string; everything else passes through.
        """
        if value is None:
            # None is not really representable in CSV files
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        # Emit the header row (field name mapped to itself).
        header = dict(zip(self.fields, self.fields))
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        Missing attributes are exported as None (then mangled to '').
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.