source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 7904

Last change on this file since 7904 was 7860, checked in by uli, 13 years ago

pyflakes.

  • Property svn:keywords set to Id
File size: 14.9 KB
RevLine 
[7196]1## $Id: batching.py 7860 2012-03-13 01:19:23Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa components for batch processing.
[4806]19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
[4821]25import os
[4900]26import tempfile
[4821]27import time
[7859]28from cStringIO import StringIO
[4806]29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
[7811]32from waeup.kofa.interfaces import (
[7860]33    IBatchProcessor, FatalCSVError, IObjectConverter,
[7859]34    ICSVExporter)
[4806]35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected to
    override the class attributes below and to implement the
    ``parentsExist``/``entryExists``/``getParent``/``getEntry``/
    ``addEntry``/``delEntry`` hooks, which all raise
    :exc:`NotImplementedError` here.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base importer'

    # Internal name...
    util_name = 'baseimporter'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are listed separately and are
        therefore excluded here as well.
        """
        return [
            key for key, field in getFields(self.iface).items()
            if key not in self.location_fields
            and field.default is field.missing_value
            and field.required]

    @property
    def req(self):
        """Map each import mode to the list of column names it requires.

        ``create`` needs location fields plus all required fields,
        ``update`` and ``remove`` need the location fields only.
        """
        return {
            'create': self.location_fields + self.required_fields,
            'update': self.location_fields,
            'remove': self.location_fields,
            }

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of location and interface fields."""
        # list() keeps the concatenation valid even where ``keys()``
        # does not return a plain list.
        iface_fields = list(getFields(self.iface).keys())
        return sorted(set(self.location_fields + iface_fields))

    def getHeaders(self, mode='create'):
        """Return the header names offered by this processor.

        ``mode`` is accepted for interface compatibility; the result is
        always :attr:`available_fields`.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that ``headerfields`` are usable for ``mode``.

        Returns ``True`` if all fields required for ``mode`` are present
        and no non-ignored header appears twice.

        Raises :exc:`FatalCSVError` otherwise and :exc:`KeyError` if
        ``mode`` is not a key of :attr:`req`.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps raw CSV column names to the fieldnames to use.
        Returns a new dict keyed by the mapped fieldnames; columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the CSV file at ``path`` is read to get the
        raw column names; the n-th raw name is mapped to the n-th entry
        of ``headerfields``. Columns marked ``--IGNORE--`` are left out,
        as are non-location fields when ``mode`` is ``'remove'``.
        """
        # Close the file deterministically instead of leaking the handle.
        # 'rb' is the mode the Python 2 csv module expects.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Render conversion errors as one ``'; '``-separated string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object via the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed on.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a ``None`` result as "no such entry".
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A :exc:`KeyError` raised here is reported by :meth:`doImport`
        as a failed row instead of aborting the import.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            # Only set values for which obj already has an attribute;
            # all other keys in row are skipped.
            if hasattr(obj, key):
                setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Log a summary of a finished batch run to ``logger``.

        Does nothing if ``logger`` is ``None``. ``num`` is the number of
        processed lines, ``warnings`` the number of failed ones and
        ``timedelta`` the processing time in seconds. The run is
        reported as ``FAILED`` as soon as ``warnings`` is greater zero.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that ``row`` is modified in place: the message is stored
        under the ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the :class:`IObjectConverter` for ``iface``. ``mode`` is not
        evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site):
        """Apply a single converted ``row`` according to ``mode``.

        Returns ``None`` on success or the message to be written to the
        failed-rows file if the row could not be processed.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return ("This object already exists in the same "
                        "container. Skipping.")
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                # Same value as the deprecated ``error.message``: the
                # sole constructor argument, or '' if there is not
                # exactly one.
                if len(error.args) == 1:
                    message = error.args[0]
                else:
                    message = ''
                return "%s Skipping." % (message,)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path``, maps its columns to
        ``headerfields`` and creates, updates or removes one entry per
        row, depending on ``mode``. Successfully processed rows are
        copied to a ``<name>.finished<ext>`` file, failed rows (with an
        additional ``--ERRORS--`` column) to a ``<name>.pending<ext>``
        file; both live in a freshly created temporary directory.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``
        with the number of processed rows, the number of failed rows
        and the absolute paths of both result files. ``failed_path`` is
        ``None`` (and the file removed) if no row failed.

        Raises :exc:`FatalCSVError` if the headers are not acceptable.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = list(mapping.values())
        failed_headers = finished_headers + ['--ERRORS--']

        num = 0
        num_warns = 0
        site = grok.getSite()

        infile = failed_file = finished_file = None
        try:
            # 'rb'/'wb' are the modes the Python 2 csv module expects.
            infile = open(path, 'rb')
            failed_file = open(failed_path, 'wb')
            finished_file = open(finished_path, 'wb')

            reader = csv.DictReader(infile)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(dict((x, x) for x in failed_headers))
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            finished_writer.writerow(
                dict((x, x) for x in finished_headers))

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                # Shallow copy: ``row`` receives the converted values
                # while ``string_row`` keeps the raw strings that are
                # written back to the result files.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    failure = self.stringFromErrs(errs, inv_errs)
                else:
                    row.update(conv_dict)
                    failure = self._processRow(row, mode, site)
                if failure is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, failure)
                    continue
                finished_writer.writerow(string_row)
        finally:
            # Always release the file handles: the result files must be
            # flushed before their paths are handed to the caller and
            # the pending file must be closed before it gets removed.
            for handle in (infile, failed_file, finished_file):
                if handle is not None:
                    handle.close()

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed, so there is no pending file to hand out.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
[7859]344
class ExporterBase(object):
    """Common base class for CSV exporters.

    Provides helpers to open a CSV writer, to write single objects as
    rows and to close the output again. The actual ``export`` and
    ``export_all`` methods have to be supplied by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something writable to CSV.

        Booleans become ``'1'`` or ``'0'``, unicode strings are encoded
        as UTF-8 and ``None`` becomes the empty string. Any other value
        is returned unchanged. `name` and `context` are not used here;
        they are available for derived classes overriding this hook.
        """
        if isinstance(value, bool):
            if value:
                return '1'
            return '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        if value is None:
            # None is not really representable in CSV files
            return ''
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        Returns a tuple ``(<writer>, <outfile>)``: ``<writer>`` is a
        :class:`csv.DictWriter` for :attr:`fields` and ``<outfile>`` is
        the file object it writes to. If `filepath` is ``None`` the
        output goes to an in-memory ``StringIO``, in which case you
        need ``<outfile>`` to get at the data later on.

        If a `filepath` is given, the file is opened for writing only;
        you might have to close it before reopening it for reading.
        """
        if filepath is not None:
            target = open(filepath, 'wb')
        else:
            target = StringIO()
        csv_writer = csv.DictWriter(target, self.fields)
        # header row: every fieldname mapped onto itself
        header = dict((fieldname, fieldname) for fieldname in self.fields)
        csv_writer.writerow(header)
        return csv_writer, target

    def write_item(self, obj, writer):
        """Write one row for `obj` using `writer`.

        For each name in :attr:`fields` the attribute of `obj` with
        that name (``None`` if missing) is passed through
        :meth:`mangle_value` and stored in the row.
        """
        writer.writerow(dict(
            (fieldname,
             self.mangle_value(getattr(obj, fieldname, None), fieldname, obj))
            for fieldname in self.fields))
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With a `filepath` of ``None`` the complete contents of
        `outfile` are returned. Otherwise `outfile` is closed and
        ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return None
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV data.

        Implementations should return the raw CSV string if `filepath`
        is ``None``. Not implemented in this base class.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects found in `site` as CSV data
        into `filepath`.

        Implementations should return the raw CSV string if `filepath`
        is ``None``. Not implemented in this base class.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.