source: main/waeup.kofa/branches/uli-fix-zc-async/src/waeup/kofa/applicants/batching.py @ 17981

Last change on this file since 17981 was 16472, checked in by Henrik Bettermann, 4 years ago

Adjust ApplicantOnlinePaymentProcessor.getMapping (2nd bugfix).

  • Property svn:keywords set to Id
File size: 19.4 KB
Line 
1## $Id: batching.py 16472 2021-04-21 14:35:48Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
20import unicodecsv as csv # XXX: csv ops should move to dedicated module.
21import grok
22from time import time
23import unicodecsv
24from zope.schema import getFields
25from zope.interface import Interface
26from zope.component import queryUtility, getUtility
27from hurry.workflow.interfaces import IWorkflowState
28from zope.catalog.interfaces import ICatalog
29from waeup.kofa.interfaces import (
30    IBatchProcessor, IObjectConverter, FatalCSVError,
31    IGNORE_MARKER, DELETION_MARKER,
32    IObjectHistory, IUserAccount, DuplicationError)
33from waeup.kofa.interfaces import MessageFactory as _
34from waeup.kofa.payments.interfaces import IPayer
35from waeup.kofa.utils.batching import BatchProcessor
36from waeup.kofa.applicants.interfaces import (
37    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo,
38    IApplicantOnlinePayment)
39from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
40
class ApplicantsContainerProcessor(BatchProcessor):
    """Batch processor for applicants containers.

    Only the container objects themselves are imported, not their
    content. Containers are stored in ``site['applicants']`` under the
    value of the `code` column.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantscontainerprocessor'
    grok.name(util_name)

    name = _('ApplicantsContainer Processor')
    mode = u'create'
    iface = IApplicantsContainer

    location_fields = ['code']
    factory_name = 'waeup.ApplicantsContainer'

    def parentsExist(self, row, site):
        """Tell whether `site` has an ``applicants`` entry."""
        return 'applicants' in site.keys()

    def entryExists(self, row, site):
        """Tell whether a container with the row's `code` is stored."""
        return row['code'] in site['applicants'].keys()

    def getParent(self, row, site):
        """Return ``site['applicants']``, the parent of all containers."""
        return site['applicants']

    def getEntry(self, row, site):
        """Return the container named by the row's `code` or `None`."""
        if self.entryExists(row, site):
            return self.getParent(row, site).get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Store `obj` in the parent under the row's `code`."""
        containers = self.getParent(row, site)
        containers[row['code']] = obj

    def delEntry(self, row, site):
        """Delete the container stored under the row's `code`."""
        containers = self.getParent(row, site)
        del containers[row['code']]
83
class ApplicantProcessor(BatchProcessor):
    """The Applicant Processor imports application data (applicants).

    In create mode `container_code` is required. If `application_number` is
    given, an applicant with this number is created in the designated
    container. If `application_number` is not given, a random
    `application_number` is assigned. `applicant_id` is being determined by
    the system and can't be imported.

    In update or remove mode `container_code` and `application_number`
    columns must not exist. The applicant object is solely localized by
    searching the applicants catalog for `reg_number` or `applicant_id`.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantprocessor'
    grok.name(util_name)
    name = _('Applicant Processor')
    iface = IApplicant
    iface_byregnumber = IApplicantUpdateByRegNo
    factory_name = 'waeup.Applicant'

    mode = None

    @property
    def available_fields(self):
        """Sorted, duplicate-free list of accepted column names.

        Consists of the schema fields of `iface` plus
        `application_number`, `container_code`, `state` and `password`.
        """
        extra_fields = [
            'application_number', 'container_code', 'state', 'password']
        return sorted(set(
            extra_fields + list(getFields(self.iface).keys())))

    def checkHeaders(self, headerfields, mode='create'):
        """Check that the CSV header fits the requested import `mode`.

        Returns `True` if the header is acceptable and raises
        `FatalCSVError` otherwise.
        """
        has_container_code = 'container_code' in headerfields
        has_application_number = 'application_number' in headerfields
        has_applicant_id = 'applicant_id' in headerfields
        has_reg_number = 'reg_number' in headerfields
        if mode == 'create':
            if not has_container_code:
                raise FatalCSVError(
                    "Need at least container_code column!")
            if has_applicant_id:
                raise FatalCSVError(
                    "applicant_id can't be imported in create mode!")
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        if mode in ('update', 'remove'):
            if not has_applicant_id and not has_reg_number:
                raise FatalCSVError(
                    "Need at least column reg_number or applicant_id!")
            if has_container_code or has_application_number:
                raise FatalCSVError(
                    "container_code or application_number can't be imported " +
                    "in update or remove mode!")
        # Columns starting with '--' are ignored; all others must be unique.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def getLocator(self, row):
        """Return the name of the column used to locate the applicant.

        The first of `container_code` (create mode), `applicant_id` and
        `reg_number` (update or remove mode) that carries a real value
        wins. Returns `None` if none of them is set.
        """
        for fieldname in ('container_code', 'applicant_id', 'reg_number'):
            if row.get(fieldname, None) not in (IGNORE_MARKER, None):
                return fieldname
        return None

    def _findApplicant(self, fieldname, value):
        """Return the first applicant the applicants catalog finds for
        `fieldname` == `value`, or `None` if there is no match.
        """
        cat = queryUtility(ICatalog, name='applicants_catalog')
        results = list(cat.searchResults(**{fieldname: (value, value)}))
        if results:
            return results[0]
        return None

    def getParent(self, row, site):
        """Return the applicants container belonging to `row` or `None`.

        In create mode the container is looked up by `container_code`,
        otherwise it is the parent of the applicant found in the catalog.
        """
        locator = self.getLocator(row)
        if locator == 'container_code':
            return site['applicants'].get(row['container_code'], None)
        if locator in ('reg_number', 'applicant_id'):
            applicant = self._findApplicant(locator, row[locator])
            if applicant is not None:
                return applicant.__parent__
        return None

    def parentsExist(self, row, site):
        """Tell whether a container can be found for `row`."""
        return self.getParent(row, site) is not None

    def getEntry(self, row, site):
        """Return the applicant designated by `row` or `None`."""
        locator = self.getLocator(row)
        if locator == 'container_code':
            # Without an application number no existing applicant can be
            # addressed in create mode.
            if row.get('application_number', None) in (IGNORE_MARKER, None):
                return None
            parent = self.getParent(row, site)
            if parent is None:
                return None
            return parent.get(row['application_number'])
        if locator in ('applicant_id', 'reg_number'):
            return self._findApplicant(locator, row[locator])
        return None

    def entryExists(self, row, site):
        """Tell whether an applicant can be found for `row`."""
        return self.getEntry(row, site) is not None

    def addEntry(self, obj, row, site):
        """Add applicant `obj` to its container and note the import in
        the object history.
        """
        parent = self.getParent(row, site)
        parent.addApplicant(obj)
        history = IObjectHistory(obj)
        history.addMessage(_('Application record imported'))
        return

    def delEntry(self, row, site):
        """Remove the applicant designated by `row`, if it exists."""
        applicant = self.getEntry(row, site)
        if applicant is not None:
            parent = applicant.__parent__
            del parent[applicant.application_number]
        return

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        `application_number`, `password` and `state` are handled here and
        removed from `row`; all remaining values are applied by the base
        class. The changes are written to the logger of the container's
        parent and returned as a string.
        """
        items_changed = ''
        # Remove application_number from row if empty
        if 'application_number' in row and row['application_number'] in (
            None, IGNORE_MARKER):
            row.pop('application_number')

        # Update applicant_id from application_number and container code
        # if application_number is given
        if 'application_number' in row:
            obj.applicant_id = u'%s_%s' % (
                row['container_code'], row['application_number'])
            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
            row.pop('application_number')

        # Update password
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                elif passwd == DELETION_MARKER:
                    obj.password = None
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): an unencrypted password ends up verbatim
                # in the log message below -- confirm this is intended.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping = {'a':state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        # apply other values...
        items_changed += super(ApplicantProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if self.getLocator(row) == 'container_code':
            action = 'imported'
        else:
            action = 'updated'
        parent.__parent__.logger.info(
            '%s - %s - %s: %s' % (self.name, filename, action, items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` supplies the raw column
        names. In remove mode only the `applicant_id` and `reg_number`
        columns are mapped.
        """
        result = dict()
        # Only the header line is needed; close the file right after
        # reading it instead of leaving the handle open.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the object converter, extended by the checks done here.
        """
        iface = self.iface
        if self.getLocator(row) == 'reg_number' or mode == 'remove':
            iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        cert = conv_dict.get('course1', None)
        if cert is not None and (mode in ('create', 'update')):
            # course1 application category must match container's.
            site = grok.getSite()
            parent = self.getParent(row, site)
            if parent is None:
                errs.append(('container', 'not found'))
            elif cert.application_category != parent.application_category:
                errs.append(('course1', 'wrong application category'))
        if 'state' in row and row['state'] not in IMPORTABLE_STATES:
            if row['state'] not in (IGNORE_MARKER, ''):
                errs.append(('state','not allowed'))
            else:
                # state is an attribute of Applicant and must not
                # be changed if empty
                conv_dict['state'] = IGNORE_MARKER
        application_number = row.get('application_number', None)
        if application_number in (IGNORE_MARKER, ''):
            conv_dict['application_number'] = IGNORE_MARKER
        return errs, inv_errs, conv_dict

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        if obj.state == CREATED:
            return 'Applicant is blocked.'
        return None
344
class ApplicantOnlinePaymentProcessor(BatchProcessor):
    """The Applicant Online Payment Processor imports applicant payment tickets.
    The tickets are located in the applicant container.

    The `checkConversion` method checks the format of the payment identifier.
    In create mode it also ensures that the same p_id does not exist
    elsewhere. It must be portal-wide unique.

    When adding a payment ticket, the `addEntry` method checks if a
    payment has already been made. If so, a `DuplicationError` is raised.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantpaymentprocessor'
    grok.name(util_name)

    name = _('ApplicantOnlinePayment Processor')
    iface = IApplicantOnlinePayment
    factory_name = 'waeup.ApplicantOnlinePayment'

    location_fields = ['applicant_id',]

    @property
    def available_fields(self):
        """Sorted list of accepted column names.

        Consists of the location fields, the schema fields of `iface`
        and `p_id`; `display_item` is removed.
        """
        fields = set(
            self.location_fields + list(getFields(self.iface).keys()))
        af = sorted(list(fields) + ['p_id',])
        af.remove('display_item')
        return af

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check that the CSV header fits the requested import `mode`.

        Returns `True` if the header is acceptable and raises
        `FatalCSVError` if `p_id` is missing in update or remove mode.
        """
        # The base class check is called without `mode` and thus runs
        # with its own default.
        super(ApplicantOnlinePaymentProcessor, self).checkHeaders(headerfields)
        if mode in ('update', 'remove') and 'p_id' not in headerfields:
            raise FatalCSVError(
                "Need p_id for import in update and remove modes!")
        return True

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` supplies the raw column
        names. In remove mode only the `applicant_id` and `p_id` columns
        are mapped.
        """
        result = dict()
        # Only the header line is needed; close the file right after
        # reading it instead of leaving the handle open.
        with open(path, 'rb') as csv_file:
            raw_header = next(unicodecsv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['applicant_id', 'p_id'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                continue
            result[raw_header[num]] = field
        return result

    def parentsExist(self, row, site):
        """Tell whether the applicant named in `row` can be found."""
        return self.getParent(row, site) is not None

    def getParent(self, row, site):
        """Return the applicant the applicants catalog finds for the
        row's `applicant_id`, or `None`.
        """
        applicant_id = row['applicant_id']
        cat = queryUtility(ICatalog, name='applicants_catalog')
        results = list(
            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
        if results:
            return results[0]
        return None

    def getEntry(self, row, site):
        """Return the payment ticket designated by `row` or `None`."""
        applicant = self.getParent(row, site)
        if applicant is None:
            return None
        p_id = row.get('p_id', None)
        if p_id in (None, IGNORE_MARKER):
            return None
        # We can use the hash symbol at the end of p_id in import files
        # to avoid annoying automatic number transformation
        # by Excel or Calc
        p_id = p_id.strip('#')
        return applicant.get(p_id)

    def entryExists(self, row, site):
        """Tell whether a payment ticket can be found for `row`."""
        return self.getEntry(row, site) is not None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The changes are logged and returned as a string, like
        `ApplicantProcessor.updateEntry` does.
        """
        items_changed = super(
            ApplicantOnlinePaymentProcessor, self).updateEntry(
            obj, row, site, filename)
        applicant = self.getParent(row, site)
        applicant.__parent__.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, applicant.applicant_id, items_changed))
        return items_changed

    def samePaymentMade(self, applicant, category):
        """Tell whether `applicant` holds a ticket in state ``paid``
        with payment category `category`.
        """
        for key in applicant.keys():
            ticket = applicant[key]
            if ticket.p_state == 'paid' and ticket.p_category == category:
                return True
        return False

    def addEntry(self, obj, row, site):
        """Store ticket `obj` in the applicant under the row's `p_id`.

        Raises `DuplicationError` if a paid ticket of the same category
        exists, unless the `p_item` of `obj` starts with ``Balance``.
        """
        applicant = self.getParent(row, site)
        p_id = row['p_id'].strip('#')
        is_balance = bool(obj.p_item and obj.p_item.startswith('Balance'))
        if not is_balance and self.samePaymentMade(applicant, obj.p_category):
            applicant.__parent__.__parent__.logger.info(
                '%s - %s - previous update cancelled'
                % (self.name, applicant.applicant_id))
            raise DuplicationError('Payment has already been made.')
        applicant[p_id] = obj
        return

    def delEntry(self, row, site):
        """Remove the payment ticket designated by `row`, if it exists,
        and log the removal.
        """
        payment = self.getEntry(row, site)
        applicant = self.getParent(row, site)
        if payment is not None:
            applicant.__parent__.__parent__.logger.info(
                '%s - Payment ticket removed: %s'
                % (applicant.applicant_id, payment.p_id))
            del applicant[payment.p_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        On top of the base class checks the `p_id` is examined: in
        create mode a missing `p_id` is generated from the current time,
        in other modes it is reported as missing. A given `p_id` must be
        14 characters long (hash symbols stripped) and, in create mode,
        must not yet exist in the payments catalog.
        """
        errs, inv_errs, conv_dict = super(
            ApplicantOnlinePaymentProcessor, self).checkConversion(
            row, mode=mode)
        # We have to check p_id.
        p_id = row.get('p_id', None)
        if p_id in (None, IGNORE_MARKER):
            if mode == 'create':
                timestamp = ("%d" % int(time()*10000))[1:]
                conv_dict['p_id'] = "p%s" % timestamp
            else:
                errs.append(('p_id','missing'))
            return errs, inv_errs, conv_dict
        p_id = p_id.strip('#')
        if not len(p_id) == 14:
            errs.append(('p_id','invalid length'))
            return errs, inv_errs, conv_dict
        if mode == 'create':
            cat = getUtility(ICatalog, name='payments_catalog')
            results = list(cat.searchResults(p_id=(p_id, p_id)))
            if len(results) > 0:
                # Each payer id is followed by a blank, the last one too.
                sids_string = ''.join(
                    ['%s ' % IPayer(payment).id for payment in results])
                errs.append(('p_id','p_id exists in %s' % sids_string))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.