source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/batching.py @ 17863

Last change on this file since 17863 was 17787, checked in by Henrik Bettermann, 8 months ago

Add SessionConfigurationProcessor.
Add ConfigurationContainerProcessor.
Add ConfigurationContainerExporter.

  • Property svn:keywords set to Id
File size: 22.1 KB
RevLine 
[7192]1## $Id: batching.py 17787 2024-05-15 06:42:58Z henrik $
[5321]2##
[7192]3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
[5321]4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
[7192]8##
[5321]9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
[7192]13##
[5321]14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
[10028]20import unicodecsv as csv # XXX: csv ops should move to dedicated module.
[5321]21import grok
[13873]22from time import time
[17762]23from ast import literal_eval
[16472]24import unicodecsv
[7271]25from zope.schema import getFields
[5321]26from zope.interface import Interface
[13872]27from zope.component import queryUtility, getUtility
[8290]28from hurry.workflow.interfaces import IWorkflowState
[7268]29from zope.catalog.interfaces import ICatalog
[7811]30from waeup.kofa.interfaces import (
[15628]31    IBatchProcessor, IObjectConverter, FatalCSVError,
32    IGNORE_MARKER, DELETION_MARKER,
[13872]33    IObjectHistory, IUserAccount, DuplicationError)
[8290]34from waeup.kofa.interfaces import MessageFactory as _
[13872]35from waeup.kofa.payments.interfaces import IPayer
[7811]36from waeup.kofa.utils.batching import BatchProcessor
37from waeup.kofa.applicants.interfaces import (
[13872]38    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo,
[17772]39    IApplicantOnlinePayment, IApplicantRefereeReport)
[8336]40from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
[5321]41
[7933]42class ApplicantsContainerProcessor(BatchProcessor):
[12869]43    """The Applicants Container Processor imports containers for applicants.
44    It does not import their content. There is nothing special about this
45    processor.
[5321]46    """
[5474]47    grok.implements(IBatchProcessor)
[5321]48    grok.provides(IBatchProcessor)
49    grok.context(Interface)
[8987]50    util_name = 'applicantscontainerprocessor'
[5321]51    grok.name(util_name)
52
[11891]53    name = _('ApplicantsContainer Processor')
[5475]54    mode = u'create'
[6251]55    iface = IApplicantsContainer
[5321]56
[6251]57    location_fields = ['code',]
[6282]58    factory_name = 'waeup.ApplicantsContainer'
[5321]59
60    def parentsExist(self, row, site):
[6251]61        return 'applicants' in site.keys()
[5321]62
63    def entryExists(self, row, site):
[6251]64        return row['code'] in site['applicants'].keys()
[5321]65
66    def getParent(self, row, site):
[6251]67        return site['applicants']
[5321]68
69    def getEntry(self, row, site):
70        if not self.entryExists(row, site):
71            return None
72        parent = self.getParent(row, site)
[6251]73        return parent.get(row['code'])
[5321]74
75    def addEntry(self, obj, row, site):
76        parent = self.getParent(row, site)
[6251]77        parent[row['code']] = obj
[5321]78        return
79
80    def delEntry(self, row, site):
81        parent = self.getParent(row, site)
[6251]82        del parent[row['code']]
[5321]83        return
[7262]84
[7933]85class ApplicantProcessor(BatchProcessor):
[12872]86    """The Applicant Processor imports application data (applicants).
[8331]87
[12869]88    In create mode `container_code` is required. If `application_number` is
89    given, an applicant with this number is created in the designated container.
90    If `application_number` is not given, a random `application_number` is
91    assigned. `applicant_id` is being determined by the system and can't be
92    imported.
[8331]93
[12869]94    In update or remove mode `container_code` and `application_number` columns
[12870]95    must not exist. The applicant object is solely localized by searching
96    the applicants catalog for `reg_number` or `applicant_id` .
[7262]97    """
98    grok.implements(IBatchProcessor)
99    grok.provides(IBatchProcessor)
100    grok.context(Interface)
[7933]101    util_name = 'applicantprocessor'
[7262]102    grok.name(util_name)
[11891]103    name = _('Applicant Processor')
[7262]104    iface = IApplicant
[8581]105    iface_byregnumber = IApplicantUpdateByRegNo
[7262]106    factory_name = 'waeup.Applicant'
107
108    @property
[7268]109    def available_fields(self):
110        return sorted(list(set(
[17762]111            ['application_number', 'history',
[8290]112            'container_code','state','password'] + getFields(
[7268]113                self.iface).keys())))
[7262]114
[17705]115    def cheadminckHeaders(self, headerfields, mode='create'):
[8331]116        cond1 = 'container_code' in headerfields
117        cond2 = 'application_number' in headerfields
118        cond3 = 'applicant_id' in headerfields
119        cond4 = 'reg_number' in headerfields
[7268]120        if mode == 'create':
[8331]121            if not cond1:
122                raise FatalCSVError(
123                    "Need at least container_code column!")
124            if cond3:
125                raise FatalCSVError(
126                    "applicant_id can't be imported in create mode!")
[7268]127            for field in self.required_fields:
128                if not field in headerfields:
129                    raise FatalCSVError(
130                        "Need at least columns %s for import!" %
131                        ', '.join(["'%s'" % x for x in self.required_fields]))
[8331]132        if mode in ('update', 'remove'):
133            if not cond3 and not cond4:
134                raise FatalCSVError(
135                    "Need at least column reg_number or applicant_id!")
136            if cond1 or cond2:
137                raise FatalCSVError(
138                    "container_code or application_number can't be imported " +
139                    "in update or remove mode!")
[7268]140        # Check for fields to be ignored...
141        not_ignored_fields = [x for x in headerfields
142                              if not x.startswith('--')]
143        if len(set(not_ignored_fields)) < len(not_ignored_fields):
144            raise FatalCSVError(
145                "Double headers: each column name may only appear once.")
146        return True
[7262]147
[7268]148    def getLocator(self, row):
[8331]149        if row.get('container_code', None) not in (IGNORE_MARKER, None):
[9250]150            # create mode
[8331]151            return 'container_code'
152        elif row.get('applicant_id', None) not in (IGNORE_MARKER, None):
[9250]153            # update or remove mode
[8331]154            return 'applicant_id'
[8236]155        elif row.get('reg_number', None) not in (IGNORE_MARKER, None):
[9250]156            # update or remove mode
[7270]157            return 'reg_number'
[7268]158        else:
159            return None
[7262]160
161    def getParent(self, row, site):
[8615]162        result = None
[8331]163        if self.getLocator(row) == 'container_code':
[8615]164            result = site['applicants'].get(row['container_code'], None)
165        elif self.getLocator(row) == 'reg_number':
[8331]166            reg_number = row['reg_number']
167            cat = queryUtility(ICatalog, name='applicants_catalog')
168            results = list(
169                cat.searchResults(reg_number=(reg_number, reg_number)))
170            if results:
[8615]171                result = results[0].__parent__
172        elif self.getLocator(row) == 'applicant_id':
[8331]173            applicant_id = row['applicant_id']
174            cat = queryUtility(ICatalog, name='applicants_catalog')
175            results = list(
176                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
177            if results:
[8615]178                result = results[0].__parent__
179        return result
[7262]180
[7268]181    def parentsExist(self, row, site):
182        return self.getParent(row, site) is not None
183
[7262]184    def getEntry(self, row, site):
[8331]185        if self.getLocator(row) == 'container_code':
186            if row.get('application_number', None) not in (IGNORE_MARKER, None):
187                if not self.parentsExist(row, site):
188                    return None
189                parent = self.getParent(row, site)
190                return parent.get(row['application_number'])
[7264]191            return None
[8331]192        if self.getLocator(row) == 'applicant_id':
193            applicant_id = row['applicant_id']
194            cat = queryUtility(ICatalog, name='applicants_catalog')
195            results = list(
196                cat.searchResults(applicant_id=(applicant_id, applicant_id)))
197            if results:
198                return results[0]
199        if self.getLocator(row) == 'reg_number':
[7270]200            reg_number = row['reg_number']
[7268]201            cat = queryUtility(ICatalog, name='applicants_catalog')
202            results = list(
[7270]203                cat.searchResults(reg_number=(reg_number, reg_number)))
[7268]204            if results:
205                return results[0]
206        return None
[7262]207
[7268]208    def entryExists(self, row, site):
209        return self.getEntry(row, site) is not None
210
[7262]211    def addEntry(self, obj, row, site):
212        parent = self.getParent(row, site)
213        parent.addApplicant(obj)
[8334]214        #parent.__parent__.logger.info(
215        #    'Applicant imported: %s' % obj.applicant_id)
[8290]216        history = IObjectHistory(obj)
[8334]217        history.addMessage(_('Application record imported'))
[7262]218        return
219
220    def delEntry(self, row, site):
[7268]221        applicant = self.getEntry(row, site)
222        if applicant is not None:
[8331]223            parent = applicant.__parent__
[7268]224            del parent[applicant.application_number]
[8334]225            #parent.__parent__.logger.info(
226            #    'Applicant removed: %s' % applicant.applicant_id)
[7262]227        pass
[7268]228
[9706]229    def updateEntry(self, obj, row, site, filename):
[8290]230        """Update obj to the values given in row.
231        """
232        items_changed = ''
233        # Remove application_number from row if empty
[9701]234        if 'application_number' in row and row['application_number'] in (
[8290]235            None, IGNORE_MARKER):
236            row.pop('application_number')
237
238        # Update applicant_id fom application_number and container code
239        # if application_number is given
[9701]240        if 'application_number' in row:
[8290]241            obj.applicant_id = u'%s_%s' % (
242                row['container_code'], row['application_number'])
[8334]243            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
[8290]244            row.pop('application_number')
245
[15628]246        # Update
[9701]247        if 'password' in row:
[8334]248            passwd = row.get('password', IGNORE_MARKER)
249            if passwd not in ('', IGNORE_MARKER):
[8348]250                if passwd.startswith('{SSHA}'):
251                    # already encrypted password
252                    obj.password = passwd
[15628]253                elif passwd == DELETION_MARKER:
254                    obj.password = None
[8348]255                else:
256                    # not yet encrypted password
257                    IUserAccount(obj).setPassword(passwd)
[8334]258                items_changed += ('%s=%s, ' % ('password', passwd))
[8290]259            row.pop('password')
260
[17762]261        # Replace entire history
262        if 'history' in row:
263            new_history = row.get('history', IGNORE_MARKER)
264            if new_history not in (IGNORE_MARKER, ''):
265                history = IObjectHistory(obj)
266                history._annotations[
267                    history.history_key] = literal_eval(new_history)
268                items_changed += ('%s=%s, ' % ('history', new_history))
269            row.pop('history')
270
[8290]271        # Update registration state
[9701]272        if 'state' in row:
[8334]273            state = row.get('state', IGNORE_MARKER)
274            if state not in (IGNORE_MARKER, ''):
275                IWorkflowState(obj).setState(state)
276                msg = _("State '${a}' set", mapping = {'a':state})
277                history = IObjectHistory(obj)
278                history.addMessage(msg)
279                items_changed += ('%s=%s, ' % ('state', state))
[8290]280            row.pop('state')
281
282        # apply other values...
[8334]283        items_changed += super(ApplicantProcessor, self).updateEntry(
[9706]284            obj, row, site, filename)
[8290]285
286        # Log actions...
287        parent = self.getParent(row, site)
[8334]288        if self.getLocator(row) == 'container_code':
[8290]289            parent.__parent__.logger.info(
[9706]290                '%s - %s - imported: %s' % (self.name, filename, items_changed))
[8290]291        else:
[8334]292            parent.__parent__.logger.info(
[9706]293                '%s - %s - updated: %s' % (self.name, filename, items_changed))
[8290]294        return items_changed
295
[7268]296    def getMapping(self, path, headerfields, mode):
297        """Get a mapping from CSV file headerfields to actually used fieldnames.
298        """
299        result = dict()
300        reader = csv.reader(open(path, 'rb'))
301        raw_header = reader.next()
302        for num, field in enumerate(headerfields):
[8331]303            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
[7268]304                continue
305            if field == u'--IGNORE--':
306                # Skip ignored columns in failed and finished data files.
307                continue
308            result[raw_header[num]] = field
309        return result
310
311    def checkConversion(self, row, mode='create'):
312        """Validates all values in row.
313        """
[8331]314        iface = self.iface
315        if self.getLocator(row) == 'reg_number' or mode == 'remove':
[8581]316            iface = self.iface_byregnumber
[7268]317        converter = IObjectConverter(iface)
318        errs, inv_errs, conv_dict =  converter.fromStringDict(
[8223]319            row, self.factory_name, mode=mode)
[8615]320        cert = conv_dict.get('course1', None)
321        if cert is not None and (mode in ('create', 'update')):
[8617]322            # course1 application category must match container's.
[12869]323            site = grok.getSite()
324            parent = self.getParent(row, site)
[10613]325            if parent is None:
326                errs.append(('container', 'not found'))
327            elif cert.application_category != parent.application_category:
[8615]328                errs.append(('course1', 'wrong application category'))
[9701]329        if 'state' in row and \
[8290]330            not row['state'] in IMPORTABLE_STATES:
331            if row['state'] not in (IGNORE_MARKER, ''):
332                errs.append(('state','not allowed'))
333            else:
334                # state is an attribute of Applicant and must not
335                # be changed if empty
336                conv_dict['state'] = IGNORE_MARKER
[8331]337        application_number = row.get('application_number', None)
[8290]338        if application_number in (IGNORE_MARKER, ''):
339                conv_dict['application_number'] = IGNORE_MARKER
[7268]340        return errs, inv_errs, conv_dict
[8336]341
342    def checkUpdateRequirements(self, obj, row, site):
343        """Checks requirements the object must fulfill when being updated.
344
345        This method is not used in case of deleting or adding objects.
346
347        Returns error messages as strings in case of requirement
348        problems.
349        """
[17705]350        # Block applicant with backdoor ...
351        if obj.state == CREATED and obj.notice != 'reset':
[8336]352            return 'Applicant is blocked.'
353        return None
[13872]354
355class ApplicantOnlinePaymentProcessor(BatchProcessor):
356    """The Applicant Online Payment Processor imports applicant payment tickets.
357    The tickets are located in the applicant container.
358
359    The `checkConversion` method checks the format of the payment identifier.
360    In create mode it does also ensures that same p_id does not exist
361    elsewhere. It must be portal-wide unique.
362
363    When adding a payment ticket, the `addEntry` method checks if a
364    payment has already been made. If so, a `DuplicationError` is raised.
365    """
366    grok.implements(IBatchProcessor)
367    grok.provides(IBatchProcessor)
368    grok.context(Interface)
369    util_name = 'applicantpaymentprocessor'
370    grok.name(util_name)
371
372    name = _('ApplicantOnlinePayment Processor')
373    iface = IApplicantOnlinePayment
374    factory_name = 'waeup.ApplicantOnlinePayment'
375
[16472]376    location_fields = ['applicant_id',]
[13872]377
378    @property
379    def available_fields(self):
380        af = sorted(list(set(
[14804]381            self.location_fields + getFields(self.iface).keys())) +
382            ['p_id',])
[13872]383        af.remove('display_item')
384        return af
385
[14804]386    def checkHeaders(self, headerfields, mode='ignore'):
387        super(ApplicantOnlinePaymentProcessor, self).checkHeaders(headerfields)
388        if mode in ('update', 'remove') and not 'p_id' in headerfields:
389            raise FatalCSVError(
390                "Need p_id for import in update and remove modes!")
391        return True
392
[16472]393    def getMapping(self, path, headerfields, mode):
394        """Get a mapping from CSV file headerfields to actually used fieldnames.
395
396        """
397        result = dict()
398        reader = unicodecsv.reader(open(path, 'rb'))
399        raw_header = reader.next()
400        for num, field in enumerate(headerfields):
401            if field not in  ['applicant_id', 'p_id'] and mode == 'remove':
402                continue
403            if field == u'--IGNORE--':
404                continue
405            result[raw_header[num]] = field
406        return result
407
[13872]408    def parentsExist(self, row, site):
409        return self.getParent(row, site) is not None
410
411    def getParent(self, row, site):
412        applicant_id = row['applicant_id']
413        cat = queryUtility(ICatalog, name='applicants_catalog')
414        results = list(
415            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
416        if results:
417            return results[0]
418        return None
419
420    def getEntry(self, row, site):
421        applicant = self.getParent(row, site)
422        if applicant is None:
423            return None
424        p_id = row.get('p_id', None)
425        if p_id in (None, IGNORE_MARKER):
426            return None
427        # We can use the hash symbol at the end of p_id in import files
428        # to avoid annoying automatic number transformation
429        # by Excel or Calc
430        p_id = p_id.strip('#')
431        entry = applicant.get(p_id)
432        return entry
433
434    def entryExists(self, row, site):
435        return self.getEntry(row, site) is not None
436
437    def updateEntry(self, obj, row, site, filename):
438        """Update obj to the values given in row.
439        """
440        items_changed = super(ApplicantOnlinePaymentProcessor, self).updateEntry(
441            obj, row, site, filename)
442        applicant = self.getParent(row, site)
443        applicant.__parent__.__parent__.logger.info(
444            '%s - %s - %s - updated: %s'
445            % (self.name, filename, applicant.applicant_id, items_changed))
446        return
447
448    def samePaymentMade(self, applicant, category):
449        for key in applicant.keys():
450            ticket = applicant[key]
451            if ticket.p_state == 'paid' and\
[16202]452                ticket.p_category == category:
[16200]453                return True
[13872]454        return False
455
456    def addEntry(self, obj, row, site):
457        applicant = self.getParent(row, site)
458        p_id = row['p_id'].strip('#')
[16358]459        if not (obj.p_item and obj.p_item.startswith(
460            'Balance')) and self.samePaymentMade(applicant, obj.p_category):
[13872]461            applicant.__parent__.__parent__.logger.info(
462                '%s - %s - previous update cancelled'
463                % (self.name, applicant.applicant_id))
464            raise DuplicationError('Payment has already been made.')
465        applicant[p_id] = obj
466        return
467
468    def delEntry(self, row, site):
469        payment = self.getEntry(row, site)
470        applicant = self.getParent(row, site)
471        if payment is not None:
472            applicant.__parent__.__parent__.logger.info('%s - Payment ticket removed: %s'
473                % (applicant.applicant_id, payment.p_id))
474            del applicant[payment.p_id]
475        return
476
477    def checkConversion(self, row, mode='ignore'):
478        """Validates all values in row.
479        """
480        errs, inv_errs, conv_dict = super(
481            ApplicantOnlinePaymentProcessor, self).checkConversion(row, mode=mode)
482        # We have to check p_id.
483        p_id = row.get('p_id', None)
484        if mode == 'create' and p_id in (None, IGNORE_MARKER):
485            timestamp = ("%d" % int(time()*10000))[1:]
486            p_id = "p%s" % timestamp
487            conv_dict['p_id'] = p_id
488            return errs, inv_errs, conv_dict
489        elif p_id in (None, IGNORE_MARKER):
490            errs.append(('p_id','missing'))
491            return errs, inv_errs, conv_dict
492        else:
493            p_id = p_id.strip('#')
494            if not len(p_id) == 14:
495                errs.append(('p_id','invalid length'))
496                return errs, inv_errs, conv_dict
497        if mode == 'create':
498            cat = getUtility(ICatalog, name='payments_catalog')
499            results = list(cat.searchResults(p_id=(p_id, p_id)))
500            if len(results) > 0:
501                sids = [IPayer(payment).id for payment in results]
502                sids_string = ''
503                for id in sids:
504                    sids_string += '%s ' % id
505                errs.append(('p_id','p_id exists in %s' % sids_string))
506                return errs, inv_errs, conv_dict
507        return errs, inv_errs, conv_dict
[17772]508
509class ApplicantRefereeReportProcessor(BatchProcessor):
510    """The Applicant Referee Report Processor imports applicant referee reports.
511    The reports are located in the applicant container.
512    """
513    grok.implements(IBatchProcessor)
514    grok.provides(IBatchProcessor)
515    grok.context(Interface)
516    util_name = 'applicantrefereereportprocessor'
517    grok.name(util_name)
518
519    name = _('ApplicantRefereeReport Processor')
520    iface = IApplicantRefereeReport
521    factory_name = 'waeup.ApplicantRefereeReport'
522
523    location_fields = ['applicant_id',]
524
[17774]525    @property
526    def available_fields(self):
527        return sorted(list(set(
528            ['r_id', 'email', 'applicant_id'] + getFields(self.iface).keys())))
529
[17772]530    def parentsExist(self, row, site):
531        return self.getParent(row, site) is not None
532
533    def getParent(self, row, site):
534        applicant_id = row['applicant_id']
535        cat = queryUtility(ICatalog, name='applicants_catalog')
536        results = list(
537            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
538        if results:
539            return results[0]
540        return None
541
542    def getEntry(self, row, site):
543        parent = self.getParent(row, site)
544        if parent is None:
545            return None
546        return parent.get(row['r_id'])
547
548    def entryExists(self, row, site):
549        return self.getEntry(row, site) is not None
550
551    def updateEntry(self, obj, row, site, filename):
552        """Update obj to the values given in row.
553        """
554        items_changed = super(ApplicantRefereeReportProcessor, self).updateEntry(
555            obj, row, site, filename)
556        applicant = self.getParent(row, site)
557        applicant.__parent__.__parent__.logger.info(
558            '%s - %s - %s - updated: %s'
559            % (self.name, filename, applicant.applicant_id, items_changed))
560        return
561
562    def addEntry(self, obj, row, site):
563        parent = self.getParent(row, site)
564        parent[row['r_id']] = obj
565        return
566
567    def delEntry(self, row, site):
568        report = self.getEntry(row, site)
569        parent = self.getParent(row, site)
570        if report is not None:
571            parent.__parent__.__parent__.logger.info(
572                '%s - Referee Report removed' % report.r_id)
573            del parent[report.r_id]
574        return
Note: See TracBrowser for help on using the repository browser.