source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8202

Last change on this file since 8202 was 8202, checked in by Henrik Bettermann, 13 years ago

Empty If no value is provided in import files, attributes must not be cleared. Clear attribute only if value == DELETIONMARKER.

  • Property svn:keywords set to Id
File size: 15.8 KB
Line 
1## $Id: batching.py 8202 2012-04-18 05:12:32Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import os
26import tempfile
27import time
28from cStringIO import StringIO
29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
32from waeup.kofa.interfaces import (
33    IBatchProcessor, FatalCSVError, IObjectConverter,
34    ICSVExporter, DELETIONMARKER)
35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected to
    override the class attributes below and to implement the methods
    that raise :exc:`NotImplementedError` here (``parentsExist``,
    ``entryExists``, ``getParent``, ``getEntry``, ``addEntry`` and
    ``delEntry``).
    """
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never part of the result.
        """
        result = []
        for key, field in getFields(self.iface).items():
            if key in self.location_fields:
                continue
            if field.default is not field.missing_value:
                continue
            if field.required:
                result.append(key)
        return result

    @property
    def req(self):
        """Mapping of import mode to the list of columns it requires.

        ``create`` needs location fields plus all required fields,
        ``update`` and ``remove`` need the location fields only.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of location and interface fields.
        """
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the headers offered for import.

        The base implementation ignores ``mode`` and always returns
        ``available_fields``.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that ``headerfields`` are usable for ``mode``.

        Returns ``True`` if all columns required for ``mode`` are
        present and no non-ignored column name appears twice.

        Raises :exc:`FatalCSVError` otherwise. ``mode`` must be one of
        the keys of ``req``; any other value results in a ``KeyError``.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views. Columns starting with '--' (like
        # '--IGNORE--') may appear several times.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps raw CSV column names to the fieldnames to use.
        Returns a new dict keyed by the replacement names; columns
        mapped to ``--IGNORE--`` are left out.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at ``path`` is read as raw header;
        ``headerfields`` gives, position by position, the fieldname to
        use for each raw column. Columns marked ``--IGNORE--`` are
        dropped, and in ``remove`` mode only location fields are kept.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again instead of leaving that to the garbage collector.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Render conversion errors as a single string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages. All
        entries are joined with ``'; '``.
        """
        result = []
        for err in errors:
            fieldname, message = err
            result.append("%s: %s" % (fieldname, message))
        for err in inv_errors:
            result.append("invariant: %s" % err)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed
        on by this base implementation.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        ``doImport`` treats a return value of ``None`` as "no such
        entry" when updating.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A ``KeyError`` raised here is reported by ``doImport`` as a
        failed row instead of aborting the import.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation has no requirements and
        returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Keys of ``row`` for which ``obj`` has no attribute are skipped.
        A value equal to ``DELETIONMARKER`` clears the attribute (sets
        it to ``None``); a value of ``None`` leaves the attribute
        untouched.
        """
        for key, value in row.items():
            # Skip fields not declared in interface.
            if hasattr(obj, key):
                # Set attribute to None if value is marked for deletion
                if value == DELETIONMARKER:
                    setattr(obj, key, None)
                elif value is not None:
                    setattr(obj, key, value)
        return

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        Emits a summary of a finished import via ``logger.info``.
        ``num`` is the number of processed rows, ``warnings`` the
        number of failed rows and ``timedelta`` the processing time in
        seconds. Does nothing if no ``logger`` is given.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty imports.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that ``row`` is modified in place: the messages are stored
        under the ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the :class:`IObjectConverter` for ``iface``. ``mode`` is not
        used by this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site):
        """Apply a single converted ``row`` according to ``mode``.

        Returns ``None`` on success or the error message to be written
        to the failed-rows file.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return ("This object already exists in the same "
                        "container. Skipping.")
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def _importRows(self, reader, mapping, mode, site,
                    failed_writer, finished_writer):
        """Process all rows of ``reader``; return ``(num, num_warns)``.

        Rows that could be processed are written to ``finished_writer``,
        all others (with an error message) to ``failed_writer``.
        """
        num = 0
        num_warns = 0
        for raw_row in reader:
            num += 1
            string_row = self.applyMapping(raw_row, mapping)
            # Shallow copy: the string values are kept for the result
            # files, the copy receives the converted values.
            row = dict(string_row)
            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
            if errs or inv_errs:
                error = self.stringFromErrs(errs, inv_errs)
            else:
                row.update(conv_dict)
                error = self._processRow(row, mode, site)
            if error is not None:
                num_warns += 1
                self.writeFailedRow(failed_writer, string_row, error)
                continue
            finished_writer.writerow(string_row)
        return num, num_warns

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None):
        """Perform actual import.

        Reads the CSV file at ``path`` and creates, updates or removes
        one object per row, depending on ``mode``. ``headerfields``
        names the field to use for each CSV column.

        Two result files are written into a fresh temporary directory:
        ``<name>.finished<ext>`` with all rows processed successfully
        and ``<name>.pending<ext>`` with all failed rows plus an
        ``--ERRORS--`` column. The temporary directory is not removed
        here.

        Returns a tuple ``(num, num_warns, finished_path, failed_path)``
        with the number of processed rows, the number of failed rows
        and the absolute paths of the result files. ``failed_path`` is
        ``None`` (and the file removed) if no row failed.

        Raises :exc:`FatalCSVError` if the headers are not acceptable.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        temp_dir = tempfile.mkdtemp()

        base = os.path.basename(path)
        (base, ext) = os.path.splitext(base)
        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
        finished_headers = list(mapping.values())
        failed_headers = finished_headers + ['--ERRORS--']

        site = grok.getSite()

        # All three files are closed explicitly, so that results are
        # flushed to disk before the paths are handed out and before
        # an empty failed-rows file is removed.
        with open(path, 'rb') as infile:
            with open(failed_path, 'wb') as failed_file:
                with open(finished_path, 'wb') as finished_file:
                    failed_writer = csv.DictWriter(
                        failed_file, failed_headers)
                    failed_writer.writerow(
                        dict(zip(failed_headers, failed_headers)))
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict(zip(finished_headers, finished_headers)))
                    num, num_warns = self._importRows(
                        csv.DictReader(infile), mapping, mode, site,
                        failed_writer, finished_writer)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave a header-only file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
364
class ExporterBase(object):
    """Common base class for CSV exporters.

    Provides the CSV plumbing (writer creation, row writing, closing);
    the actual ``export`` and ``export_all`` methods have to be
    supplied by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer can handle.

        Booleans become ``'1'`` or ``'0'``, unicode strings are encoded
        as UTF-8 and ``None`` becomes the empty string. Any other value
        is returned unchanged. `name` and `context` are not used here;
        they are available for derived classes overriding this hook.
        """
        if value is None:
            # None is not really representable in CSV files
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer and the file it writes to.

        The result is a tuple ``(<writer>, <outfile>)``: ``<writer>``
        is a :class:`csv.DictWriter` for the columns in ``fields`` and
        ``<outfile>`` the underlying file object. Without a `filepath`
        an in-memory ``StringIO`` is used, in which case the caller
        needs ``<outfile>`` to get at the data.

        The header row has already been written when this method
        returns.

        A file created for a given `filepath` is opened for writing
        only; you might have to close it before reopening it for
        reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row for `obj` using `writer`.

        For every name in ``fields`` the attribute of `obj` with that
        name is looked up (``None`` if missing) and passed through
        ``mangle_value``.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)
        return

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With a `filepath` given, `outfile` is closed and nothing is
        returned. Otherwise the complete contents of `outfile` are
        returned and it is left open.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV file.

        Implementations should return a raw string with CSV data if
        `filepath` is ``None``. Not implemented in this base class.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` as CSV data into
        `filepath`.

        Implementations should return a raw string with CSV data if
        `filepath` is ``None``. Not implemented in this base class.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.