source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/batching.py @ 9168

Last change on this file since 9168 was 9112, checked in by uli, 12 years ago

Add new helper function.

  • Property svn:keywords set to Id
File size: 23.5 KB
Line 
1## $Id: batching.py 9112 2012-08-26 13:40:01Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob)
42
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. Concrete processors are expected to
    override the class attributes below and to implement the
    ``parentsExist``, ``entryExists``, ``getParent``, ``getEntry``,
    ``addEntry`` and ``delEntry`` hooks, which raise
    :exc:`NotImplementedError` here.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are excluded as well, as they are
        handled separately (see :attr:`req`).
        """
        return [
            key for key, field in getFields(self.iface).items()
            if key not in self.location_fields
            and field.default is field.missing_value
            and field.required]

    @property
    def req(self):
        """Mapping of import mode to the list of column names it requires.

        ``create`` needs location fields plus :attr:`required_fields`;
        ``update`` and ``remove`` need the location fields only.
        """
        return dict(
            create=self.location_fields + self.required_fields,
            update=self.location_fields,
            remove=self.location_fields,
        )

    @property
    def available_fields(self):
        """Sorted list of all location fields and interface field names."""
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the column names usable for `mode`.

        The base implementation ignores `mode` and returns
        :attr:`available_fields`.
        """
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are sufficient for `mode`.

        Raises :exc:`FatalCSVError` if a column required for `mode` is
        missing or if a non-ignored column name (one not starting with
        ``--``) appears more than once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        if any(field not in headerfields for field in req):
            raise FatalCSVError(
                "Need at least columns %s for import!" %
                ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        Returns a new dict holding the values of `row` under the
        replacement keys given by `mapping`. Columns mapped to
        ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` is read as raw header and
        paired by position with `headerfields`. Ignored columns and, in
        ``remove`` mode, all non-location fields are left out.
        """
        # Close the file explicitly instead of leaving it to the
        # garbage collector.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Turn conversion errors into a single message string.

        `errors` is an iterable of ``(fieldname, message)`` pairs,
        `inv_errors` an iterable of invariant error messages. All
        parts are joined by ``'; '``.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend("invariant: %s" % err for err in inv_errors)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object using the factory named :attr:`factory_name`.

        Arguments are accepted but not passed on to the factory.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a ``None`` result as 'no such entry'.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the ``code`` of a value if it has one, else the value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import run to `logger`.

        Does nothing if `logger` is ``None``. `num` is the number of
        processed rows, `warnings` the number of failed ones and
        `timedelta` the processing time in seconds.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # ``num or 1`` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.

        Note that `row` is modified: the message is stored under the
        ``--ERRORS--`` key before the row is written.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(<errors>, <invariant errors>, <converted
        dict>)`` as delivered by the object converter for
        :attr:`iface`. `ignore_empty` is accepted but not used by this
        base implementation.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _applyRow(self, row, mode, site):
        """Create, remove or update the entry described by `row`.

        Returns ``None`` on success, otherwise the warning to be
        written to the failed-rows file.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, interprets its columns as
        `headerfields` and applies each row according to `mode`
        (``create``, ``update`` or ``remove``). With `ignore_empty`
        set, empty values are ignored in ``update`` mode.

        Successfully processed rows are written to a ``.finished``
        file, failed rows (with an additional ``--ERRORS--`` column)
        to a ``.pending`` file, both located in a new temporary
        directory.

        Returns a tuple ``(<number of rows>, <number of failed rows>,
        <finished path>, <failed path>)`` where ``<failed path>`` is
        ``None`` if no row failed.

        Raises :exc:`FatalCSVError` if `headerfields` are not
        sufficient for `mode`.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0
        site = grok.getSite()

        # All three files are closed explicitly on leaving the
        # ``with`` blocks, so the result files are completely written
        # (and the pending file can be removed safely) afterwards.
        with open(path, 'rb') as infile:
            reader = csv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()

            base = os.path.basename(path)
            (base, ext) = os.path.splitext(base)
            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_headers = list(mapping.values())
            failed_headers.append('--ERRORS--')
            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_headers = list(mapping.values())

            with open(failed_path, 'wb') as failed_file:
                failed_writer = csv.DictWriter(failed_file, failed_headers)
                os.chmod(failed_path, 0o664)
                failed_writer.writerow(
                    dict((x, x) for x in failed_headers))

                with open(finished_path, 'wb') as finished_file:
                    finished_writer = csv.DictWriter(
                        finished_file, finished_headers)
                    finished_writer.writerow(
                        dict((x, x) for x in finished_headers))

                    for raw_row in reader:
                        num += 1
                        string_row = self.applyMapping(raw_row, mapping)
                        if ignore_empty and mode in ('update',):
                            # replace empty strings with ignore-markers
                            for key, val in list(string_row.items()):
                                if val == '':
                                    string_row[key] = IGNORE_MARKER
                        # Shallow copy: `row` receives the converted
                        # values while `string_row` keeps the strings
                        # for the result files.
                        row = dict(string_row)
                        errs, inv_errs, conv_dict = self.checkConversion(
                            string_row, mode)
                        if errs or inv_errs:
                            warning = self.stringFromErrs(errs, inv_errs)
                        else:
                            row.update(conv_dict)
                            warning = self._applyRow(row, mode, site)
                        if warning is not None:
                            num_warns += 1
                            self.writeFailedRow(
                                failed_writer, string_row, warning)
                            continue
                        finished_writer.writerow(string_row)

        time_end = time.time()
        timedelta = time_end - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: do not leave an empty pending file behind.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)
399
class ExporterBase(object):
    """Shared machinery for CSV exporters.

    Derived classes set :attr:`fields` and :attr:`title` and provide
    :meth:`export` and :meth:`export_all`.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something writable to CSV.

        Derived classes may override this hook; `name` and `context`
        are not used by the base implementation.
        """
        if value is None:
            # There is no real CSV representation for None.
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # The CSV writer wants byte strings, not unicode.
            return value.encode('utf-8')
        # datetime must be tested before date: every datetime is a
        # date as well.
        if isinstance(value, datetime.datetime):
            return str(value)
        if isinstance(value, datetime.date):
            # A trailing hash '#' keeps Excel from converting the
            # date automatically.
            return str('%s#' % value)
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        Returns a tuple ``(<writer>, <outfile>)``. ``<writer>`` is a
        :class:`csv.DictWriter` for :attr:`fields`, ``<outfile>`` the
        underlying file: a StringIO object if `filepath` is ``None``,
        else the file at `filepath` opened for writing only (close it
        before reopening it for reading).
        """
        outfile = StringIO() if filepath is None else open(filepath, 'wb')
        writer = csv.DictWriter(outfile, self.fields)
        header = dict((name, name) for name in self.fields)
        writer.writerow(header)
        return writer, outfile

    def write_item(self, obj, writer):
        """Write one CSV row for `obj` using `writer`.

        Each name in :attr:`fields` is looked up as attribute of `obj`
        (``None`` if missing) and passed through :meth:`mangle_value`.
        """
        row = dict(
            (name, self.mangle_value(getattr(obj, name, None), name, obj))
            for name in self.fields)
        writer.writerow(row)

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        Without a `filepath` the complete contents of `outfile` are
        returned; otherwise `outfile` is closed and ``None`` returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return None
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV data.

        Should return the raw CSV string if `filepath` is ``None``.
        Must be provided by derived classes.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects of `site` as CSV data to
        `filepath`.

        Should return the raw CSV string if `filepath` is ``None``.
        Must be provided by derived classes.
        """
        raise NotImplementedError
493
494
def export_job(site, exporter_name):
    """Export all entries delivered by exporter and store it in a temp file.

    `site` gives the site to search. It will be passed to the exporter
    and also be set as 'current site' as the function is used in
    asynchronous jobs which run in their own threads and have no site
    set initially. Therefore `site` must also be a valid value for use
    with `zope.component.hooks.setSite()`.

    `exporter_name` is the utility name under which the desired
    exporter was registered with the ZCA.

    The resulting CSV file will be stored in a new temporary directory
    (using :func:`tempfile.mkdtemp`).

    Returns the path to the created CSV file.

    If the exporter raises an exception, the temporary directory is
    removed again and the exception is re-raised.

    .. note:: It is the callers responsibility to clean up the used
              file and its parent directory.
    """
    setSite(site)
    exporter = getUtility(ICSVExporter, name=exporter_name)
    output_dir = tempfile.mkdtemp()
    output_path = os.path.join(output_dir, 'export.csv')
    try:
        exporter.export_all(site, filepath=output_path)
    except Exception:
        # On failure the caller never learns the path and thus cannot
        # clean up; do it here instead of leaking the directory.
        shutil.rmtree(output_dir, ignore_errors=True)
        raise
    return output_path
521
class AsyncExportJob(AsyncJob):
    """An IJob that exports data to CSV files.

    This is a regular `AsyncJob` with a specialized constructor: you
    do not pass a callable but a `site` and an `exporter_name`; the
    job then runs :func:`export_job` with these two values.

    `exporter_name` must be the name under which an ICSVExporter
    utility was registered with the ZCA.

    `site` must be a valid site or ``None``.

    The export itself is performed when the job is put into a queue.
    See :mod:`waeup.kofa.async` to learn more about asynchronous
    jobs.

    The job result is the path of the generated CSV file, which
    resides in a temporary directory that should be removed after
    being used.
    """
    grok.implements(IExportJob)

    def __init__(self, site, exporter_name):
        """Set up a job calling ``export_job(site, exporter_name)``."""
        base = super(AsyncExportJob, self)
        base.__init__(export_job, site, exporter_name)
547
class ExportJobContainer(object):
    """Mix-in adding management of asynchronous export jobs.

    Jobs are tracked in :attr:`running_exports` as tuples ``(<job
    id>, <exporter name>, <user id>)``.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id):
        """Start an asynchronous export and remember it.

        `exporter_name` is the name of the exporter utility to use,
        `user_id` the ID of the user triggering the export.

        Job ID, exporter name and user ID are appended to the
        persistent list of running exports.

        Returns the ID of the started job.
        """
        current_site = grok.getSite()
        job_manager = getUtility(IJobManager)
        export = AsyncExportJob(current_site, exporter_name)
        job_id = job_manager.put(export)
        # Rebind the list on the instance to make sure it is stored
        # in ZODB.
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        """Return the export entries of user `user_id` as list of tuples.

        Each tuple holds ``<job_id>, <exporter_name>, <user_id>`` in
        that order, ``<exporter_name>`` being the utility name of the
        exporter used.

        With `user_id` set to ``None`` the entries of all users are
        returned.

        Entries whose job is unknown to the job manager are left out
        and removed from the stored list.
        """
        job_manager = getUtility(IJobManager)
        alive, stale = [], []
        for entry in self.running_exports:
            foreign = user_id is not None and entry[2] != user_id
            if foreign:
                continue
            if job_manager.get(entry[0]) is None:
                stale.append(entry)
            else:
                alive.append(entry)
        if stale:
            remaining = [item for item in self.running_exports
                         if item not in stale]
            self.running_exports = PersistentList(remaining)
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Return status infos about export jobs of `user_id`.

        The result is a list of tuples ``<raw status>, <status
        translated>, <exporter title>`` in that order, the latter two
        being the translated job status and the human readable title
        of the exporter used.
        """
        entries = self.get_running_export_jobs(user_id)
        job_manager = getUtility(IJobManager)
        infos = []
        for entry in entries:
            job = job_manager.get(entry[0])
            if job is not None:
                raw_status, translated = JOB_STATUS_MAP[job.status]
                title = getUtility(ICSVExporter, name=entry[1]).title
                infos.append((raw_status, translated, title))
        return infos

    def delete_export_entry(self, entry):
        """Remove the export described by `entry`.

        The directory containing the export file (if the job produced
        one) is deleted, the job is removed via the job manager and
        `entry` is dropped from the local `running_exports` list.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        job_manager = getUtility(IJobManager)
        job = job_manager.get(entry[0])
        if job is not None and isinstance(job.result, basestring):
            # The job result is the path of the export file; remove
            # the directory containing it.
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(entry[0], self)
        self.running_exports = PersistentList(
            [item for item in self.running_exports if item != entry])

    def entry_from_job_id(self, job_id):
        """Return the entry tuple belonging to `job_id`.

        ``None`` is returned if there is no such entry.
        """
        matching = (item for item in self.running_exports
                    if item[0] == job_id)
        return next(matching, None)
Note: See TracBrowser for help on using the repository browser.