source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 8330

Last change on this file since 8330 was 8330, checked in by Henrik Bettermann, 12 years ago

When using catalogs existing objects must not necessarily be in the same container.

  • Property svn:keywords set to Id
File size: 16.4 KB
Line 
1## $Id: batching.py 8330 2012-05-03 07:25:34Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import os
26import tempfile
27import time
28from cStringIO import StringIO
29from zope.component import createObject
30from zope.interface import Interface
31from zope.schema import getFields
32from waeup.kofa.interfaces import (
33    IBatchProcessor, FatalCSVError, IObjectConverter,
34    ICSVExporter, IGNORE_MARKER)
35
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. It implements the generic CSV
    handling (header checks, column mapping, conversion, bookkeeping of
    finished/pending rows) in :meth:`doImport`. The location-specific
    hooks (``parentsExist``, ``entryExists``, ``getParent``,
    ``getEntry``, ``addEntry``, ``delEntry``) raise
    ``NotImplementedError`` here and have to be provided by derived
    processors.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are listed separately (see
        ``location_fields``) and are therefore skipped here.
        """
        return [
            key for key, field in getFields(self.iface).items()
            if key not in self.location_fields
            and field.default is field.missing_value
            and field.required]

    @property
    def req(self):
        """Required column names per import mode.

        Returns a dict with the keys ``create``, ``update`` and
        ``remove``. Creating needs location fields plus
        ``required_fields``; updating and removing only need the
        location fields.
        """
        result = dict(
            create = self.location_fields + self.required_fields,
            update = self.location_fields,
            remove = self.location_fields,
        )
        return result

    @property
    def available_fields(self):
        """Sorted list of all column names this processor knows about.

        This is the union of ``location_fields`` and the field names of
        ``iface``, without duplicates.
        """
        return sorted(set(self.location_fields) | set(getFields(self.iface)))

    def getHeaders(self, mode='create'):
        """Get the available column names.

        ``mode`` is accepted for interface compatibility but not
        evaluated here; the result is always ``available_fields``.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether ``headerfields`` are sufficient for ``mode``.

        Returns ``True`` if all columns required for ``mode`` (see
        ``req``) are present and no non-ignored column name appears
        twice. Raises ``FatalCSVError`` otherwise and ``KeyError`` if
        ``mode`` is not one of the keys of ``req``.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. According to the original authors
        # this should already have been caught in the views; it is
        # checked here again as a safety net. Columns whose name starts
        # with '--' (like '--IGNORE--') may appear several times.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        ``mapping`` maps column names as found in the CSV file to the
        field names actually used. Returns a new dict keyed by the
        mapped field names; ``row`` itself is left untouched. Columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first row of the CSV file at ``path`` is read and paired by
        position with ``headerfields``. The result maps the raw column
        name from the file to the respective entry of ``headerfields``.

        Columns set to ``--IGNORE--`` are left out. In ``remove`` mode
        everything but the location fields is left out as well.
        """
        result = dict()
        # Only the header row is needed; make sure the file is closed
        # again before we return.
        with open(path, 'rb') as source:
            raw_header = next(csv.reader(source))
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Turn conversion errors into a single, human readable string.

        ``errors`` is an iterable of ``(fieldname, message)`` pairs,
        ``inv_errors`` an iterable of invariant error messages. All
        entries are joined by ``'; '``.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend(["invariant: %s" % err for err in inv_errors])
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object via the factory named ``factory_name``.

        Positional and keyword arguments are accepted but not passed on
        to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a return value of ``None`` as `no such
        entry`.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.

        A ``KeyError`` raised here is turned into a warning for the
        respective row by :meth:`doImport`.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems. The base implementation has no requirements and
        always returns ``None``.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Values providing a ``code`` attribute are logged by that
            # code instead of their representation.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))
        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write to log file.

        Writes a summary of a finished import to ``logger``. Nothing
        happens if no ``logger`` is given. ``num`` is the number of
        processed rows, ``warnings`` the number of failed ones and
        ``timedelta`` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that ``row`` is modified: the messages are stored under
        the key ``--ERRORS--`` before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the ``IObjectConverter`` for ``iface``.

        ``ignore_empty`` is part of the signature but not evaluated by
        this base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _processRow(self, row, mode, site):
        """Apply a single, already converted ``row`` in ``mode``.

        Returns ``None`` if the row could be processed. Otherwise the
        warning to be stored with the failed row is returned.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                # ``error.message`` is deprecated; for exceptions
                # raised with exactly one argument it equals that
                # argument and is empty otherwise.
                message = ''
                if len(error.args) == 1:
                    message = error.args[0]
                return "%s Skipping." % (message,)
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at ``path`` and creates, updates or removes
        (depending on ``mode``) one object per row. ``headerfields``
        gives the field name for each column of the file.

        Successfully processed rows are written to a ``.finished``
        file, failed rows together with an ``--ERRORS--`` column to a
        ``.pending`` file. Both files are created in a fresh temporary
        directory which is not removed by this method.

        If ``ignore_empty`` is true, empty strings are replaced by the
        ignore marker in ``update`` mode.

        Returns a tuple ``(num, num_warns, finished_path,
        failed_path)`` with the number of processed rows, the number
        of failed rows and the absolute paths of the result
        files. ``failed_path`` is ``None`` (and the pending file
        removed) if no row failed.

        Raises ``FatalCSVError`` if the headers are not sufficient.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        # All files opened below are collected here, so that they can
        # be closed reliably, also if processing a row raises.
        opened = []
        try:
            source = open(path, 'rb')
            opened.append(source)
            reader = csv.DictReader(source)

            temp_dir = tempfile.mkdtemp()

            base = os.path.basename(path)
            (base, ext) = os.path.splitext(base)
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            failed_file = open(failed_path, 'wb')
            opened.append(failed_file)
            failed_writer = csv.DictWriter(failed_file, failed_headers)
            failed_writer.writerow(dict([(x,x) for x in failed_headers]))

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())
            finished_file = open(finished_path, 'wb')
            opened.append(finished_file)
            finished_writer = csv.DictWriter(finished_file, finished_headers)
            finished_writer.writerow(
                dict([(x,x) for x in finished_headers]))

            num = 0
            num_warns = 0
            site = grok.getSite()

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode in ('update',):
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: ``row`` receives the converted values
                # while ``string_row`` keeps the strings written to
                # the result files.
                row = dict(string_row.items())
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    warning = self.stringFromErrs(errs, inv_errs)
                else:
                    row.update(conv_dict)
                    warning = self._processRow(row, mode, site)
                if warning is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, warning)
                    continue
                finished_writer.writerow(string_row)
        finally:
            # Flush and release all files before the paths are handed
            # out or the pending file is removed.
            for handle in opened:
                handle.close()

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: the pending file only contains the
            # header row and is not needed.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
376
class ExporterBase(object):
    """Common base class for CSV exporters.

    Provides the CSV plumbing (writer setup, row extraction, closing of
    the output). :meth:`export` and :meth:`export_all` have to be
    implemented by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Hook for mangling values in derived classes.

        The base implementation turns booleans into ``'1'``/``'0'``,
        encodes unicode strings as UTF-8 byte strings and replaces
        ``None`` by the empty string. Any other value is returned
        unchanged. ``name`` and ``context`` are not evaluated here.
        """
        if value is None:
            # None is not really representable in CSV files
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers like byte streams better than unicode
            return value.encode('utf-8')
        return value

    def get_csv_writer(self, filepath=None):
        """Get a CSV dict writer instance open for writing.

        Returns a tuple ``(<writer>, <outfile>)``. ``<writer>`` is a
        :class:`csv.DictWriter` for the columns in ``fields`` and
        ``<outfile>`` the file object it writes to: a ``StringIO``
        instance if no ``filepath`` is given, the file opened at
        ``filepath`` otherwise.

        The header row is already written when this method returns.

        A file opened via ``filepath`` is open for writing only; you
        might have to close it before reopening it for reading.
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        csv_writer = csv.DictWriter(outfile, self.fields)
        header = dict((fieldname, fieldname) for fieldname in self.fields)
        csv_writer.writerow(header)
        return csv_writer, outfile

    def write_item(self, obj, writer):
        """Write a row extracted from `obj` into CSV file using `writer`.

        For each name in ``fields`` the respective attribute of `obj`
        (``None`` if missing) is passed through :meth:`mangle_value`.
        """
        writer.writerow(dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields))
        return

    def close_outfile(self, filepath, outfile):
        """Close outfile.

        If filepath is None, `outfile` is rewound and its complete
        contents is returned instead. Otherwise `outfile` is closed
        and ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export `iterable` as CSV file.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.

        Not implemented in this base class.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects in `site` into `filepath` as
        CSV data.

        If `filepath` is ``None``, a raw string with CSV data should
        be returned.

        Not implemented in this base class.
        """
        raise NotImplementedError
Note: See TracBrowser for help on using the repository browser.