source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/batching.py @ 14416

Last change on this file since 14416 was 13873, checked in by Henrik Bettermann, 9 years ago

Add checkConversion test.

  • Property svn:keywords set to Id
File size: 18.3 KB
RevLine 
[7192]1## $Id: batching.py 13873 2016-06-04 04:43:01Z henrik $
[5321]2##
[7192]3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
[5321]4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
[7192]8##
[5321]9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
[7192]13##
[5321]14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
[10028]20import unicodecsv as csv # XXX: csv ops should move to dedicated module.
[5321]21import grok
[13873]22from time import time
[7271]23from zope.schema import getFields
[5321]24from zope.interface import Interface
[13872]25from zope.component import queryUtility, getUtility
[8290]26from hurry.workflow.interfaces import IWorkflowState
[7268]27from zope.catalog.interfaces import ICatalog
[7811]28from waeup.kofa.interfaces import (
[8290]29    IBatchProcessor, IObjectConverter, FatalCSVError, IGNORE_MARKER,
[13872]30    IObjectHistory, IUserAccount, DuplicationError)
[8290]31from waeup.kofa.interfaces import MessageFactory as _
[13872]32from waeup.kofa.payments.interfaces import IPayer
[7811]33from waeup.kofa.utils.batching import BatchProcessor
34from waeup.kofa.applicants.interfaces import (
[13872]35    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo,
36    IApplicantOnlinePayment)
[8336]37from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
[5321]38
[7933]39class ApplicantsContainerProcessor(BatchProcessor):
[12869]40    """The Applicants Container Processor imports containers for applicants.
41    It does not import their content. There is nothing special about this
42    processor.
[5321]43    """
[5474]44    grok.implements(IBatchProcessor)
[5321]45    grok.provides(IBatchProcessor)
46    grok.context(Interface)
[8987]47    util_name = 'applicantscontainerprocessor'
[5321]48    grok.name(util_name)
49
[11891]50    name = _('ApplicantsContainer Processor')
[5475]51    mode = u'create'
[6251]52    iface = IApplicantsContainer
[5321]53
[6251]54    location_fields = ['code',]
[6282]55    factory_name = 'waeup.ApplicantsContainer'
[5321]56
57    def parentsExist(self, row, site):
[6251]58        return 'applicants' in site.keys()
[5321]59
60    def entryExists(self, row, site):
[6251]61        return row['code'] in site['applicants'].keys()
[5321]62
63    def getParent(self, row, site):
[6251]64        return site['applicants']
[5321]65
66    def getEntry(self, row, site):
67        if not self.entryExists(row, site):
68            return None
69        parent = self.getParent(row, site)
[6251]70        return parent.get(row['code'])
[5321]71
72    def addEntry(self, obj, row, site):
73        parent = self.getParent(row, site)
[6251]74        parent[row['code']] = obj
[5321]75        return
76
77    def delEntry(self, row, site):
78        parent = self.getParent(row, site)
[6251]79        del parent[row['code']]
[5321]80        return
[7262]81
[7933]82class ApplicantProcessor(BatchProcessor):
[12872]83    """The Applicant Processor imports application data (applicants).
[8331]84
[12869]85    In create mode `container_code` is required. If `application_number` is
86    given, an applicant with this number is created in the designated container.
87    If `application_number` is not given, a random `application_number` is
88    assigned. `applicant_id` is being determined by the system and can't be
89    imported.
[8331]90
[12869]91    In update or remove mode `container_code` and `application_number` columns
[12870]92    must not exist. The applicant object is solely localized by searching
93    the applicants catalog for `reg_number` or `applicant_id` .
[7262]94    """
95    grok.implements(IBatchProcessor)
96    grok.provides(IBatchProcessor)
97    grok.context(Interface)
[7933]98    util_name = 'applicantprocessor'
[7262]99    grok.name(util_name)
[11891]100    name = _('Applicant Processor')
[7262]101    iface = IApplicant
[8581]102    iface_byregnumber = IApplicantUpdateByRegNo
[7262]103    factory_name = 'waeup.Applicant'
104
105    mode = None
106
107    @property
[7268]108    def available_fields(self):
109        return sorted(list(set(
[8331]110            ['application_number',
[8290]111            'container_code','state','password'] + getFields(
[7268]112                self.iface).keys())))
[7262]113
[7268]114    def checkHeaders(self, headerfields, mode='create'):
[8331]115        cond1 = 'container_code' in headerfields
116        cond2 = 'application_number' in headerfields
117        cond3 = 'applicant_id' in headerfields
118        cond4 = 'reg_number' in headerfields
[7268]119        if mode == 'create':
[8331]120            if not cond1:
121                raise FatalCSVError(
122                    "Need at least container_code column!")
123            if cond3:
124                raise FatalCSVError(
125                    "applicant_id can't be imported in create mode!")
[7268]126            for field in self.required_fields:
127                if not field in headerfields:
128                    raise FatalCSVError(
129                        "Need at least columns %s for import!" %
130                        ', '.join(["'%s'" % x for x in self.required_fields]))
[8331]131        if mode in ('update', 'remove'):
132            if not cond3 and not cond4:
133                raise FatalCSVError(
134                    "Need at least column reg_number or applicant_id!")
135            if cond1 or cond2:
136                raise FatalCSVError(
137                    "container_code or application_number can't be imported " +
138                    "in update or remove mode!")
[7268]139        # Check for fields to be ignored...
140        not_ignored_fields = [x for x in headerfields
141                              if not x.startswith('--')]
142        if len(set(not_ignored_fields)) < len(not_ignored_fields):
143            raise FatalCSVError(
144                "Double headers: each column name may only appear once.")
145        return True
[7262]146
[7268]147    def getLocator(self, row):
[8331]148        if row.get('container_code', None) not in (IGNORE_MARKER, None):
[9250]149            # create mode
[8331]150            return 'container_code'
151        elif row.get('applicant_id', None) not in (IGNORE_MARKER, None):
[9250]152            # update or remove mode
[8331]153            return 'applicant_id'
[8236]154        elif row.get('reg_number', None) not in (IGNORE_MARKER, None):
[9250]155            # update or remove mode
[7270]156            return 'reg_number'
[7268]157        else:
158            return None
[7262]159
160    def getParent(self, row, site):
[8615]161        result = None
[8331]162        if self.getLocator(row) == 'container_code':
[8615]163            result = site['applicants'].get(row['container_code'], None)
164        elif self.getLocator(row) == 'reg_number':
[8331]165            reg_number = row['reg_number']
166            cat = queryUtility(ICatalog, name='applicants_catalog')
167            results = list(
168                cat.searchResults(reg_number=(reg_number, reg_number)))
169            if results:
[8615]170                result = results[0].__parent__
171        elif self.getLocator(row) == 'applicant_id':
[8331]172            applicant_id = row['applicant_id']
173            cat = queryUtility(ICatalog, name='applicants_catalog')
174            results = list(
175                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
176            if results:
[8615]177                result = results[0].__parent__
178        return result
[7262]179
[7268]180    def parentsExist(self, row, site):
181        return self.getParent(row, site) is not None
182
[7262]183    def getEntry(self, row, site):
[8331]184        if self.getLocator(row) == 'container_code':
185            if row.get('application_number', None) not in (IGNORE_MARKER, None):
186                if not self.parentsExist(row, site):
187                    return None
188                parent = self.getParent(row, site)
189                return parent.get(row['application_number'])
[7264]190            return None
[8331]191        if self.getLocator(row) == 'applicant_id':
192            applicant_id = row['applicant_id']
193            cat = queryUtility(ICatalog, name='applicants_catalog')
194            results = list(
195                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
196            if results:
197                return results[0]
198        if self.getLocator(row) == 'reg_number':
[7270]199            reg_number = row['reg_number']
[7268]200            cat = queryUtility(ICatalog, name='applicants_catalog')
201            results = list(
[7270]202                cat.searchResults(reg_number=(reg_number, reg_number)))
[7268]203            if results:
204                return results[0]
205        return None
[7262]206
[7268]207    def entryExists(self, row, site):
208        return self.getEntry(row, site) is not None
209
[7262]210    def addEntry(self, obj, row, site):
211        parent = self.getParent(row, site)
212        parent.addApplicant(obj)
[8334]213        #parent.__parent__.logger.info(
214        #    'Applicant imported: %s' % obj.applicant_id)
[8290]215        history = IObjectHistory(obj)
[8334]216        history.addMessage(_('Application record imported'))
[7262]217        return
218
219    def delEntry(self, row, site):
[7268]220        applicant = self.getEntry(row, site)
221        if applicant is not None:
[8331]222            parent = applicant.__parent__
[7268]223            del parent[applicant.application_number]
[8334]224            #parent.__parent__.logger.info(
225            #    'Applicant removed: %s' % applicant.applicant_id)
[7262]226        pass
[7268]227
[9706]228    def updateEntry(self, obj, row, site, filename):
[8290]229        """Update obj to the values given in row.
230        """
231        items_changed = ''
232        # Remove application_number from row if empty
[9701]233        if 'application_number' in row and row['application_number'] in (
[8290]234            None, IGNORE_MARKER):
235            row.pop('application_number')
236
237        # Update applicant_id fom application_number and container code
238        # if application_number is given
[9701]239        if 'application_number' in row:
[8290]240            obj.applicant_id = u'%s_%s' % (
241                row['container_code'], row['application_number'])
[8334]242            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
[8290]243            row.pop('application_number')
244
245        # Update password
[9701]246        if 'password' in row:
[8334]247            passwd = row.get('password', IGNORE_MARKER)
248            if passwd not in ('', IGNORE_MARKER):
[8348]249                if passwd.startswith('{SSHA}'):
250                    # already encrypted password
251                    obj.password = passwd
252                else:
253                    # not yet encrypted password
254                    IUserAccount(obj).setPassword(passwd)
[8334]255                items_changed += ('%s=%s, ' % ('password', passwd))
[8290]256            row.pop('password')
257
258        # Update registration state
[9701]259        if 'state' in row:
[8334]260            state = row.get('state', IGNORE_MARKER)
261            if state not in (IGNORE_MARKER, ''):
262                IWorkflowState(obj).setState(state)
263                msg = _("State '${a}' set", mapping = {'a':state})
264                history = IObjectHistory(obj)
265                history.addMessage(msg)
266                items_changed += ('%s=%s, ' % ('state', state))
[8290]267            row.pop('state')
268
269        # apply other values...
[8334]270        items_changed += super(ApplicantProcessor, self).updateEntry(
[9706]271            obj, row, site, filename)
[8290]272
273        # Log actions...
274        parent = self.getParent(row, site)
[8334]275        if self.getLocator(row) == 'container_code':
[8290]276            parent.__parent__.logger.info(
[9706]277                '%s - %s - imported: %s' % (self.name, filename, items_changed))
[8290]278        else:
[8334]279            parent.__parent__.logger.info(
[9706]280                '%s - %s - updated: %s' % (self.name, filename, items_changed))
[8290]281        return items_changed
282
[7268]283    def getMapping(self, path, headerfields, mode):
284        """Get a mapping from CSV file headerfields to actually used fieldnames.
285        """
286        result = dict()
287        reader = csv.reader(open(path, 'rb'))
288        raw_header = reader.next()
289        for num, field in enumerate(headerfields):
[8331]290            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
[7268]291                continue
292            if field == u'--IGNORE--':
293                # Skip ignored columns in failed and finished data files.
294                continue
295            result[raw_header[num]] = field
296        return result
297
298    def checkConversion(self, row, mode='create'):
299        """Validates all values in row.
300        """
[8331]301        iface = self.iface
302        if self.getLocator(row) == 'reg_number' or mode == 'remove':
[8581]303            iface = self.iface_byregnumber
[7268]304        converter = IObjectConverter(iface)
305        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8223]306            row, self.factory_name, mode=mode)
[8615]307        cert = conv_dict.get('course1', None)
308        if cert is not None and (mode in ('create', 'update')):
[8617]309            # course1 application category must match container's.
[12869]310            site = grok.getSite()
311            parent = self.getParent(row, site)
[10613]312            if parent is None:
313                errs.append(('container', 'not found'))
314            elif cert.application_category != parent.application_category:
[8615]315                errs.append(('course1', 'wrong application category'))
[9701]316        if 'state' in row and \
[8290]317            not row['state'] in IMPORTABLE_STATES:
318            if row['state'] not in (IGNORE_MARKER, ''):
319                errs.append(('state','not allowed'))
320            else:
321                # state is an attribute of Applicant and must not
322                # be changed if empty
323                conv_dict['state'] = IGNORE_MARKER
[8331]324        application_number = row.get('application_number', None)
[8290]325        if application_number in (IGNORE_MARKER, ''):
326                conv_dict['application_number'] = IGNORE_MARKER
[7268]327        return errs, inv_errs, conv_dict
[8336]328
329    def checkUpdateRequirements(self, obj, row, site):
330        """Checks requirements the object must fulfill when being updated.
331
332        This method is not used in case of deleting or adding objects.
333
334        Returns error messages as strings in case of requirement
335        problems.
336        """
337        if obj.state == CREATED:
338            return 'Applicant is blocked.'
339        return None
[13872]340
341class ApplicantOnlinePaymentProcessor(BatchProcessor):
342    """The Applicant Online Payment Processor imports applicant payment tickets.
343    The tickets are located in the applicant container.
344
345    The `checkConversion` method checks the format of the payment identifier.
346    In create mode it does also ensures that same p_id does not exist
347    elsewhere. It must be portal-wide unique.
348
349    When adding a payment ticket, the `addEntry` method checks if a
350    payment has already been made. If so, a `DuplicationError` is raised.
351    """
352    grok.implements(IBatchProcessor)
353    grok.provides(IBatchProcessor)
354    grok.context(Interface)
355    util_name = 'applicantpaymentprocessor'
356    grok.name(util_name)
357
358    name = _('ApplicantOnlinePayment Processor')
359    iface = IApplicantOnlinePayment
360    factory_name = 'waeup.ApplicantOnlinePayment'
361
362    location_fields = ['applicant_id', 'p_id']
363
364    @property
365    def available_fields(self):
366        af = sorted(list(set(
[13873]367            self.location_fields + getFields(self.iface).keys())))
[13872]368        af.remove('display_item')
369        return af
370
371    def parentsExist(self, row, site):
372        return self.getParent(row, site) is not None
373
374    def getParent(self, row, site):
375        applicant_id = row['applicant_id']
376        cat = queryUtility(ICatalog, name='applicants_catalog')
377        results = list(
378            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
379        if results:
380            return results[0]
381        return None
382
383    def getEntry(self, row, site):
384        applicant = self.getParent(row, site)
385        if applicant is None:
386            return None
387        p_id = row.get('p_id', None)
388        if p_id in (None, IGNORE_MARKER):
389            return None
390        # We can use the hash symbol at the end of p_id in import files
391        # to avoid annoying automatic number transformation
392        # by Excel or Calc
393        p_id = p_id.strip('#')
394        entry = applicant.get(p_id)
395        return entry
396
397    def entryExists(self, row, site):
398        return self.getEntry(row, site) is not None
399
400    def updateEntry(self, obj, row, site, filename):
401        """Update obj to the values given in row.
402        """
403        items_changed = super(ApplicantOnlinePaymentProcessor, self).updateEntry(
404            obj, row, site, filename)
405        applicant = self.getParent(row, site)
406        applicant.__parent__.__parent__.logger.info(
407            '%s - %s - %s - updated: %s'
408            % (self.name, filename, applicant.applicant_id, items_changed))
409        return
410
411    def samePaymentMade(self, applicant, category):
412        for key in applicant.keys():
413            ticket = applicant[key]
414            if ticket.p_state == 'paid' and\
415                ticket.p_category == category:
416                  return True
417        return False
418
419    def addEntry(self, obj, row, site):
420        applicant = self.getParent(row, site)
421        p_id = row['p_id'].strip('#')
422        if self.samePaymentMade(applicant, obj.p_category):
423            applicant.__parent__.__parent__.logger.info(
424                '%s - %s - previous update cancelled'
425                % (self.name, applicant.applicant_id))
426            raise DuplicationError('Payment has already been made.')
427        applicant[p_id] = obj
428        return
429
430    def delEntry(self, row, site):
431        payment = self.getEntry(row, site)
432        applicant = self.getParent(row, site)
433        if payment is not None:
434            applicant.__parent__.__parent__.logger.info('%s - Payment ticket removed: %s'
435                % (applicant.applicant_id, payment.p_id))
436            del applicant[payment.p_id]
437        return
438
439    def checkConversion(self, row, mode='ignore'):
440        """Validates all values in row.
441        """
442        errs, inv_errs, conv_dict = super(
443            ApplicantOnlinePaymentProcessor, self).checkConversion(row, mode=mode)
444        # We have to check p_id.
445        p_id = row.get('p_id', None)
446        if mode == 'create' and p_id in (None, IGNORE_MARKER):
447            timestamp = ("%d" % int(time()*10000))[1:]
448            p_id = "p%s" % timestamp
449            conv_dict['p_id'] = p_id
450            return errs, inv_errs, conv_dict
451        elif p_id in (None, IGNORE_MARKER):
452            errs.append(('p_id','missing'))
453            return errs, inv_errs, conv_dict
454        else:
455            p_id = p_id.strip('#')
456            if not len(p_id) == 14:
457                errs.append(('p_id','invalid length'))
458                return errs, inv_errs, conv_dict
459        if mode == 'create':
460            cat = getUtility(ICatalog, name='payments_catalog')
461            results = list(cat.searchResults(p_id=(p_id, p_id)))
462            if len(results) > 0:
463                sids = [IPayer(payment).id for payment in results]
464                sids_string = ''
465                for id in sids:
466                    sids_string += '%s ' % id
467                errs.append(('p_id','p_id exists in %s' % sids_string))
468                return errs, inv_errs, conv_dict
469        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.