source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8304

Last change on this file since 8304 was 8304, checked in by Henrik Bettermann, 12 years ago

Add more info about the updateEntry: We can't use this method to update objects after adding fields to the interface.

  • Property svn:keywords set to Id
File size: 16.4 KB
Line 
1## $Id: batching.py 8304 2012-04-29 05:04:33Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import os
26import tempfile
27import time
28from cStringIO import StringIO
29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
32from waeup.kofa.interfaces import (
33    IBatchProcessor, FatalCSVError, IObjectConverter,
34    ICSVExporter, IGNORE_MARKER)
35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected
    to override the class attributes below and to implement the
    methods that raise :exc:`NotImplementedError` here
    (``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry``).
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                # The field brings its own default; input may omit it.
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Dict mapping each import mode to the columns it requires.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted list of all location fields and interface field names.
        """
        # ``list(...)`` keeps the concatenation valid even if the
        # mapping returned by getFields() hands out a key view.
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the headers this processor knows about.

        ``mode`` is accepted but not evaluated in this base class.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check ``headerfields`` against the requirements of ``mode``.

        Raises :exc:`FatalCSVError` if a required column is missing or
        if a column name (other than those starting with ``--``)
        appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps raw CSV column names to the field names
        actually used. Returns a new dict keyed by the field names;
        columns mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at ``path`` is read as the raw
        header; ``headerfields`` gives, position by position, the
        field name to use for each raw column. Columns marked
        ``--IGNORE--`` are skipped, and so are non-location columns
        when ``mode`` is ``'remove'``.
        """
        result = dict()
        # Close the file deterministically instead of leaving the
        # handle to the garbage collector.
        with open(path, 'rb') as infile:
            raw_header = next(csv.reader(infile))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion errors into a single ``'; '``-separated string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object via the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed
        on to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a return value of ``None`` as "no such
        entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A :exc:`KeyError` raised here is reported by ``doImport`` as a
        failed row instead of aborting the import.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. This base implementation has no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the ``code`` of a value if it has one, otherwise
            # the value itself.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))
        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to ``logger``.

        Does nothing if ``logger`` is ``None``. ``num`` is the number
        of processed rows, ``warnings`` the number of failed ones and
        ``timedelta`` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        Note that ``row`` is modified: the message is stored under the
        ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered
        by the :class:`IObjectConverter` for ``iface``.

        NOTE(review): ``ignore_empty`` is accepted but not evaluated
        here; it is kept for interface compatibility.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, site, mode):
        """Apply one converted ``row`` in ``mode``.

        Returns ``None`` on success or the error message (whatever
        ``writeFailedRow`` accepts) describing why the row was skipped.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return ("This object already exists in the same container. "
                        "Skipping.")
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                # Same value the deprecated ``error.message`` had:
                # the sole argument, or '' for any other arg count.
                message = ''
                if len(error.args) == 1:
                    message = error.args[0]
                return "%s Skipping." % message
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at ``path``, maps its columns to
        ``headerfields`` and processes each row according to ``mode``
        (``'create'``, ``'update'`` or ``'remove'``). Successfully
        processed rows are written to a ``.finished`` file, failed
        ones (with an additional ``--ERRORS--`` column) to a
        ``.pending`` file, both in a freshly created temporary
        directory.

        If ``ignore_empty`` is true and ``mode`` is ``'update'``,
        empty strings in the input are replaced by the ignore marker
        so that the respective attributes are left untouched.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)``; ``failed_path`` is ``None`` if no row failed.

        Raises :exc:`FatalCSVError` if the headers do not meet the
        requirements of ``mode``.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0
        site = grok.getSite()

        # All files opened here are closed in the ``finally`` clause,
        # so the result files are completely written before their
        # paths are handed to the caller (or the pending file is
        # unlinked).
        opened = []
        try:
            infile = open(path, 'rb')
            opened.append(infile)
            reader = csv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()

            base = os.path.basename(path)
            (base, ext) = os.path.splitext(base)
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            opened.append(failed_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(
                dict(zip(failed_headers, failed_headers)))  # header

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            opened.append(finished_file)
            finished_writer = csv.DictWriter(
                finished_file, finished_headers)
            finished_writer.writerow(
                dict(zip(finished_headers, finished_headers)))  # header

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode in ('update',):
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: ``row`` receives the converted values
                # while ``string_row`` keeps the raw strings for the
                # result files.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    num_warns += 1
                    conv_warnings = self.stringFromErrs(errs, inv_errs)
                    self.writeFailedRow(
                        failed_writer, string_row, conv_warnings)
                    continue
                row.update(conv_dict)

                problem = self._processRow(row, site, mode)
                if problem is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, problem)
                    continue
                finished_writer.writerow(string_row)
        finally:
            for handle in opened:
                handle.close()

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave an empty pending file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
377
class ExporterBase(object):
    """Common base class for CSV exporters.

    Derived classes set ``fields`` and ``title`` and implement
    ``export`` and ``export_all``.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer can handle.

        Derived classes may override this hook; `name` and `context`
        are not evaluated in this base implementation.
        """
        if value is None:
            # There is no real CSV representation of None
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers prefer byte streams over unicode
            return value.encode('utf-8')
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer which is ready for writing rows.

        The result is a tuple ``(<writer>, <outfile>)``: ``<writer>``
        is a :class:`csv.DictWriter` and ``<outfile>`` the underlying
        file object. With no `filepath` given, the data is collected
        in a StringIO object, which is why the outfile is handed out
        as well.

        The header row is written before the writer is returned.

        A file opened for a given `filepath` is write-only; close it
        before reading the data back.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        csv_writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        csv_writer.writerow(header)
        return csv_writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row with the attribute values of `obj`.

        Attributes missing on `obj` are handled like ``None`` values.
        """
        writer.writerow(dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields))
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With `filepath` being ``None`` the complete contents of
        `outfile` is returned. Otherwise `outfile` is closed and
        nothing is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Write the items of `iterable` as CSV data.

        Implementations should return the CSV data as raw string if
        `filepath` is ``None``.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Write all suitable objects found in `site` as CSV data
        to `filepath`.

        Implementations should return the CSV data as raw string if
        `filepath` is ``None``.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.