source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/batching.py @ 17773

Last change on this file since 17773 was 17772, checked in by Henrik Bettermann, 8 months ago

Add ApplicantRefereeReportProcessor.

  • Property svn:keywords set to Id
File size: 22.0 KB
RevLine 
[7192]1## $Id: batching.py 17772 2024-05-13 08:31:58Z henrik $
[5321]2##
[7192]3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
[5321]4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
[7192]8##
[5321]9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
[7192]13##
[5321]14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
[10028]20import unicodecsv as csv # XXX: csv ops should move to dedicated module.
[5321]21import grok
[13873]22from time import time
[17762]23from ast import literal_eval
[16472]24import unicodecsv
[7271]25from zope.schema import getFields
[5321]26from zope.interface import Interface
[13872]27from zope.component import queryUtility, getUtility
[8290]28from hurry.workflow.interfaces import IWorkflowState
[7268]29from zope.catalog.interfaces import ICatalog
[7811]30from waeup.kofa.interfaces import (
[15628]31    IBatchProcessor, IObjectConverter, FatalCSVError,
32    IGNORE_MARKER, DELETION_MARKER,
[13872]33    IObjectHistory, IUserAccount, DuplicationError)
[8290]34from waeup.kofa.interfaces import MessageFactory as _
[13872]35from waeup.kofa.payments.interfaces import IPayer
[7811]36from waeup.kofa.utils.batching import BatchProcessor
37from waeup.kofa.applicants.interfaces import (
[13872]38    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo,
[17772]39    IApplicantOnlinePayment, IApplicantRefereeReport)
[8336]40from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
[5321]41
[7933]42class ApplicantsContainerProcessor(BatchProcessor):
[12869]43    """The Applicants Container Processor imports containers for applicants.
44    It does not import their content. There is nothing special about this
45    processor.
[5321]46    """
[5474]47    grok.implements(IBatchProcessor)
[5321]48    grok.provides(IBatchProcessor)
49    grok.context(Interface)
[8987]50    util_name = 'applicantscontainerprocessor'
[5321]51    grok.name(util_name)
52
[11891]53    name = _('ApplicantsContainer Processor')
[5475]54    mode = u'create'
[6251]55    iface = IApplicantsContainer
[5321]56
[6251]57    location_fields = ['code',]
[6282]58    factory_name = 'waeup.ApplicantsContainer'
[5321]59
60    def parentsExist(self, row, site):
[6251]61        return 'applicants' in site.keys()
[5321]62
63    def entryExists(self, row, site):
[6251]64        return row['code'] in site['applicants'].keys()
[5321]65
66    def getParent(self, row, site):
[6251]67        return site['applicants']
[5321]68
69    def getEntry(self, row, site):
70        if not self.entryExists(row, site):
71            return None
72        parent = self.getParent(row, site)
[6251]73        return parent.get(row['code'])
[5321]74
75    def addEntry(self, obj, row, site):
76        parent = self.getParent(row, site)
[6251]77        parent[row['code']] = obj
[5321]78        return
79
80    def delEntry(self, row, site):
81        parent = self.getParent(row, site)
[6251]82        del parent[row['code']]
[5321]83        return
[7262]84
[7933]85class ApplicantProcessor(BatchProcessor):
[12872]86    """The Applicant Processor imports application data (applicants).
[8331]87
[12869]88    In create mode `container_code` is required. If `application_number` is
89    given, an applicant with this number is created in the designated container.
90    If `application_number` is not given, a random `application_number` is
91    assigned. `applicant_id` is being determined by the system and can't be
92    imported.
[8331]93
[12869]94    In update or remove mode `container_code` and `application_number` columns
[12870]95    must not exist. The applicant object is solely localized by searching
96    the applicants catalog for `reg_number` or `applicant_id` .
[7262]97    """
98    grok.implements(IBatchProcessor)
99    grok.provides(IBatchProcessor)
100    grok.context(Interface)
[7933]101    util_name = 'applicantprocessor'
[7262]102    grok.name(util_name)
[11891]103    name = _('Applicant Processor')
[7262]104    iface = IApplicant
[8581]105    iface_byregnumber = IApplicantUpdateByRegNo
[7262]106    factory_name = 'waeup.Applicant'
107
108    mode = None
109
110    @property
[7268]111    def available_fields(self):
112        return sorted(list(set(
[17762]113            ['application_number', 'history',
[8290]114            'container_code','state','password'] + getFields(
[7268]115                self.iface).keys())))
[7262]116
[17705]117    def cheadminckHeaders(self, headerfields, mode='create'):
[8331]118        cond1 = 'container_code' in headerfields
119        cond2 = 'application_number' in headerfields
120        cond3 = 'applicant_id' in headerfields
121        cond4 = 'reg_number' in headerfields
[7268]122        if mode == 'create':
[8331]123            if not cond1:
124                raise FatalCSVError(
125                    "Need at least container_code column!")
126            if cond3:
127                raise FatalCSVError(
128                    "applicant_id can't be imported in create mode!")
[7268]129            for field in self.required_fields:
130                if not field in headerfields:
131                    raise FatalCSVError(
132                        "Need at least columns %s for import!" %
133                        ', '.join(["'%s'" % x for x in self.required_fields]))
[8331]134        if mode in ('update', 'remove'):
135            if not cond3 and not cond4:
136                raise FatalCSVError(
137                    "Need at least column reg_number or applicant_id!")
138            if cond1 or cond2:
139                raise FatalCSVError(
140                    "container_code or application_number can't be imported " +
141                    "in update or remove mode!")
[7268]142        # Check for fields to be ignored...
143        not_ignored_fields = [x for x in headerfields
144                              if not x.startswith('--')]
145        if len(set(not_ignored_fields)) < len(not_ignored_fields):
146            raise FatalCSVError(
147                "Double headers: each column name may only appear once.")
148        return True
[7262]149
[7268]150    def getLocator(self, row):
[8331]151        if row.get('container_code', None) not in (IGNORE_MARKER, None):
[9250]152            # create mode
[8331]153            return 'container_code'
154        elif row.get('applicant_id', None) not in (IGNORE_MARKER, None):
[9250]155            # update or remove mode
[8331]156            return 'applicant_id'
[8236]157        elif row.get('reg_number', None) not in (IGNORE_MARKER, None):
[9250]158            # update or remove mode
[7270]159            return 'reg_number'
[7268]160        else:
161            return None
[7262]162
163    def getParent(self, row, site):
[8615]164        result = None
[8331]165        if self.getLocator(row) == 'container_code':
[8615]166            result = site['applicants'].get(row['container_code'], None)
167        elif self.getLocator(row) == 'reg_number':
[8331]168            reg_number = row['reg_number']
169            cat = queryUtility(ICatalog, name='applicants_catalog')
170            results = list(
171                cat.searchResults(reg_number=(reg_number, reg_number)))
172            if results:
[8615]173                result = results[0].__parent__
174        elif self.getLocator(row) == 'applicant_id':
[8331]175            applicant_id = row['applicant_id']
176            cat = queryUtility(ICatalog, name='applicants_catalog')
177            results = list(
178                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
179            if results:
[8615]180                result = results[0].__parent__
181        return result
[7262]182
[7268]183    def parentsExist(self, row, site):
184        return self.getParent(row, site) is not None
185
[7262]186    def getEntry(self, row, site):
[8331]187        if self.getLocator(row) == 'container_code':
188            if row.get('application_number', None) not in (IGNORE_MARKER, None):
189                if not self.parentsExist(row, site):
190                    return None
191                parent = self.getParent(row, site)
192                return parent.get(row['application_number'])
[7264]193            return None
[8331]194        if self.getLocator(row) == 'applicant_id':
195            applicant_id = row['applicant_id']
196            cat = queryUtility(ICatalog, name='applicants_catalog')
197            results = list(
198                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
199            if results:
200                return results[0]
201        if self.getLocator(row) == 'reg_number':
[7270]202            reg_number = row['reg_number']
[7268]203            cat = queryUtility(ICatalog, name='applicants_catalog')
204            results = list(
[7270]205                cat.searchResults(reg_number=(reg_number, reg_number)))
[7268]206            if results:
207                return results[0]
208        return None
[7262]209
[7268]210    def entryExists(self, row, site):
211        return self.getEntry(row, site) is not None
212
[7262]213    def addEntry(self, obj, row, site):
214        parent = self.getParent(row, site)
215        parent.addApplicant(obj)
[8334]216        #parent.__parent__.logger.info(
217        #    'Applicant imported: %s' % obj.applicant_id)
[8290]218        history = IObjectHistory(obj)
[8334]219        history.addMessage(_('Application record imported'))
[7262]220        return
221
222    def delEntry(self, row, site):
[7268]223        applicant = self.getEntry(row, site)
224        if applicant is not None:
[8331]225            parent = applicant.__parent__
[7268]226            del parent[applicant.application_number]
[8334]227            #parent.__parent__.logger.info(
228            #    'Applicant removed: %s' % applicant.applicant_id)
[7262]229        pass
[7268]230
[9706]231    def updateEntry(self, obj, row, site, filename):
[8290]232        """Update obj to the values given in row.
233        """
234        items_changed = ''
235        # Remove application_number from row if empty
[9701]236        if 'application_number' in row and row['application_number'] in (
[8290]237            None, IGNORE_MARKER):
238            row.pop('application_number')
239
240        # Update applicant_id fom application_number and container code
241        # if application_number is given
[9701]242        if 'application_number' in row:
[8290]243            obj.applicant_id = u'%s_%s' % (
244                row['container_code'], row['application_number'])
[8334]245            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
[8290]246            row.pop('application_number')
247
[15628]248        # Update
[9701]249        if 'password' in row:
[8334]250            passwd = row.get('password', IGNORE_MARKER)
251            if passwd not in ('', IGNORE_MARKER):
[8348]252                if passwd.startswith('{SSHA}'):
253                    # already encrypted password
254                    obj.password = passwd
[15628]255                elif passwd == DELETION_MARKER:
256                    obj.password = None
[8348]257                else:
258                    # not yet encrypted password
259                    IUserAccount(obj).setPassword(passwd)
[8334]260                items_changed += ('%s=%s, ' % ('password', passwd))
[8290]261            row.pop('password')
262
[17762]263        # Replace entire history
264        if 'history' in row:
265            new_history = row.get('history', IGNORE_MARKER)
266            if new_history not in (IGNORE_MARKER, ''):
267                history = IObjectHistory(obj)
268                history._annotations[
269                    history.history_key] = literal_eval(new_history)
270                items_changed += ('%s=%s, ' % ('history', new_history))
271            row.pop('history')
272
[8290]273        # Update registration state
[9701]274        if 'state' in row:
[8334]275            state = row.get('state', IGNORE_MARKER)
276            if state not in (IGNORE_MARKER, ''):
277                IWorkflowState(obj).setState(state)
278                msg = _("State '${a}' set", mapping = {'a':state})
279                history = IObjectHistory(obj)
280                history.addMessage(msg)
281                items_changed += ('%s=%s, ' % ('state', state))
[8290]282            row.pop('state')
283
284        # apply other values...
[8334]285        items_changed += super(ApplicantProcessor, self).updateEntry(
[9706]286            obj, row, site, filename)
[8290]287
288        # Log actions...
289        parent = self.getParent(row, site)
[8334]290        if self.getLocator(row) == 'container_code':
[8290]291            parent.__parent__.logger.info(
[9706]292                '%s - %s - imported: %s' % (self.name, filename, items_changed))
[8290]293        else:
[8334]294            parent.__parent__.logger.info(
[9706]295                '%s - %s - updated: %s' % (self.name, filename, items_changed))
[8290]296        return items_changed
297
[7268]298    def getMapping(self, path, headerfields, mode):
299        """Get a mapping from CSV file headerfields to actually used fieldnames.
300        """
301        result = dict()
302        reader = csv.reader(open(path, 'rb'))
303        raw_header = reader.next()
304        for num, field in enumerate(headerfields):
[8331]305            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
[7268]306                continue
307            if field == u'--IGNORE--':
308                # Skip ignored columns in failed and finished data files.
309                continue
310            result[raw_header[num]] = field
311        return result
312
313    def checkConversion(self, row, mode='create'):
314        """Validates all values in row.
315        """
[8331]316        iface = self.iface
317        if self.getLocator(row) == 'reg_number' or mode == 'remove':
[8581]318            iface = self.iface_byregnumber
[7268]319        converter = IObjectConverter(iface)
320        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8223]321            row, self.factory_name, mode=mode)
[8615]322        cert = conv_dict.get('course1', None)
323        if cert is not None and (mode in ('create', 'update')):
[8617]324            # course1 application category must match container's.
[12869]325            site = grok.getSite()
326            parent = self.getParent(row, site)
[10613]327            if parent is None:
328                errs.append(('container', 'not found'))
329            elif cert.application_category != parent.application_category:
[8615]330                errs.append(('course1', 'wrong application category'))
[9701]331        if 'state' in row and \
[8290]332            not row['state'] in IMPORTABLE_STATES:
333            if row['state'] not in (IGNORE_MARKER, ''):
334                errs.append(('state','not allowed'))
335            else:
336                # state is an attribute of Applicant and must not
337                # be changed if empty
338                conv_dict['state'] = IGNORE_MARKER
[8331]339        application_number = row.get('application_number', None)
[8290]340        if application_number in (IGNORE_MARKER, ''):
341                conv_dict['application_number'] = IGNORE_MARKER
[7268]342        return errs, inv_errs, conv_dict
[8336]343
344    def checkUpdateRequirements(self, obj, row, site):
345        """Checks requirements the object must fulfill when being updated.
346
347        This method is not used in case of deleting or adding objects.
348
349        Returns error messages as strings in case of requirement
350        problems.
351        """
[17705]352        # Block applicant with backdoor ...
353        if obj.state == CREATED and obj.notice != 'reset':
[8336]354            return 'Applicant is blocked.'
355        return None
[13872]356
357class ApplicantOnlinePaymentProcessor(BatchProcessor):
358    """The Applicant Online Payment Processor imports applicant payment tickets.
359    The tickets are located in the applicant container.
360
361    The `checkConversion` method checks the format of the payment identifier.
362    In create mode it does also ensures that same p_id does not exist
363    elsewhere. It must be portal-wide unique.
364
365    When adding a payment ticket, the `addEntry` method checks if a
366    payment has already been made. If so, a `DuplicationError` is raised.
367    """
368    grok.implements(IBatchProcessor)
369    grok.provides(IBatchProcessor)
370    grok.context(Interface)
371    util_name = 'applicantpaymentprocessor'
372    grok.name(util_name)
373
374    name = _('ApplicantOnlinePayment Processor')
375    iface = IApplicantOnlinePayment
376    factory_name = 'waeup.ApplicantOnlinePayment'
377
[16472]378    location_fields = ['applicant_id',]
[13872]379
380    @property
381    def available_fields(self):
382        af = sorted(list(set(
[14804]383            self.location_fields + getFields(self.iface).keys())) +
384            ['p_id',])
[13872]385        af.remove('display_item')
386        return af
387
[14804]388    def checkHeaders(self, headerfields, mode='ignore'):
389        super(ApplicantOnlinePaymentProcessor, self).checkHeaders(headerfields)
390        if mode in ('update', 'remove') and not 'p_id' in headerfields:
391            raise FatalCSVError(
392                "Need p_id for import in update and remove modes!")
393        return True
394
[16472]395    def getMapping(self, path, headerfields, mode):
396        """Get a mapping from CSV file headerfields to actually used fieldnames.
397
398        """
399        result = dict()
400        reader = unicodecsv.reader(open(path, 'rb'))
401        raw_header = reader.next()
402        for num, field in enumerate(headerfields):
403            if field not in  ['applicant_id', 'p_id'] and mode == 'remove':
404                continue
405            if field == u'--IGNORE--':
406                continue
407            result[raw_header[num]] = field
408        return result
409
[13872]410    def parentsExist(self, row, site):
411        return self.getParent(row, site) is not None
412
413    def getParent(self, row, site):
414        applicant_id = row['applicant_id']
415        cat = queryUtility(ICatalog, name='applicants_catalog')
416        results = list(
417            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
418        if results:
419            return results[0]
420        return None
421
422    def getEntry(self, row, site):
423        applicant = self.getParent(row, site)
424        if applicant is None:
425            return None
426        p_id = row.get('p_id', None)
427        if p_id in (None, IGNORE_MARKER):
428            return None
429        # We can use the hash symbol at the end of p_id in import files
430        # to avoid annoying automatic number transformation
431        # by Excel or Calc
432        p_id = p_id.strip('#')
433        entry = applicant.get(p_id)
434        return entry
435
436    def entryExists(self, row, site):
437        return self.getEntry(row, site) is not None
438
439    def updateEntry(self, obj, row, site, filename):
440        """Update obj to the values given in row.
441        """
442        items_changed = super(ApplicantOnlinePaymentProcessor, self).updateEntry(
443            obj, row, site, filename)
444        applicant = self.getParent(row, site)
445        applicant.__parent__.__parent__.logger.info(
446            '%s - %s - %s - updated: %s'
447            % (self.name, filename, applicant.applicant_id, items_changed))
448        return
449
450    def samePaymentMade(self, applicant, category):
451        for key in applicant.keys():
452            ticket = applicant[key]
453            if ticket.p_state == 'paid' and\
[16202]454                ticket.p_category == category:
[16200]455                return True
[13872]456        return False
457
458    def addEntry(self, obj, row, site):
459        applicant = self.getParent(row, site)
460        p_id = row['p_id'].strip('#')
[16358]461        if not (obj.p_item and obj.p_item.startswith(
462            'Balance')) and self.samePaymentMade(applicant, obj.p_category):
[13872]463            applicant.__parent__.__parent__.logger.info(
464                '%s - %s - previous update cancelled'
465                % (self.name, applicant.applicant_id))
466            raise DuplicationError('Payment has already been made.')
467        applicant[p_id] = obj
468        return
469
470    def delEntry(self, row, site):
471        payment = self.getEntry(row, site)
472        applicant = self.getParent(row, site)
473        if payment is not None:
474            applicant.__parent__.__parent__.logger.info('%s - Payment ticket removed: %s'
475                % (applicant.applicant_id, payment.p_id))
476            del applicant[payment.p_id]
477        return
478
479    def checkConversion(self, row, mode='ignore'):
480        """Validates all values in row.
481        """
482        errs, inv_errs, conv_dict = super(
483            ApplicantOnlinePaymentProcessor, self).checkConversion(row, mode=mode)
484        # We have to check p_id.
485        p_id = row.get('p_id', None)
486        if mode == 'create' and p_id in (None, IGNORE_MARKER):
487            timestamp = ("%d" % int(time()*10000))[1:]
488            p_id = "p%s" % timestamp
489            conv_dict['p_id'] = p_id
490            return errs, inv_errs, conv_dict
491        elif p_id in (None, IGNORE_MARKER):
492            errs.append(('p_id','missing'))
493            return errs, inv_errs, conv_dict
494        else:
495            p_id = p_id.strip('#')
496            if not len(p_id) == 14:
497                errs.append(('p_id','invalid length'))
498                return errs, inv_errs, conv_dict
499        if mode == 'create':
500            cat = getUtility(ICatalog, name='payments_catalog')
501            results = list(cat.searchResults(p_id=(p_id, p_id)))
502            if len(results) > 0:
503                sids = [IPayer(payment).id for payment in results]
504                sids_string = ''
505                for id in sids:
506                    sids_string += '%s ' % id
507                errs.append(('p_id','p_id exists in %s' % sids_string))
508                return errs, inv_errs, conv_dict
509        return errs, inv_errs, conv_dict
[17772]510
511class ApplicantRefereeReportProcessor(BatchProcessor):
512    """The Applicant Referee Report Processor imports applicant referee reports.
513    The reports are located in the applicant container.
514    """
515    grok.implements(IBatchProcessor)
516    grok.provides(IBatchProcessor)
517    grok.context(Interface)
518    util_name = 'applicantrefereereportprocessor'
519    grok.name(util_name)
520
521    name = _('ApplicantRefereeReport Processor')
522    iface = IApplicantRefereeReport
523    factory_name = 'waeup.ApplicantRefereeReport'
524
525    location_fields = ['applicant_id',]
526
527    def parentsExist(self, row, site):
528        return self.getParent(row, site) is not None
529
530    def getParent(self, row, site):
531        applicant_id = row['applicant_id']
532        cat = queryUtility(ICatalog, name='applicants_catalog')
533        results = list(
534            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
535        if results:
536            return results[0]
537        return None
538
539    def getEntry(self, row, site):
540        parent = self.getParent(row, site)
541        if parent is None:
542            return None
543        return parent.get(row['r_id'])
544
545    def entryExists(self, row, site):
546        return self.getEntry(row, site) is not None
547
548    def updateEntry(self, obj, row, site, filename):
549        """Update obj to the values given in row.
550        """
551        items_changed = super(ApplicantRefereeReportProcessor, self).updateEntry(
552            obj, row, site, filename)
553        applicant = self.getParent(row, site)
554        applicant.__parent__.__parent__.logger.info(
555            '%s - %s - %s - updated: %s'
556            % (self.name, filename, applicant.applicant_id, items_changed))
557        return
558
559    def addEntry(self, obj, row, site):
560        parent = self.getParent(row, site)
561        parent[row['r_id']] = obj
562        return
563
564    def delEntry(self, row, site):
565        report = self.getEntry(row, site)
566        parent = self.getParent(row, site)
567        if report is not None:
568            parent.__parent__.__parent__.logger.info(
569                '%s - Referee Report removed' % report.r_id)
570            del parent[report.r_id]
571        return
Note: See TracBrowser for help on using the repository browser.