source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8219

Last change on this file since 8219 was 8214, checked in by uli, 13 years ago

Lots of trash to sketch filtered imports.

  • Property svn:keywords set to Id
File size: 15.9 KB
Line 
1## $Id: batching.py 8214 2012-04-19 12:43:37Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import os
26import tempfile
27import time
28from cStringIO import StringIO
29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
32from waeup.kofa.interfaces import (
33    IBatchProcessor, FatalCSVError, IObjectConverter,
34    ICSVExporter, DELETION_MARKER, IGNORE_MARKER)
35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors must
    implement the site-specific hooks (:meth:`parentsExist`,
    :meth:`entryExists`, :meth:`getParent`, :meth:`getEntry`,
    :meth:`addEntry`, :meth:`delEntry`) and override `iface`,
    `location_fields`, `factory_name`, `util_name` and `name`.
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of fields whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        # Maps import mode to the header fields required for that
        # mode: 'create' additionally needs all required schema
        # fields, 'update' and 'remove' only need the location fields.
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        # All schema field names plus the location fields, without
        # duplicates and sorted alphabetically.
        return sorted(list(set(
                    self.location_fields + getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the column names handled by this processor.

        NOTE(review): `mode` is currently ignored; every mode gets the
        full list of available fields.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether `headerfields` suffice for an import in `mode`.

        Raises :exc:`FatalCSVError` if a required column is missing or
        a (non-ignored) column name appears twice. Returns ``True``
        otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if not field in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a dict with the keys of `row` renamed according to
        `mapping`. Columns mapped to ``u'--IGNORE--'`` are dropped
        from the result.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        Reads the header row of the file at `path` and maps each raw
        header name to the corresponding name in `headerfields`,
        skipping ignored columns and -- in 'remove' mode -- all
        non-location fields.
        """
        result = dict()
        # NOTE(review): the file handle is never closed explicitly;
        # it is left for garbage collection after the header is read.
        reader = csv.reader(open(path, 'rb'))
        raw_header = reader.next()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join field errors and invariant errors into one string.

        `errors` is a sequence of (fieldname, message) tuples,
        `inv_errors` a sequence of plain messages. The parts are
        joined with '; '.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new (empty) object via the registered factory.

        NOTE(review): `args` and `kw` are accepted but not passed on
        to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.
        """
        for key, value in row.items():
            # Skip fields not declared in interface.
            if hasattr(obj, key):
                # Skip values marked as 'to be ignored'.
                # NOTE(review): the marker is hard-coded here although
                # IGNORE_MARKER is imported above -- confirm both have
                # the same value before unifying them.
                if value != '<IGNORE>':
                    setattr(obj, key, value)
                #if value == DELETIONMARKER:
                #    setattr(obj, key, None)
                #elif value is not None:
                #    setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of an import run to `logger`.

        Does nothing if `logger` is ``None``. `num` is the total
        number of processed rows, `warnings` the number of failed
        ones, `timedelta` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # Guard against division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as produced
        by the :class:`IObjectConverter` adapter for `self.iface`.

        NOTE(review): `mode` is currently unused.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name)
        return errs, inv_errs, conv_dict

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at `path` and processes each row according
        to `mode` ('create', 'update' or 'remove'). Successful rows
        are written to a ``.finished`` CSV file, failed rows (with an
        ``--ERRORS--`` column appended) to a ``.pending`` file; both
        live in a freshly created temporary directory.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)`` where `failed_path` is ``None`` if no row
        failed.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)
        reader = csv.DictReader(open(path, 'rb'))

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        # File receiving rows that could not be imported.
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        failed_headers = mapping.values()
        failed_headers.append('--ERRORS--')
        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
                                       failed_headers)
        failed_writer.writerow(dict([(x,x) for x in failed_headers]))

        # File receiving successfully imported rows.
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = mapping.values()
        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
                                         finished_headers)
        finished_writer.writerow(dict([(x,x) for x in finished_headers]))

        num =0
        num_warns = 0
        site = grok.getSite()

        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            row = dict(string_row.items()) # create shallow copy
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                num_warns += 1
                conv_warnings = self.stringFromErrs(errs, inv_errs)
                self.writeFailedRow(
                    failed_writer, string_row, conv_warnings)
                continue
            # Overlay the (string) row with the converted values.
            row.update(conv_dict)

            if mode == 'create':
                if not self.parentsExist(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Not all parents do exist yet. Skipping")
                    continue
                if self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "This object already exists in the same container. Skipping.")
                    continue
                obj = self.callFactory()
                # Override all values in row, also
                # student_ids and applicant_ids which have been
                # generated in the respective __init__ methods before.
                self.updateEntry(obj, row, site)
                try:
                    self.addEntry(obj, row, site)
                except KeyError, error:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "%s Skipping." % error.message)
                    continue
            elif mode == 'remove':
                if not self.entryExists(row, site):
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot remove: no such entry.")
                    continue
                self.delEntry(row, site)
            elif mode == 'update':
                obj = self.getEntry(row, site)
                if obj is None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row,
                        "Cannot update: no such entry.")
                    continue
                update_errors = self.checkUpdateRequirements(obj, row, site)
                if update_errors is not None:
                    num_warns += 1
                    self.writeFailedRow(
                        failed_writer, string_row, update_errors)
                    continue
                self.updateEntry(obj, row, site)
            finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # No failures: remove the (empty) pending file. Deleting
            # the writer drops the last reference to the open file
            # handle before unlinking (CPython refcounting closes it).
            del failed_writer
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
366
class ExporterBase(object):
    """A base for exporters.

    Bundles the plumbing shared by CSV exporters: obtaining a dict
    writer, turning arbitrary attribute values into CSV-compatible
    strings, writing single objects, and closing (or returning) the
    result.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes
        """
        if value is None:
            # None is not really representable in CSV files
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
        :class:`csv.DictWriter` instance and outfile is the real file
        which is written to. The latter is important when writing to
        StringIO and can normally be ignored otherwise.

        The returned file will already be filled with the header row.

        Please note that if you give a filepath, the returned outfile
        is open for writing only and you might have to close it before
        reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        # Emit the header row: each field name mapped to itself.
        writer.writerow(dict([(name, name) for name in self.fields]))
        return writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        Each configured field is read from `obj` (``None`` when
        missing) and passed through :meth:`mangle_value` first.
        """
        writer.writerow(dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields))
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, the contents of outfile is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. Must be implemented by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned. Must be implemented by derived classes.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.