source: main/waeup.kofa/trunk/src/waeup/kofa/utils/batching.py @ 9776

Last change on this file since 9776 was 9739, checked in by Henrik Bettermann, 12 years ago

Put all information into a single logfile line. Otherwise we can't find the information via the UI.

  • Property svn:keywords set to Id
File size: 25.9 KB
Line 
1## $Id: batching.py 9739 2012-11-29 18:46:54Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa components for batch processing.
19
20Batch processors eat CSV files to add, update or remove large numbers
21of certain kinds of objects at once.
22"""
23import grok
24import csv
25import datetime
26import os
27import shutil
28import tempfile
29import time
30from cStringIO import StringIO
31from persistent.list import PersistentList
32from zope.component import createObject, getUtility
33from zope.component.hooks import setSite
34from zope.interface import Interface, implementer
35from zope.schema import getFields
36from zope.event import notify
37from waeup.kofa.async import AsyncJob
38from waeup.kofa.interfaces import (
39    IBatchProcessor, FatalCSVError, IObjectConverter, IJobManager,
40    ICSVExporter, IGNORE_MARKER, DuplicationError, JOB_STATUS_MAP,
41    IExportJobContainer, IExportJob, IExportContainerFinder)
42
43class BatchProcessor(grok.GlobalUtility):
44    """A processor to add, update, or remove data.
45
46    This is a non-active baseclass.
47    """
48    grok.implements(IBatchProcessor)
49    grok.context(Interface)
50    grok.baseclass()
51
52    # Name used in pages and forms...
53    name = u'Non-registered base processor'
54
55    # Internal name...
56    util_name = 'baseprocessor'
57
58    # Items for this processor need an interface with zope.schema fields.
59    iface = Interface
60
61    # The name must be the same as the util_name attribute in order to
62    # register this utility correctly.
63    grok.name(util_name)
64
65    # Headers needed to locate items...
66    location_fields = ['code', 'faculty_code']
67
68    # A factory with this name must be registered...
69    factory_name = 'waeup.Department'
70
71    @property
72    def required_fields(self):
73        """Required fields that have no default.
74
75        A list of names of field, whose value cannot be set if not
76        given during creation. Therefore these fields must exist in
77        input.
78
79        Fields with a default != missing_value do not belong to this
80        category.
81        """
82        result = []
83        for key, field in getFields(self.iface).items():
84            if key in self.location_fields:
85                continue
86            if field.default is not field.missing_value:
87                continue
88            if field.required:
89                result.append(key)
90        return result
91
92    @property
93    def req(self):
94        result = dict(
95            create = self.location_fields + self.required_fields,
96            update = self.location_fields,
97            remove = self.location_fields,
98        )
99        return result
100
101    @property
102    def available_fields(self):
103        return sorted(list(set(
104                    self.location_fields + getFields(self.iface).keys())))
105
106    def getHeaders(self, mode='create'):
107        return self.available_fields
108
109    def checkHeaders(self, headerfields, mode='create'):
110        req = self.req[mode]
111        # Check for required fields...
112        for field in req:
113            if not field in headerfields:
114                raise FatalCSVError(
115                    "Need at least columns %s for import!" %
116                    ', '.join(["'%s'" % x for x in req]))
117        # Check for double fields. Cannot happen because this error is
118        # already catched in views
119        not_ignored_fields = [x for x in headerfields
120                              if not x.startswith('--')]
121        if len(set(not_ignored_fields)) < len(not_ignored_fields):
122            raise FatalCSVError(
123                "Double headers: each column name may only appear once.")
124        return True
125
126    def applyMapping(self, row, mapping):
127        """Apply mapping to a row of CSV data.
128
129        """
130        result = dict()
131        for key, replacement in mapping.items():
132            if replacement == u'--IGNORE--':
133                # Skip ignored columns in failed and finished data files.
134                continue
135            result[replacement] = row[key]
136        return result
137
138    def getMapping(self, path, headerfields, mode):
139        """Get a mapping from CSV file headerfields to actually used fieldnames.
140
141        """
142        result = dict()
143        reader = csv.reader(open(path, 'rb'))
144        raw_header = reader.next()
145        for num, field in enumerate(headerfields):
146            if field not in self.location_fields and mode == 'remove':
147                # Skip non-location fields when removing.
148                continue
149            if field == u'--IGNORE--':
150                # Skip ignored columns in failed and finished data files.
151                continue
152            result[raw_header[num]] = field
153        return result
154
155    def stringFromErrs(self, errors, inv_errors):
156        result = []
157        for err in errors:
158            fieldname, message = err
159            result.append("%s: %s" % (fieldname, message))
160        for err in inv_errors:
161            result.append("invariant: %s" % err)
162        return '; '.join(result)
163
164    def callFactory(self, *args, **kw):
165        return createObject(self.factory_name)
166
167    def parentsExist(self, row, site):
168        """Tell whether the parent object for data in ``row`` exists.
169        """
170        raise NotImplementedError('method not implemented')
171
172    def entryExists(self, row, site):
173        """Tell whether there already exists an entry for ``row`` data.
174        """
175        raise NotImplementedError('method not implemented')
176
177    def getParent(self, row, site):
178        """Get the parent object for the entry in ``row``.
179        """
180        raise NotImplementedError('method not implemented')
181
182    def getEntry(self, row, site):
183        """Get the parent object for the entry in ``row``.
184        """
185        raise NotImplementedError('method not implemented')
186
187    def addEntry(self, obj, row, site):
188        """Add the entry given given by ``row`` data.
189        """
190        raise NotImplementedError('method not implemented')
191
192    def delEntry(self, row, site):
193        """Delete entry given by ``row`` data.
194        """
195        raise NotImplementedError('method not implemented')
196
197    def checkUpdateRequirements(self, obj, row, site):
198        """Checks requirements the object must fulfill when being updated.
199
200        This method is not used in case of deleting or adding objects.
201
202        Returns error messages as strings in case of requirement
203        problems.
204        """
205        return None
206
207    def updateEntry(self, obj, row, site, filename):
208        """Update obj to the values given in row.
209
210        Returns a string describing the fields changed.
211        """
212        changed = []
213        for key, value in row.items():
214            # Skip fields to be ignored.
215            if value == IGNORE_MARKER:
216                continue
217            # Skip fields not declared in interface and which are
218            # not yet attributes of existing objects. We can thus not
219            # add non-existing attributes here.
220            if not hasattr(obj, key):
221                continue
222            try:
223                setattr(obj, key, value)
224            except AttributeError:
225                # Computed attributes can't be set.
226                continue
227            log_value = getattr(value, 'code', value)
228            changed.append('%s=%s' % (key, log_value))
229
230        # If any catalog is involved it must be updated.
231        #
232        # XXX: The event is also triggered when creating objects as
233        # updateEntry is called also when creating entries resulting
234        # in objectAdded and additional objectModified events.
235        if len(changed):
236            notify(grok.ObjectModifiedEvent(obj))
237
238        return ', '.join(changed)
239
240    def createLogfile(self, path, fail_path, num, warnings, mode, user,
241                      timedelta, logger=None):
242        """Write to log file.
243        """
244        if logger is None:
245            return
246        logger.info(
247            "processed: %s, %s mode, %s lines (%s successful/ %s failed), "
248            "%0.3f s (%0.4f s/item)" % (
249            path, mode, num, num - warnings, warnings,
250            timedelta, timedelta/(num or 1)))
251        return
252
253    def writeFailedRow(self, writer, row, warnings):
254        """Write a row with error messages to error CSV.
255
256        If warnings is a list of strings, they will be concatenated.
257        """
258        error_col = warnings
259        if isinstance(warnings, list):
260            error_col = ' / '.join(warnings)
261        row['--ERRORS--'] = error_col
262        writer.writerow(row)
263        return
264
265    def checkConversion(self, row, mode='ignore', ignore_empty=True):
266        """Validates all values in row.
267        """
268        converter = IObjectConverter(self.iface)
269        errs, inv_errs, conv_dict =  converter.fromStringDict(
270            row, self.factory_name, mode=mode)
271        return errs, inv_errs, conv_dict
272
273    def doImport(self, path, headerfields, mode='create', user='Unknown',
274                 logger=None, ignore_empty=True):
275        """Perform actual import.
276        """
277        time_start = time.time()
278        self.checkHeaders(headerfields, mode)
279        mapping = self.getMapping(path, headerfields, mode)
280        reader = csv.DictReader(open(path, 'rb'))
281
282        temp_dir = tempfile.mkdtemp()
283
284        base = os.path.basename(path)
285        (base, ext) = os.path.splitext(base)
286        failed_path = os.path.join(temp_dir, "%s.pending%s" % (base, ext))
287        failed_headers = mapping.values()
288        failed_headers.append('--ERRORS--')
289        failed_writer = csv.DictWriter(open(failed_path, 'wb'),
290                                       failed_headers)
291        os.chmod(failed_path, 0664)
292        failed_writer.writerow(dict([(x,x) for x in failed_headers]))
293
294        finished_path = os.path.join(temp_dir, "%s.finished%s" % (base, ext))
295        finished_headers = mapping.values()
296        finished_writer = csv.DictWriter(open(finished_path, 'wb'),
297                                         finished_headers)
298        os.chmod(finished_path, 0664)
299        finished_writer.writerow(dict([(x,x) for x in finished_headers]))
300
301        num =0
302        num_warns = 0
303        site = grok.getSite()
304
305        for raw_row in reader:
306            num += 1
307            string_row = self.applyMapping(raw_row, mapping)
308            if ignore_empty and mode in ('update',):
309                # replace empty strings with ignore-markers
310                for key, val in string_row.items():
311                    if val == '':
312                        string_row[key] = IGNORE_MARKER
313            row = dict(string_row.items()) # create deep copy
314            errs, inv_errs, conv_dict = self.checkConversion(string_row, mode)
315            if errs or inv_errs:
316                num_warns += 1
317                conv_warnings = self.stringFromErrs(errs, inv_errs)
318                self.writeFailedRow(
319                    failed_writer, string_row, conv_warnings)
320                continue
321            row.update(conv_dict)
322
323            if mode == 'create':
324                if not self.parentsExist(row, site):
325                    num_warns += 1
326                    self.writeFailedRow(
327                        failed_writer, string_row,
328                        "Not all parents do exist yet. Skipping")
329                    continue
330                if self.entryExists(row, site):
331                    num_warns += 1
332                    self.writeFailedRow(
333                        failed_writer, string_row,
334                        "This object already exists. Skipping.")
335                    continue
336                obj = self.callFactory()
337                # Override all values in row, also
338                # student_ids and applicant_ids which have been
339                # generated in the respective __init__ methods before.
340                self.updateEntry(obj, row, site, base)
341                try:
342                    self.addEntry(obj, row, site)
343                except KeyError, error:
344                    num_warns += 1
345                    self.writeFailedRow(
346                        failed_writer, string_row,
347                        "%s Skipping." % error.message)
348                    continue
349                except DuplicationError, error:
350                    num_warns += 1
351                    self.writeFailedRow(
352                        failed_writer, string_row,
353                        "%s Skipping." % error.msg)
354                    continue
355            elif mode == 'remove':
356                if not self.entryExists(row, site):
357                    num_warns += 1
358                    self.writeFailedRow(
359                        failed_writer, string_row,
360                        "Cannot remove: no such entry")
361                    continue
362                self.delEntry(row, site)
363            elif mode == 'update':
364                obj = self.getEntry(row, site)
365                if obj is None:
366                    num_warns += 1
367                    self.writeFailedRow(
368                        failed_writer, string_row,
369                        "Cannot update: no such entry")
370                    continue
371                update_errors = self.checkUpdateRequirements(obj, row, site)
372                if update_errors is not None:
373                    num_warns += 1
374                    self.writeFailedRow(
375                        failed_writer, string_row, update_errors)
376                    continue
377                self.updateEntry(obj, row, site, base)
378            finished_writer.writerow(string_row)
379
380        time_end = time.time()
381        timedelta = time_end - time_start
382
383        self.createLogfile(path, failed_path, num, num_warns, mode, user,
384                           timedelta, logger=logger)
385        failed_path = os.path.abspath(failed_path)
386        if num_warns == 0:
387            del failed_writer
388            os.unlink(failed_path)
389            failed_path = None
390        return (num, num_warns,
391                os.path.abspath(finished_path), failed_path)
392
393    def get_csv_skeleton(self):
394        """Export CSV file only with a header of available fields.
395
396        A raw string with CSV data should be returned.
397        """
398        outfile = StringIO()
399        writer = csv.DictWriter(outfile, self.available_fields)
400        writer.writerow(
401            dict(zip(self.available_fields, self.available_fields))) # header
402        outfile.seek(0)
403        return outfile.read()
404
405class ExporterBase(object):
406    """A base for exporters.
407    """
408    grok.implements(ICSVExporter)
409
410    #: Fieldnames considered by this exporter
411    fields = ('code', 'title', 'title_prefix')
412
413    #: The title under which this exporter will be displayed
414    #: (if registered as a utility)
415    title = 'Override this title'
416
417    def mangle_value(self, value, name, context=None):
418        """Hook for mangling values in derived classes
419        """
420        if isinstance(value, bool):
421            value = value and '1' or '0'
422        elif isinstance(value, unicode):
423            # CSV writers like byte streams better than unicode
424            value = value.encode('utf-8')
425        elif isinstance(value, datetime.datetime):
426            value = str(value)
427        elif isinstance(value, datetime.date):
428            # Order is important here: check for date after datetime as
429            # datetimes are also dates.
430            #
431            # Append hash '#' to dates to circumvent unwanted excel automatic
432            value = str('%s#' % value)
433        elif value is None:
434            # None is not really representable in CSV files
435            value = ''
436        return value
437
438    def get_csv_writer(self, filepath=None):
439        """Get a CSV dict writer instance open for writing.
440
441        Returns a tuple (<writer>, <outfile>) where ``<writer>`` is a
442        :class:`csv.DictWriter` instance and outfile is the real file
443        which is written to. The latter is important when writing to
444        StringIO and can normally be ignored otherwise.
445
446        The returned file will already be filled with the header row.
447
448        Please note that if you give a filepath, the returned outfile
449        is open for writing only and you might have to close it before
450        reopening it for reading.
451        """
452        if filepath is None:
453            outfile = StringIO()
454        else:
455            outfile = open(filepath, 'wb')
456        writer = csv.DictWriter(outfile, self.fields)
457        writer.writerow(dict(zip(self.fields, self.fields))) # header
458        return writer, outfile
459
460    def write_item(self, obj, writer):
461        """Write a row extracted from `obj` into CSV file using `writer`.
462        """
463        row = {}
464        for name in self.fields:
465            value = getattr(obj, name, None)
466            value = self.mangle_value(value, name, obj)
467            row[name] = value
468        writer.writerow(row)
469        return
470
471    def close_outfile(self, filepath, outfile):
472        """Close outfile.
473
474        If filepath is None, the contents of outfile is returned.
475        """
476        outfile.seek(0)
477        if filepath is None:
478            return outfile.read()
479        outfile.close()
480        return
481
482    def export(self, iterable, filepath=None):
483        """Export `iterable` as CSV file.
484
485        If `filepath` is ``None``, a raw string with CSV data should
486        be returned.
487        """
488        raise NotImplementedError
489
490    def export_all(self, site, filepath=None):
491        """Export all appropriate objects in `site` into `filepath` as
492        CSV data.
493
494        If `filepath` is ``None``, a raw string with CSV data should
495        be returned.
496        """
497        raise NotImplementedError
498
499
500def export_job(site, exporter_name):
501    """Export all entries delivered by exporter and store it in a temp file.
502
503    `site` gives the site to search. It will be passed to the exporter
504    and also be set as 'current site' as the function is used in
505    asynchronous jobs which run in their own threads and have no site
506    set initially. Therefore `site` must also be a valid value for use
507    with `zope.component.hooks.setSite()`.
508
509    `exporter_name` is the utility name under which the desired
510    exporter was registered with the ZCA.
511
512    The resulting CSV file will be stored in a new temporary directory
513    (using :func:`tempfile.mkdtemp`). It will be named after the
514    exporter used with `.csv` filename extension.
515
516    Returns the path to the created CSV file.
517
518    .. note:: It is the callers responsibility to clean up the used
519              file and its parent directory.
520    """
521    setSite(site)
522    exporter = getUtility(ICSVExporter, name=exporter_name)
523    output_dir = tempfile.mkdtemp()
524    filename = '%s.csv' % exporter_name
525    output_path = os.path.join(output_dir, filename)
526    exporter.export_all(site, filepath=output_path)
527    return output_path
528
529class AsyncExportJob(AsyncJob):
530    """An IJob that exports data to CSV files.
531
532    `AsyncExportJob` instances are regular `AsyncJob` instances with a
533    different constructor API. Instead of a callable to execute, you
534    must pass a `site` and some `exporter_name` to trigger an export.
535
536    The real work is done when an instance of this class is put into a
537    queue. See :mod:`waeup.kofa.async` to learn more about
538    asynchronous jobs.
539
540    The `exporter_name` must be the name under which an ICSVExporter
541    utility was registered with the ZCA.
542
543    The `site` must be a valid site  or ``None``.
544
545    The result of an `AsyncExportJob` is the path to generated CSV
546    file. The file will reside in a temporary directory that should be
547    removed after being used.
548    """
549    grok.implements(IExportJob)
550
551    def __init__(self, site, exporter_name, *args, **kwargs):
552        super(AsyncExportJob, self).__init__(
553            export_job, site, exporter_name, *args, **kwargs)
554
555class ExportJobContainer(object):
556    """A mix-in that provides functionality for asynchronous export jobs.
557    """
558    grok.implements(IExportJobContainer)
559    running_exports = PersistentList()
560
561    def start_export_job(self, exporter_name, user_id, *args, **kwargs):
562        """Start asynchronous export job.
563
564        `exporter_name` is the name of an exporter utility to be used.
565
566        `user_id` is the ID of the user that triggers the export.
567
568        The job_id is stored along with exporter name and user id in a
569        persistent list.
570
571        The method supports additional positional and keyword
572        arguments, which are passed as-is to the respective
573        :class:`AsyncExportJob`.
574
575        Returns the job ID of the job started.
576        """
577        site = grok.getSite()
578        manager = getUtility(IJobManager)
579        job = AsyncExportJob(site, exporter_name, *args, **kwargs)
580        job_id = manager.put(job)
581        # Make sure that the persisted list is stored in ZODB
582        self.running_exports = PersistentList(self.running_exports)
583        self.running_exports.append((job_id, exporter_name, user_id))
584        return job_id
585
586    def get_running_export_jobs(self, user_id=None):
587        """Get export jobs for user with `user_id` as list of tuples.
588
589        Each tuples holds ``<job_id>, <exporter_name>, <user_id>`` in
590        that order. The ``<exporter_name>`` is the utility name of the
591        used exporter.
592
593        If `user_id` is ``None``, all running jobs are returned.
594        """
595        entries = []
596        to_delete = []
597        manager = getUtility(IJobManager)
598        for entry in self.running_exports:
599            if user_id is not None and entry[2] != user_id:
600                continue
601            if manager.get(entry[0]) is None:
602                to_delete.append(entry)
603                continue
604            entries.append(entry)
605        if to_delete:
606            self.running_exports = PersistentList(
607                [x for x in self.running_exports if x not in to_delete])
608        return entries
609
610    def get_export_jobs_status(self, user_id=None):
611        """Get running/completed export jobs for `user_id` as list of tuples.
612
613        Each tuple holds ``<raw status>, <status translated>,
614        <exporter title>`` in that order, where ``<status
615        translated>`` and ``<exporter title>`` are translated strings
616        representing the status of the job and the human readable
617        title of the exporter used.
618        """
619        entries = self.get_running_export_jobs(user_id)
620        result = []
621        manager = getUtility(IJobManager)
622        for entry in entries:
623            job = manager.get(entry[0])
624            if job is None:
625                continue
626            status, status_translated = JOB_STATUS_MAP[job.status]
627            exporter_name = getUtility(ICSVExporter, name=entry[1]).title
628            result.append((status, status_translated, exporter_name))
629        return result
630
631    def delete_export_entry(self, entry):
632        """Delete the export denoted by `entry`.
633
634        Removes given entry from the local `running_exports` list and also
635        removes the regarding job via the local job manager.
636
637        `entry` must be a tuple ``(<job id>, <exporter name>, <user
638        id>)`` as created by :meth:`start_export_job` or returned by
639        :meth:`get_running_export_jobs`.
640        """
641        manager = getUtility(IJobManager)
642        job = manager.get(entry[0])
643        if job is not None:
644            # remove created export file
645            if isinstance(job.result, basestring):
646                if os.path.exists(os.path.dirname(job.result)):
647                    shutil.rmtree(os.path.dirname(job.result))
648        manager.remove(entry[0], self)
649        new_entries = [x for x in self.running_exports
650                       if x != entry]
651        self.running_exports = PersistentList(new_entries)
652        return
653
654    def entry_from_job_id(self, job_id):
655        """Get entry tuple for `job_id`.
656
657        Returns ``None`` if no such entry can be found.
658        """
659        for entry in self.running_exports:
660            if entry[0] == job_id:
661                return entry
662        return None
663
664class VirtualExportJobContainer(ExportJobContainer):
665    """A virtual export job container.
666
667    Virtual ExportJobContainers can be used as a mixin just like real
668    ExportJobContainer.
669
670    They retrieve and store data in the site-wide ExportJobContainer.
671
672    Functionality is currently entirely as for regular
673    ExportJobContainers, except that data is stored elsewhere.
674
675    VirtualExportJobContainers need a registered
676    IExportContainerFinder utility to find a suitable container for
677    storing data.
678    """
679    grok.implements(IExportJobContainer)
680
681    @property
682    def _site_container(self):
683        return getUtility(IExportContainerFinder)()
684
685    # The following is a simple trick. While ExportJobContainers store
686    # only one attribute in ZODB, it is sufficient to replace this
687    # attribute `running_exports` with a suitable manager to make the
688    # whole virtual container work like the original but with the data
689    # stored in the site-wide exports container. This way, virtual
690    # export containers provide the whole functionality of a regular
691    # exports container but store no data at all with themselves.
692    @property
693    def running_exports(self):
694        """Exports stored in the site-wide exports container.
695        """
696        return self._site_container.running_exports
697
698    @running_exports.setter
699    def running_exports(self, value):
700        self._site_container.running_exports = value
701
702    @running_exports.deleter
703    def running_exports(self):
704        del self._site_container.running_exports
705
706
707@implementer(IExportContainerFinder)
708class ExportContainerFinder(grok.GlobalUtility):
709    """Finder for local (site-wide) export container.
710    """
711
712    def __call__(self):
713        """Get the local export container-
714
715        If no site can be determined or the site provides no export
716        container, None is returned.
717        """
718        site = grok.getSite()
719        if site is None:
720            return None
721        return site.get('datacenter', None)
Note: See TracBrowser for help on using the repository browser.