source: main/waeup.kofa/branches/uli-async-update/src/waeup/kofa/utils/batching.py @ 9169

Last change on this file since 9169 was 9169, checked in by uli, 12 years ago

Merge changes from trunk, r8786-HEAD

  • Property svn:keywords set to Id
File size: 23.9 KB
Line 
1## $Id: batching.py 9169 2012-09-10 11:05:07Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob)
42
class BatchProcessor(grok.GlobalUtility):
    """A processor to add, update, or remove data.

    This is a non-active baseclass. The hooks :meth:`parentsExist`,
    :meth:`entryExists`, :meth:`getParent`, :meth:`getEntry`,
    :meth:`addEntry` and :meth:`delEntry` raise
    :exc:`NotImplementedError` here and have to be provided by derived
    processors.
    """
    grok.implements(IBatchProcessor)
    grok.context(Interface)
    grok.baseclass()

    # Name used in pages and forms...
    name = u'Non-registered base processor'

    # Internal name...
    util_name = 'baseprocessor'

    # Items for this processor need an interface with zope.schema fields.
    iface = Interface

    # The name must be the same as the util_name attribute in order to
    # register this utility correctly.
    grok.name(util_name)

    # Headers needed to locate items...
    location_fields = ['code', 'faculty_code']

    # A factory with this name must be registered...
    factory_name = 'waeup.Department'

    @property
    def required_fields(self):
        """Required fields that have no default.

        A list of names of field, whose value cannot be set if not
        given during creation. Therefore these fields must exist in
        input.

        Fields with a default != missing_value do not belong to this
        category. Location fields are never listed here either.
        """
        return [
            key for key, field in getFields(self.iface).items()
            if key not in self.location_fields
            and field.default is field.missing_value
            and field.required]

    @property
    def req(self):
        """Map each import mode to the list of columns it requires."""
        return {
            'create': self.location_fields + self.required_fields,
            'update': self.location_fields,
            'remove': self.location_fields,
            }

    @property
    def available_fields(self):
        """Sorted, duplicate-free names of location and schema fields."""
        return sorted(set(
            self.location_fields + list(getFields(self.iface).keys())))

    def getHeaders(self, mode='create'):
        """Return the usable column names (`mode` is not evaluated here)."""
        return self.available_fields

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are sufficient for `mode`.

        Raises :exc:`FatalCSVError` if a column required for `mode` is
        missing or if a not-ignored column name appears more than
        once. Returns ``True`` otherwise.
        """
        req = self.req[mode]
        # Check for required fields...
        for field in req:
            if field not in headerfields:
                raise FatalCSVError(
                    "Need at least columns %s for import!" %
                    ', '.join(["'%s'" % x for x in req]))
        # Check for double fields. Cannot happen because this error is
        # already caught in views. Columns starting with '--' are
        # markers (like '--IGNORE--') and may appear several times.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def applyMapping(self, row, mapping):
        """Apply mapping to a row of CSV data.

        `mapping` maps raw CSV column names to the field names to use.
        Returns a new dict keyed by the replacement names; columns
        mapped to ``--IGNORE--`` are dropped.
        """
        result = dict()
        for key, replacement in mapping.items():
            if replacement == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[replacement] = row[key]
        return result

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the CSV file at `path` is read to get the
        raw column names; `headerfields` gives, position by position,
        the names to use instead. Ignored columns and, in ``remove``
        mode, all non-location columns are left out of the result.
        """
        # Only the header line is needed; make sure the file is closed
        # again instead of leaving that to garbage collection.
        infile = open(path, 'rb')
        try:
            raw_header = next(csv.reader(infile))
        finally:
            infile.close()
        result = dict()
        for num, field in enumerate(headerfields):
            if field not in self.location_fields and mode == 'remove':
                # Skip non-location fields when removing.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def stringFromErrs(self, errors, inv_errors):
        """Join conversion and invariant errors into one message string.

        `errors` holds ``(fieldname, message)`` pairs, `inv_errors`
        holds invariant error descriptions.
        """
        result = ["%s: %s" % (fieldname, message)
                  for fieldname, message in errors]
        result.extend("invariant: %s" % err for err in inv_errors)
        return '; '.join(result)

    def callFactory(self, *args, **kw):
        """Create a new object via the factory named `factory_name`.

        Positional and keyword arguments are accepted but not passed on.
        """
        return createObject(self.factory_name)

    def parentsExist(self, row, site):
        """Tell whether the parent object for data in ``row`` exists.
        """
        raise NotImplementedError('method not implemented')

    def entryExists(self, row, site):
        """Tell whether there already exists an entry for ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def getParent(self, row, site):
        """Get the parent object for the entry in ``row``.
        """
        raise NotImplementedError('method not implemented')

    def getEntry(self, row, site):
        """Get the object for the entry in ``row``.

        :meth:`doImport` treats a ``None`` result as 'no such entry'.
        """
        raise NotImplementedError('method not implemented')

    def addEntry(self, obj, row, site):
        """Add the entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def delEntry(self, row, site):
        """Delete entry given by ``row`` data.
        """
        raise NotImplementedError('method not implemented')

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        return None

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        Returns a string describing the fields changed.
        """
        changed = []
        for key, value in row.items():
            # Skip fields to be ignored.
            if value == IGNORE_MARKER:
                continue
            # Skip fields not declared in interface and which are
            # not yet attributes of existing objects. We can thus not
            # add non-existing attributes here.
            if not hasattr(obj, key):
                continue
            setattr(obj, key, value)
            # Log the `code` of a value if it has one, else the value.
            log_value = getattr(value, 'code', value)
            changed.append('%s=%s' % (key, log_value))

        # If any catalog is involved it must be updated.
        #
        # XXX: The event is also triggered when creating objects as
        # updateEntry is called also when creating entries resulting
        # in objectAdded and additional objectModified events.
        if changed:
            notify(grok.ObjectModifiedEvent(obj))

        return ', '.join(changed)

    def createLogfile(self, path, fail_path, num, warnings, mode, user,
                      timedelta, logger=None):
        """Write a summary of a finished import to `logger`.

        Does nothing if no `logger` is given. The run is reported as
        ``FAILED`` as soon as at least one row produced a warning.
        """
        if logger is None:
            return
        status = 'OK'
        if warnings > 0:
            status = 'FAILED'
        logger.info("-" * 20)
        logger.info("%s: Batch processing finished: %s" % (user, status))
        logger.info("%s: Source: %s" % (user, path))
        logger.info("%s: Mode: %s" % (user, mode))
        logger.info("%s: User: %s" % (user, user))
        if warnings > 0:
            logger.info("%s: Failed datasets: %s" % (
                    user, os.path.basename(fail_path)))
        # `num or 1` avoids a division by zero for empty input files.
        logger.info("%s: Processing time: %0.3f s (%0.4f s/item)" % (
                user, timedelta, timedelta/(num or 1)))
        logger.info("%s: Processed: %s lines (%s successful/ %s failed)" % (
                user, num, num - warnings, warnings
                ))
        logger.info("-" * 20)
        return

    def writeFailedRow(self, writer, row, warnings):
        """Write a row with error messages to error CSV.

        If warnings is a list of strings, they will be concatenated.
        The message is stored in `row` under ``--ERRORS--``, i.e. the
        passed dict is modified.
        """
        error_col = warnings
        if isinstance(warnings, list):
            error_col = ' / '.join(warnings)
        row['--ERRORS--'] = error_col
        writer.writerow(row)
        return

    def checkConversion(self, row, mode='ignore', ignore_empty=True):
        """Validates all values in row.

        Returns a tuple ``(errors, invariant errors, converted values)``
        as delivered by the object converter for `iface`.
        `ignore_empty` is accepted but not evaluated here.
        """
        converter = IObjectConverter(self.iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        return errs, inv_errs, conv_dict

    def _openResultWriter(self, path, headers):
        """Create CSV file `path` with a header row.

        Returns a tuple ``(<file>, <writer>)``. The caller has to close
        the file.
        """
        outfile = open(path, 'wb')
        try:
            os.chmod(path, 0o664)
            writer = csv.DictWriter(outfile, headers)
            writer.writerow(dict((x, x) for x in headers))
        except Exception:
            # Do not leave a dangling handle if setup fails.
            outfile.close()
            raise
        return outfile, writer

    def _processRow(self, row, mode, site):
        """Create, remove or update the entry described by `row`.

        Returns ``None`` on success or the message to store with the
        failed row otherwise.
        """
        if mode == 'create':
            if not self.parentsExist(row, site):
                return "Not all parents do exist yet. Skipping"
            if self.entryExists(row, site):
                return "This object already exists. Skipping."
            obj = self.callFactory()
            # Override all values in row, also
            # student_ids and applicant_ids which have been
            # generated in the respective __init__ methods before.
            self.updateEntry(obj, row, site)
            try:
                self.addEntry(obj, row, site)
            except KeyError as error:
                return "%s Skipping." % error.message
            except DuplicationError as error:
                return "%s Skipping." % error.msg
        elif mode == 'remove':
            if not self.entryExists(row, site):
                return "Cannot remove: no such entry."
            self.delEntry(row, site)
        elif mode == 'update':
            obj = self.getEntry(row, site)
            if obj is None:
                return "Cannot update: no such entry."
            update_errors = self.checkUpdateRequirements(obj, row, site)
            if update_errors is not None:
                return update_errors
            self.updateEntry(obj, row, site)
        return None

    def doImport(self, path, headerfields, mode='create', user='Unknown',
                 logger=None, ignore_empty=True):
        """Perform actual import.

        Reads the CSV file at `path`, interprets its columns as given
        in `headerfields` and creates, updates or removes entries
        according to `mode`. Rows that could be processed are written
        to a ``.finished`` file, rows with problems (together with an
        ``--ERRORS--`` column) to a ``.pending`` file. Both files are
        created in a fresh temporary directory.

        With `ignore_empty` set, empty values are not applied in
        ``update`` mode.

        Returns a tuple ``(<number of rows>, <number of failed rows>,
        <path of finished file>, <path of pending file>)``. The last
        item is ``None`` if no row failed; the pending file is removed
        in that case.

        Raises :exc:`FatalCSVError` if the headers are insufficient.
        """
        time_start = time.time()
        self.checkHeaders(headerfields, mode)
        mapping = self.getMapping(path, headerfields, mode)

        num = 0
        num_warns = 0

        # All files opened here are closed explicitly below, so that
        # the result files are completely written before their paths
        # are handed out (or the pending file is removed).
        infile = open(path, 'rb')
        open_files = [infile]
        try:
            reader = csv.DictReader(infile)

            temp_dir = tempfile.mkdtemp()
            base, ext = os.path.splitext(os.path.basename(path))

            finished_headers = list(mapping.values())
            failed_headers = finished_headers + ['--ERRORS--']

            failed_path = os.path.join(
                temp_dir, "%s.pending%s" % (base, ext))
            failed_file, failed_writer = self._openResultWriter(
                failed_path, failed_headers)
            open_files.append(failed_file)

            finished_path = os.path.join(
                temp_dir, "%s.finished%s" % (base, ext))
            finished_file, finished_writer = self._openResultWriter(
                finished_path, finished_headers)
            open_files.append(finished_file)

            site = grok.getSite()

            for raw_row in reader:
                num += 1
                string_row = self.applyMapping(raw_row, mapping)
                if ignore_empty and mode in ('update',):
                    # replace empty strings with ignore-markers
                    for key, val in list(string_row.items()):
                        if val == '':
                            string_row[key] = IGNORE_MARKER
                # Shallow copy: `row` receives the converted values
                # while `string_row` keeps the strings for the result
                # files.
                row = dict(string_row)
                errs, inv_errs, conv_dict = self.checkConversion(
                    string_row, mode)
                if errs or inv_errs:
                    problem = self.stringFromErrs(errs, inv_errs)
                else:
                    row.update(conv_dict)
                    problem = self._processRow(row, mode, site)
                if problem is not None:
                    num_warns += 1
                    self.writeFailedRow(failed_writer, string_row, problem)
                    continue
                finished_writer.writerow(string_row)
        finally:
            for open_file in open_files:
                open_file.close()

        timedelta = time.time() - time_start

        self.createLogfile(path, failed_path, num, num_warns, mode, user,
                           timedelta, logger=logger)
        failed_path = os.path.abspath(failed_path)
        if num_warns == 0:
            # Nothing failed: the pending file only holds the header.
            os.unlink(failed_path)
            failed_path = None
        return (num, num_warns,
                os.path.abspath(finished_path), failed_path)

    def get_csv_skeleton(self):
        """Export CSV file only with a header of available fields.

        A raw string with CSV data should be returned.
        """
        outfile = StringIO()
        fieldnames = self.available_fields
        writer = csv.DictWriter(outfile, fieldnames)
        writer.writerow(dict(zip(fieldnames, fieldnames)))  # header
        outfile.seek(0)
        return outfile.read()
411
class ExporterBase(object):
    """Common base class for CSV exporters.

    Provides helpers to set up a CSV writer, to turn attribute values
    into CSV-friendly strings and to write single objects. The actual
    export methods have to be implemented by derived classes.
    """
    grok.implements(ICSVExporter)

    #: Fieldnames considered by this exporter
    fields = ('code', 'title', 'title_prefix')

    #: The title under which this exporter will be displayed
    #: (if registered as a utility)
    title = 'Override this title'

    def mangle_value(self, value, name, context=None):
        """Turn `value` into something a CSV writer can handle.

        Derived classes may override this hook; `name` and `context`
        are not evaluated by this default implementation. Values of
        types not handled below are returned unchanged.
        """
        if value is None:
            # There is no real representation of None in CSV files.
            return ''
        if isinstance(value, bool):
            return '1' if value else '0'
        if isinstance(value, unicode):
            # CSV writers prefer byte strings over unicode.
            return value.encode('utf-8')
        if isinstance(value, datetime.datetime):
            # Must be tested before plain dates: every datetime is a
            # date as well.
            return str(value)
        if isinstance(value, datetime.date):
            # A trailing '#' is appended to dates; apparently to keep
            # Excel from reinterpreting the value automatically.
            return str('%s#' % value)
        return value

    def get_csv_writer(self, filepath=None):
        """Create a CSV dict writer with the header row already written.

        Returns a tuple ``(<writer>, <outfile>)``: a
        :class:`csv.DictWriter` and the file object it writes to. With
        no `filepath` the data goes to an in-memory ``StringIO``,
        which makes the returned file object the only way to get at
        the data.

        If a `filepath` is given, the file is opened for writing only;
        close it before reopening it for reading.
        """
        if filepath is not None:
            target = open(filepath, 'wb')
        else:
            target = StringIO()
        csv_writer = csv.DictWriter(target, self.fields)
        header = dict((fieldname, fieldname) for fieldname in self.fields)
        csv_writer.writerow(header)
        return csv_writer, target

    def write_item(self, obj, writer):
        """Write one CSV row for `obj` using `writer`.

        For each name in `fields` the attribute of `obj` (``None`` if
        missing) is passed through :meth:`mangle_value`.
        """
        values = dict()
        for fieldname in self.fields:
            raw = getattr(obj, fieldname, None)
            values[fieldname] = self.mangle_value(raw, fieldname, obj)
        writer.writerow(values)

    def close_outfile(self, filepath, outfile):
        """Finish writing to `outfile`.

        With `filepath` being ``None`` the complete contents of
        `outfile` is returned; otherwise the file is closed and
        ``None`` is returned.
        """
        outfile.seek(0)
        if filepath is not None:
            outfile.close()
            return None
        return outfile.read()

    def export(self, iterable, filepath=None):
        """Export the items of `iterable` as CSV data.

        Implementations should return the raw CSV string if
        `filepath` is ``None``. Not implemented in this base class.
        """
        raise NotImplementedError

    def export_all(self, site, filepath=None):
        """Export all appropriate objects of `site` as CSV data to
        `filepath`.

        Implementations should return the raw CSV string if
        `filepath` is ``None``. Not implemented in this base class.
        """
        raise NotImplementedError
496
497    def export_all(self, site, filepath=None):
498        """Export all appropriate objects in `site` into `filepath` as
499        CSV data.
500
501        If `filepath` is ``None``, a raw string with CSV data should
502        be returned.
503        """
504        raise NotImplementedError
505
506
def export_job(site, exporter_name):
    """Run the exporter named `exporter_name` and store its output in
    a temporary CSV file.

    `site` is handed to the exporter and is also made the 'current
    site' via `zope.component.hooks.setSite()`. The latter is needed
    because this function is run by asynchronous jobs, which live in
    threads of their own where no site is set initially.

    `exporter_name` is the name under which the wanted exporter is
    registered as ICSVExporter utility.

    The data is written to a file ``export.csv`` inside a directory
    freshly created with :func:`tempfile.mkdtemp`.

    Returns the path of the CSV file written.

    .. note:: Removing the file and its parent directory is up to
              the caller.
    """
    setSite(site)
    # Look up the exporter first so that no directory is created for
    # unknown exporter names.
    csv_exporter = getUtility(ICSVExporter, name=exporter_name)
    csv_path = os.path.join(tempfile.mkdtemp(), 'export.csv')
    csv_exporter.export_all(site, filepath=csv_path)
    return csv_path
533
class AsyncExportJob(AsyncJob):
    """An IJob that writes exported data to a CSV file.

    Instances are ordinary `AsyncJob` instances; only the constructor
    differs: instead of a callable you pass a `site` and an
    `exporter_name`, and the job is set up to call `export_job` with
    these values.

    Nothing is exported before the job is put into a queue. See
    :mod:`waeup.kofa.async` for details about asynchronous jobs.

    `exporter_name` must be the name of a registered ICSVExporter
    utility, `site` a valid site or ``None``.

    The job result is the path of the CSV file created. That file
    lives in a temporary directory which should be removed when it is
    not needed any more.
    """
    grok.implements(IExportJob)

    def __init__(self, site, exporter_name):
        # Delegate to AsyncJob with `export_job` as the callable to run.
        job_args = (site, exporter_name)
        super(AsyncExportJob, self).__init__(export_job, *job_args)
559
class ExportJobContainer(object):
    """Mix-in class adding asynchronous export job handling.

    Jobs are tracked in `running_exports` as tuples ``(<job id>,
    <exporter name>, <user id>)``.
    """
    grok.implements(IExportJobContainer)
    running_exports = PersistentList()

    def start_export_job(self, exporter_name, user_id):
        """Create and enqueue an asynchronous export job.

        `exporter_name` names the exporter utility to use, `user_id`
        is the ID of the user requesting the export.

        Job id, exporter name and user id are remembered in the
        persistent `running_exports` list.

        Returns the ID of the new job.
        """
        current_site = grok.getSite()
        job_manager = getUtility(IJobManager)
        new_job_id = job_manager.put(
            AsyncExportJob(current_site, exporter_name))
        # Assign a fresh list to the instance first to make sure the
        # list is stored in ZODB, then register the new job.
        self.running_exports = PersistentList(self.running_exports)
        self.running_exports.append((new_job_id, exporter_name, user_id))
        return new_job_id

    def get_running_export_jobs(self, user_id=None):
        """Return the export job entries of user `user_id` as a list.

        Each entry is a tuple ``<job_id>, <exporter_name>,
        <user_id>`` where ``<exporter_name>`` is the utility name of
        the exporter used.

        With `user_id` being ``None`` the entries of all users are
        returned.

        Inspected entries whose job is unknown to the job manager are
        not returned and get removed from `running_exports`.
        """
        job_manager = getUtility(IJobManager)
        alive = []
        stale = []
        for item in self.running_exports:
            if user_id is None or item[2] == user_id:
                if job_manager.get(item[0]) is None:
                    stale.append(item)
                else:
                    alive.append(item)
        if stale:
            remaining = [x for x in self.running_exports if x not in stale]
            self.running_exports = PersistentList(remaining)
        return alive

    def get_export_jobs_status(self, user_id=None):
        """Return the status of the export jobs of `user_id` as a list.

        Each item is a tuple ``<raw status>, <status translated>,
        <exporter title>``; the two latter values are looked up in
        `JOB_STATUS_MAP` and taken from the `title` of the exporter
        utility used, respectively.
        """
        job_entries = self.get_running_export_jobs(user_id)
        job_manager = getUtility(IJobManager)
        states = []
        for item in job_entries:
            job = job_manager.get(item[0])
            if job is not None:
                raw_status, translated = JOB_STATUS_MAP[job.status]
                exporter_title = getUtility(
                    ICSVExporter, name=item[1]).title
                states.append((raw_status, translated, exporter_title))
        return states

    def delete_export_entry(self, entry):
        """Remove the export described by `entry`.

        If the job still exists and its result is a string, the
        directory containing the result file is removed from disk.
        Afterwards the job is removed via the job manager and `entry`
        is dropped from `running_exports`.

        `entry` must be a tuple ``(<job id>, <exporter name>, <user
        id>)`` as created by :meth:`start_export_job` or returned by
        :meth:`get_running_export_jobs`.
        """
        job_manager = getUtility(IJobManager)
        job = job_manager.get(entry[0])
        if job is not None and isinstance(job.result, basestring):
            # The job result is the path of the export file; the file
            # is removed together with its directory.
            export_dir = os.path.dirname(job.result)
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)
        job_manager.remove(entry[0], self)
        self.running_exports = PersistentList(
            [x for x in self.running_exports if x != entry])

    def entry_from_job_id(self, job_id):
        """Return the first entry tuple stored for `job_id`.

        ``None`` is returned if there is no such entry.
        """
        matching = (x for x in self.running_exports if x[0] == job_id)
        return next(matching, None)
Note: See TracBrowser for help on using the repository browser.