source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/batching.py @ 10650

Last change on this file since 10650 was 10613, checked in by Henrik Bettermann, 11 years ago

If certificate was given in import file but the parent container of an applicant doesn't exist , conversion checking exited with a traceback. We are now catching this error.

  • Property svn:keywords set to Id
File size: 13.5 KB
RevLine 
[7192]1## $Id: batching.py 10613 2013-09-10 16:47:04Z henrik $
[5321]2##
[7192]3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
[5321]4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
[7192]8##
[5321]9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
[7192]13##
[5321]14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
[10028]20import unicodecsv as csv # XXX: csv ops should move to dedicated module.
[5321]21import grok
[7271]22from zope.schema import getFields
[5321]23from zope.interface import Interface
[7268]24from zope.component import queryUtility
[8290]25from hurry.workflow.interfaces import IWorkflowState
[7268]26from zope.catalog.interfaces import ICatalog
[7811]27from waeup.kofa.interfaces import (
[8290]28    IBatchProcessor, IObjectConverter, FatalCSVError, IGNORE_MARKER,
29    IObjectHistory, IUserAccount)
30from waeup.kofa.interfaces import MessageFactory as _
[7811]31from waeup.kofa.utils.batching import BatchProcessor
32from waeup.kofa.applicants.interfaces import (
[7268]33    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo)
[8336]34from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
[5321]35
[7933]36class ApplicantsContainerProcessor(BatchProcessor):
37    """A processor for applicants containers.
[5321]38    """
[5474]39    grok.implements(IBatchProcessor)
[5321]40    grok.provides(IBatchProcessor)
41    grok.context(Interface)
[8987]42    util_name = 'applicantscontainerprocessor'
[5321]43    grok.name(util_name)
44
[7933]45    name = u'Applicants Container Processor'
[5475]46    mode = u'create'
[6251]47    iface = IApplicantsContainer
[5321]48
[6251]49    location_fields = ['code',]
[6282]50    factory_name = 'waeup.ApplicantsContainer'
[5321]51
52    def parentsExist(self, row, site):
[6251]53        return 'applicants' in site.keys()
[5321]54
55    def entryExists(self, row, site):
[6251]56        return row['code'] in site['applicants'].keys()
[5321]57
58    def getParent(self, row, site):
[6251]59        return site['applicants']
[5321]60
61    def getEntry(self, row, site):
62        if not self.entryExists(row, site):
63            return None
64        parent = self.getParent(row, site)
[6251]65        return parent.get(row['code'])
[5321]66
67    def addEntry(self, obj, row, site):
68        parent = self.getParent(row, site)
[6251]69        parent[row['code']] = obj
[5321]70        return
71
72    def delEntry(self, row, site):
73        parent = self.getParent(row, site)
[6251]74        del parent[row['code']]
[5321]75        return
[7262]76
[7933]77class ApplicantProcessor(BatchProcessor):
[7262]78    """A batch processor for IApplicant objects.
[8331]79
80    In create mode container_code is required. If application_number is given
81    an applicant with this number is created in the designated container.
82    If application_number is not given a random application_number is assigned.
83    applicant_id is being determined by the system and can't be imported.
84
85    In update or remove mode container_code and application_number columns
86    must not exist. The applicant object is solely searched by its applicant_id
87    or reg_number.
[7262]88    """
89    grok.implements(IBatchProcessor)
90    grok.provides(IBatchProcessor)
91    grok.context(Interface)
[7933]92    util_name = 'applicantprocessor'
[7262]93    grok.name(util_name)
[7933]94    name = u'Applicant Processor'
[7262]95    iface = IApplicant
[8581]96    iface_byregnumber = IApplicantUpdateByRegNo
[8331]97    location_fields = ['']
[7262]98    factory_name = 'waeup.Applicant'
99
100    mode = None
101
102    @property
[7268]103    def available_fields(self):
104        return sorted(list(set(
[8331]105            ['application_number',
[8290]106            'container_code','state','password'] + getFields(
[7268]107                self.iface).keys())))
[7262]108
[7268]109    def checkHeaders(self, headerfields, mode='create'):
[8331]110        cond1 = 'container_code' in headerfields
111        cond2 = 'application_number' in headerfields
112        cond3 = 'applicant_id' in headerfields
113        cond4 = 'reg_number' in headerfields
[7268]114        if mode == 'create':
[8331]115            if not cond1:
116                raise FatalCSVError(
117                    "Need at least container_code column!")
118            if cond3:
119                raise FatalCSVError(
120                    "applicant_id can't be imported in create mode!")
[7268]121            for field in self.required_fields:
122                if not field in headerfields:
123                    raise FatalCSVError(
124                        "Need at least columns %s for import!" %
125                        ', '.join(["'%s'" % x for x in self.required_fields]))
[8331]126        if mode in ('update', 'remove'):
127            if not cond3 and not cond4:
128                raise FatalCSVError(
129                    "Need at least column reg_number or applicant_id!")
130            if cond1 or cond2:
131                raise FatalCSVError(
132                    "container_code or application_number can't be imported " +
133                    "in update or remove mode!")
[7268]134        # Check for fields to be ignored...
135        not_ignored_fields = [x for x in headerfields
136                              if not x.startswith('--')]
137        if len(set(not_ignored_fields)) < len(not_ignored_fields):
138            raise FatalCSVError(
139                "Double headers: each column name may only appear once.")
140        return True
[7262]141
[7268]142    def getLocator(self, row):
[8331]143        if row.get('container_code', None) not in (IGNORE_MARKER, None):
[9250]144            # create mode
[8331]145            return 'container_code'
146        elif row.get('applicant_id', None) not in (IGNORE_MARKER, None):
[9250]147            # update or remove mode
[8331]148            return 'applicant_id'
[8236]149        elif row.get('reg_number', None) not in (IGNORE_MARKER, None):
[9250]150            # update or remove mode
[7270]151            return 'reg_number'
[7268]152        else:
153            return None
[7262]154
155    def getParent(self, row, site):
[8615]156        result = None
[8331]157        if self.getLocator(row) == 'container_code':
[8615]158            result = site['applicants'].get(row['container_code'], None)
159        elif self.getLocator(row) == 'reg_number':
[8331]160            reg_number = row['reg_number']
161            cat = queryUtility(ICatalog, name='applicants_catalog')
162            results = list(
163                cat.searchResults(reg_number=(reg_number, reg_number)))
164            if results:
[8615]165                result = results[0].__parent__
166        elif self.getLocator(row) == 'applicant_id':
[8331]167            applicant_id = row['applicant_id']
168            cat = queryUtility(ICatalog, name='applicants_catalog')
169            results = list(
170                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
171            if results:
[8615]172                result = results[0].__parent__
173        return result
[7262]174
[7268]175    def parentsExist(self, row, site):
176        return self.getParent(row, site) is not None
177
[7262]178    def getEntry(self, row, site):
[8331]179        if self.getLocator(row) == 'container_code':
180            if row.get('application_number', None) not in (IGNORE_MARKER, None):
181                if not self.parentsExist(row, site):
182                    return None
183                parent = self.getParent(row, site)
184                return parent.get(row['application_number'])
[7264]185            return None
[8331]186        if self.getLocator(row) == 'applicant_id':
187            applicant_id = row['applicant_id']
188            cat = queryUtility(ICatalog, name='applicants_catalog')
189            results = list(
190                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
191            if results:
192                return results[0]
193        if self.getLocator(row) == 'reg_number':
[7270]194            reg_number = row['reg_number']
[7268]195            cat = queryUtility(ICatalog, name='applicants_catalog')
196            results = list(
[7270]197                cat.searchResults(reg_number=(reg_number, reg_number)))
[7268]198            if results:
199                return results[0]
200        return None
[7262]201
[7268]202    def entryExists(self, row, site):
203        return self.getEntry(row, site) is not None
204
[7262]205    def addEntry(self, obj, row, site):
206        parent = self.getParent(row, site)
207        parent.addApplicant(obj)
[8334]208        #parent.__parent__.logger.info(
209        #    'Applicant imported: %s' % obj.applicant_id)
[8290]210        history = IObjectHistory(obj)
[8334]211        history.addMessage(_('Application record imported'))
[7262]212        return
213
214    def delEntry(self, row, site):
[7268]215        applicant = self.getEntry(row, site)
216        if applicant is not None:
[8331]217            parent = applicant.__parent__
[7268]218            del parent[applicant.application_number]
[8334]219            #parent.__parent__.logger.info(
220            #    'Applicant removed: %s' % applicant.applicant_id)
[7262]221        pass
[7268]222
[9706]223    def updateEntry(self, obj, row, site, filename):
[8290]224        """Update obj to the values given in row.
225        """
226        items_changed = ''
227        # Remove application_number from row if empty
[9701]228        if 'application_number' in row and row['application_number'] in (
[8290]229            None, IGNORE_MARKER):
230            row.pop('application_number')
231
232        # Update applicant_id fom application_number and container code
233        # if application_number is given
[9701]234        if 'application_number' in row:
[8290]235            obj.applicant_id = u'%s_%s' % (
236                row['container_code'], row['application_number'])
[8334]237            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
[8290]238            row.pop('application_number')
239
240        # Update password
[9701]241        if 'password' in row:
[8334]242            passwd = row.get('password', IGNORE_MARKER)
243            if passwd not in ('', IGNORE_MARKER):
[8348]244                if passwd.startswith('{SSHA}'):
245                    # already encrypted password
246                    obj.password = passwd
247                else:
248                    # not yet encrypted password
249                    IUserAccount(obj).setPassword(passwd)
[8334]250                items_changed += ('%s=%s, ' % ('password', passwd))
[8290]251            row.pop('password')
252
253        # Update registration state
[9701]254        if 'state' in row:
[8334]255            state = row.get('state', IGNORE_MARKER)
256            if state not in (IGNORE_MARKER, ''):
257                IWorkflowState(obj).setState(state)
258                msg = _("State '${a}' set", mapping = {'a':state})
259                history = IObjectHistory(obj)
260                history.addMessage(msg)
261                items_changed += ('%s=%s, ' % ('state', state))
[8290]262            row.pop('state')
263
264        # apply other values...
[8334]265        items_changed += super(ApplicantProcessor, self).updateEntry(
[9706]266            obj, row, site, filename)
[8290]267
268        # Log actions...
269        parent = self.getParent(row, site)
[8334]270        if self.getLocator(row) == 'container_code':
[8290]271            parent.__parent__.logger.info(
[9706]272                '%s - %s - imported: %s' % (self.name, filename, items_changed))
[8290]273        else:
[8334]274            parent.__parent__.logger.info(
[9706]275                '%s - %s - updated: %s' % (self.name, filename, items_changed))
[8290]276        return items_changed
277
[7268]278    def getMapping(self, path, headerfields, mode):
279        """Get a mapping from CSV file headerfields to actually used fieldnames.
280        """
281        result = dict()
282        reader = csv.reader(open(path, 'rb'))
283        raw_header = reader.next()
284        for num, field in enumerate(headerfields):
[8331]285            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
[7268]286                continue
287            if field == u'--IGNORE--':
288                # Skip ignored columns in failed and finished data files.
289                continue
290            result[raw_header[num]] = field
291        return result
292
293    def checkConversion(self, row, mode='create'):
294        """Validates all values in row.
295        """
[8331]296        iface = self.iface
297        if self.getLocator(row) == 'reg_number' or mode == 'remove':
[8581]298            iface = self.iface_byregnumber
[7268]299        converter = IObjectConverter(iface)
300        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8223]301            row, self.factory_name, mode=mode)
[8615]302        cert = conv_dict.get('course1', None)
303        if cert is not None and (mode in ('create', 'update')):
[8617]304            # course1 application category must match container's.
305            parent = self.getParent(row, self.site)
[10613]306            if parent is None:
307                errs.append(('container', 'not found'))
308            elif cert.application_category != parent.application_category:
[8615]309                errs.append(('course1', 'wrong application category'))
[9701]310        if 'state' in row and \
[8290]311            not row['state'] in IMPORTABLE_STATES:
312            if row['state'] not in (IGNORE_MARKER, ''):
313                errs.append(('state','not allowed'))
314            else:
315                # state is an attribute of Applicant and must not
316                # be changed if empty
317                conv_dict['state'] = IGNORE_MARKER
[8331]318        application_number = row.get('application_number', None)
[8290]319        if application_number in (IGNORE_MARKER, ''):
320                conv_dict['application_number'] = IGNORE_MARKER
[7268]321        return errs, inv_errs, conv_dict
[8336]322
323    def checkUpdateRequirements(self, obj, row, site):
324        """Checks requirements the object must fulfill when being updated.
325
326        This method is not used in case of deleting or adding objects.
327
328        Returns error messages as strings in case of requirement
329        problems.
330        """
331        if obj.state == CREATED:
332            return 'Applicant is blocked.'
333        return None
[8617]334
335    def doImport(self, *args, **kw):
336        # XXX: Not thread-safe.  Parallel applicant imports into
337        # different sites could mean a mess.  Luckily this is not a
338        # typical use-case. On the other hand it spares thousands of
339        # site lookups during large imports.
340        # XXX: Maybe this should go into Importer base.
341        self.site = grok.getSite() # needed by checkConversion()
342        return super(ApplicantProcessor, self).doImport(*args, **kw)
Note: See TracBrowser for help on using the repository browser.