source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/batching.py @ 8864

Last change on this file since 8864 was 8617, checked in by uli, 13 years ago

Reduce number of site lookups during imports.

  • Property svn:keywords set to Id
File size: 13.5 KB
RevLine 
[7192]1## $Id: batching.py 8617 2012-06-03 13:57:01Z uli $
[5321]2##
[7192]3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
[5321]4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
[7192]8##
[5321]9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
[7192]13##
[5321]14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
[7268]20import csv
[5321]21import grok
[7271]22from zope.schema import getFields
[5321]23from zope.interface import Interface
[7268]24from zope.component import queryUtility
[8290]25from hurry.workflow.interfaces import IWorkflowState
[7268]26from zope.catalog.interfaces import ICatalog
[7811]27from waeup.kofa.interfaces import (
[8290]28    IBatchProcessor, IObjectConverter, FatalCSVError, IGNORE_MARKER,
29    IObjectHistory, IUserAccount)
30from waeup.kofa.interfaces import MessageFactory as _
[7811]31from waeup.kofa.utils.batching import BatchProcessor
32from waeup.kofa.applicants.interfaces import (
[7268]33    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo)
[8336]34from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
[5321]35
class ApplicantsContainerProcessor(BatchProcessor):
    """A processor for applicants containers.

    Containers are looked up in, added to and removed from
    ``site['applicants']``; the ``code`` column of a row is used as key.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicants container processor'
    grok.name(util_name)

    name = u'Applicants Container Processor'
    mode = u'create'
    iface = IApplicantsContainer

    location_fields = ['code',]
    factory_name = 'waeup.ApplicantsContainer'

    def parentsExist(self, row, site):
        """Tell whether `site` contains an ``applicants`` item.
        """
        # Ask the container directly instead of scanning its key listing.
        return 'applicants' in site

    def entryExists(self, row, site):
        """Tell whether ``row['code']`` is a key of ``site['applicants']``.
        """
        return row['code'] in site['applicants']

    def getParent(self, row, site):
        """Return ``site['applicants']``, the parent of all containers.
        """
        return site['applicants']

    def getEntry(self, row, site):
        """Return the container stored under ``row['code']`` or ``None``.
        """
        if not self.entryExists(row, site):
            return None
        return self.getParent(row, site).get(row['code'])

    def addEntry(self, obj, row, site):
        """Store `obj` in the parent under the key ``row['code']``.
        """
        self.getParent(row, site)[row['code']] = obj
        return

    def delEntry(self, row, site):
        """Delete the item stored under ``row['code']`` from the parent.
        """
        del self.getParent(row, site)[row['code']]
        return
[7262]76
class ApplicantProcessor(BatchProcessor):
    """A batch processor for IApplicant objects.

    In create mode container_code is required. If application_number is given
    an applicant with this number is created in the designated container.
    If application_number is not given a random application_number is assigned.
    applicant_id is being determined by the system and can't be imported.

    In update or remove mode container_code and application_number columns
    must not exist. The applicant object is solely searched by its applicant_id
    or reg_number.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantprocessor'
    grok.name(util_name)
    name = u'Applicant Processor'
    iface = IApplicant
    iface_byregnumber = IApplicantUpdateByRegNo
    location_fields = ['']
    factory_name = 'waeup.Applicant'

    mode = None

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of importable column names.

        Consists of the field names of `iface` plus the columns that are
        handled specially by this processor.
        """
        special = ['application_number', 'container_code', 'state',
                   'password']
        return sorted(set(special + list(getFields(self.iface))))

    def checkHeaders(self, headerfields, mode='create'):
        """Check whether `headerfields` are acceptable for `mode`.

        Returns ``True`` if all checks pass and raises `FatalCSVError`
        otherwise.
        """
        has_container_code = 'container_code' in headerfields
        has_application_number = 'application_number' in headerfields
        has_applicant_id = 'applicant_id' in headerfields
        has_reg_number = 'reg_number' in headerfields
        if mode == 'create':
            if not has_container_code:
                raise FatalCSVError(
                    "Need at least container_code column!")
            if has_applicant_id:
                raise FatalCSVError(
                    "applicant_id can't be imported in create mode!")
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        if mode in ('update', 'remove'):
            if not has_applicant_id and not has_reg_number:
                raise FatalCSVError(
                    "Need at least column reg_number or applicant_id!")
            if has_container_code or has_application_number:
                raise FatalCSVError(
                    "container_code or application_number can't be imported " +
                    "in update or remove mode!")
        # Columns starting with '--' are ignored ones and may appear
        # several times; all other column names must be unique.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def getLocator(self, row):
        """Return the name of the column used to locate the applicant.

        The first of ``container_code``, ``applicant_id`` and
        ``reg_number`` that carries a real value (neither missing nor
        the ignore marker) wins. Returns ``None`` if there is none.
        """
        for key in ('container_code', 'applicant_id', 'reg_number'):
            if row.get(key, None) not in (IGNORE_MARKER, None):
                return key
        return None

    def _findApplicant(self, index, value):
        """Return the first ``applicants_catalog`` hit for `value` or ``None``.

        `index` is the name of the catalog index to search.
        """
        cat = queryUtility(ICatalog, name='applicants_catalog')
        results = list(cat.searchResults(**{index: (value, value)}))
        if results:
            return results[0]
        return None

    def getParent(self, row, site):
        """Return the container the applicant of `row` belongs to.

        With a ``container_code`` the container is taken from
        ``site['applicants']``. Otherwise the applicant is searched in
        the catalog and its ``__parent__`` is returned. Returns ``None``
        if nothing could be found.
        """
        locator = self.getLocator(row)
        if locator == 'container_code':
            return site['applicants'].get(row['container_code'], None)
        if locator in ('reg_number', 'applicant_id'):
            applicant = self._findApplicant(locator, row[locator])
            if applicant is not None:
                return applicant.__parent__
        return None

    def parentsExist(self, row, site):
        """Tell whether a parent container can be determined for `row`.
        """
        return self.getParent(row, site) is not None

    def getEntry(self, row, site):
        """Return the applicant described by `row` or ``None``.
        """
        locator = self.getLocator(row)
        if locator == 'container_code':
            if row.get('application_number', None) in (IGNORE_MARKER, None):
                # Without application number there is nothing to look up.
                return None
            parent = self.getParent(row, site)
            if parent is None:
                return None
            return parent.get(row['application_number'])
        if locator in ('applicant_id', 'reg_number'):
            return self._findApplicant(locator, row[locator])
        return None

    def entryExists(self, row, site):
        """Tell whether an applicant can be found for `row`.
        """
        return self.getEntry(row, site) is not None

    def addEntry(self, obj, row, site):
        """Add applicant `obj` to its container and note it in the history.
        """
        parent = self.getParent(row, site)
        parent.addApplicant(obj)
        history = IObjectHistory(obj)
        history.addMessage(_('Application record imported'))
        return

    def delEntry(self, row, site):
        """Remove the applicant described by `row` from its container.

        Does nothing if no such applicant can be found.
        """
        applicant = self.getEntry(row, site)
        if applicant is not None:
            parent = applicant.__parent__
            del parent[applicant.application_number]

    def updateEntry(self, obj, row, site):
        """Update obj to the values given in row.

        ``application_number``, ``password`` and ``state`` are handled
        here and removed from `row` (which is modified in place); the
        remaining values are passed to the base class implementation.

        Returns a string listing the changed items. The same string is
        written to the log.
        """
        items_changed = ''
        # Remove application_number from row if empty
        if row.get('application_number', '') in (None, IGNORE_MARKER):
            row.pop('application_number')

        # Update applicant_id fom application_number and container code
        # if application_number is given
        if 'application_number' in row:
            obj.applicant_id = u'%s_%s' % (
                row['container_code'], row['application_number'])
            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
            row.pop('application_number')

        # Update password
        if 'password' in row:
            passwd = row['password']
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): a not yet encrypted password ends up in
                # clear text in `items_changed` and therefore in the log
                # written below -- consider masking it.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row['state']
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping = {'a':state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        # apply other values...
        items_changed += super(ApplicantProcessor, self).updateEntry(
            obj, row, site)

        # Log actions...
        parent = self.getParent(row, site)
        if self.getLocator(row) == 'container_code':
            # Create mode: container_code columns are rejected by
            # checkHeaders() in update and remove mode.
            parent.__parent__.logger.info(
                'Applicant imported: %s' % items_changed)
        else:
            # Update mode: the applicant was located by applicant_id
            # or reg_number.
            parent.__parent__.logger.info(
                'Applicant updated: %s' % items_changed)
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` provides the raw header
        names, `headerfields` the field names to map them to.
        """
        result = dict()
        # Only the header line is needed; make sure the file is closed
        # again afterwards.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
                # In remove mode only the locator columns are of interest.
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns the tuple ``(errs, inv_errs, conv_dict)`` delivered by
        the object converter, extended by the checks done here.
        """
        iface = self.iface
        if self.getLocator(row) == 'reg_number' or mode == 'remove':
            iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict =  converter.fromStringDict(
            row, self.factory_name, mode=mode)
        cert = conv_dict.get('course1', None)
        if cert is not None and (mode in ('create', 'update')):
            # course1 application category must match container's.
            site = getattr(self, 'site', None)
            if site is None:
                # Not called from within doImport(): look up the site now.
                site = grok.getSite()
            parent = self.getParent(row, site)
            if parent is None:
                # Unknown container or applicant: report a row error
                # instead of failing with an AttributeError.
                errs.append(('container', 'not found'))
            elif cert.application_category != parent.application_category:
                errs.append(('course1', 'wrong application category'))
        if 'state' in row and row['state'] not in IMPORTABLE_STATES:
            if row['state'] not in (IGNORE_MARKER, ''):
                errs.append(('state','not allowed'))
            else:
                # state is an attribute of Applicant and must not
                # be changed if empty
                conv_dict['state'] = IGNORE_MARKER
        application_number = row.get('application_number', None)
        if application_number in (IGNORE_MARKER, ''):
            conv_dict['application_number'] = IGNORE_MARKER
        return errs, inv_errs, conv_dict

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        if obj.state == CREATED:
            return 'Applicant is blocked.'
        return None

    def doImport(self, *args, **kw):
        """Remember the current site, then run the regular import.
        """
        # XXX: Not thread-safe.  Parallel applicant imports into
        # different sites could mean a mess.  Luckily this is not a
        # typical use-case. On the other hand it spares thousands of
        # site lookups during large imports.
        # XXX: Maybe this should go into Importer base.
        self.site = grok.getSite() # needed by checkConversion()
        return super(ApplicantProcessor, self).doImport(*args, **kw)
Note: See TracBrowser for help on using the repository browser.