source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/batching.py @ 16299

Last change on this file since 16299 was 16202, checked in by Henrik Bettermann, 4 years ago

Do it right.

  • Property svn:keywords set to Id
File size: 18.8 KB
Line 
1## $Id: batching.py 16202 2020-08-17 14:37:00Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Batch processing for applicants.
19"""
20import unicodecsv as csv # XXX: csv ops should move to dedicated module.
21import grok
22from time import time
23from zope.schema import getFields
24from zope.interface import Interface
25from zope.component import queryUtility, getUtility
26from hurry.workflow.interfaces import IWorkflowState
27from zope.catalog.interfaces import ICatalog
28from waeup.kofa.interfaces import (
29    IBatchProcessor, IObjectConverter, FatalCSVError,
30    IGNORE_MARKER, DELETION_MARKER,
31    IObjectHistory, IUserAccount, DuplicationError)
32from waeup.kofa.interfaces import MessageFactory as _
33from waeup.kofa.payments.interfaces import IPayer
34from waeup.kofa.utils.batching import BatchProcessor
35from waeup.kofa.applicants.interfaces import (
36    IApplicantsContainer, IApplicant, IApplicantUpdateByRegNo,
37    IApplicantOnlinePayment)
38from waeup.kofa.applicants.workflow import  IMPORTABLE_STATES, CREATED
39
class ApplicantsContainerProcessor(BatchProcessor):
    """Batch processor for applicants containers.

    Only the containers themselves are imported, not the applicants
    stored inside them. Containers live in ``site['applicants']`` and
    are addressed by the ``code`` column of the import file.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantscontainerprocessor'
    grok.name(util_name)

    name = _('ApplicantsContainer Processor')
    mode = u'create'
    iface = IApplicantsContainer

    location_fields = ['code',]
    factory_name = 'waeup.ApplicantsContainer'

    def parentsExist(self, row, site):
        """Tell whether `site` has an ``applicants`` entry."""
        site_keys = site.keys()
        return 'applicants' in site_keys

    def entryExists(self, row, site):
        """Tell whether a container with the row's ``code`` exists."""
        known_codes = site['applicants'].keys()
        return row['code'] in known_codes

    def getParent(self, row, site):
        """Return the ``applicants`` entry of `site`."""
        return site['applicants']

    def getEntry(self, row, site):
        """Return the container named by ``row['code']`` or ``None``."""
        if self.entryExists(row, site):
            root = self.getParent(row, site)
            return root.get(row['code'])
        return None

    def addEntry(self, obj, row, site):
        """Store `obj` in the parent under ``row['code']``."""
        root = self.getParent(row, site)
        code = row['code']
        root[code] = obj

    def delEntry(self, row, site):
        """Delete the entry stored under ``row['code']`` from the parent."""
        root = self.getParent(row, site)
        code = row['code']
        del root[code]
82
class ApplicantProcessor(BatchProcessor):
    """The Applicant Processor imports application data (applicants).

    In create mode `container_code` is required. If `application_number` is
    given, an applicant with this number is created in the designated container.
    If `application_number` is not given, a random `application_number` is
    assigned. `applicant_id` is being determined by the system and can't be
    imported.

    In update or remove mode `container_code` and `application_number` columns
    must not exist. The applicant object is solely localized by searching
    the applicants catalog for `reg_number` or `applicant_id`.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantprocessor'
    grok.name(util_name)
    name = _('Applicant Processor')
    iface = IApplicant
    iface_byregnumber = IApplicantUpdateByRegNo
    factory_name = 'waeup.Applicant'

    mode = None

    @property
    def available_fields(self):
        """Sorted list of all column names this processor accepts.

        These are the schema fields of `iface` plus the extra columns
        handled by the processor itself.
        """
        extra_fields = [
            'application_number', 'container_code', 'state', 'password']
        return sorted(set(extra_fields) | set(getFields(self.iface)))

    def _findApplicant(self, index, value):
        """Return the first applicants catalog hit for `index` == `value`.

        Returns ``None`` if the catalog search yields no result.
        """
        cat = queryUtility(ICatalog, name='applicants_catalog')
        results = list(cat.searchResults(**{index: (value, value)}))
        if results:
            return results[0]
        return None

    def checkHeaders(self, headerfields, mode='create'):
        """Check that `headerfields` are valid for the given `mode`.

        Raises `FatalCSVError` if a column required in this mode is
        missing, a column forbidden in this mode is present, or a
        non-ignored column name appears more than once. Returns ``True``
        otherwise.
        """
        has_container_code = 'container_code' in headerfields
        has_application_number = 'application_number' in headerfields
        has_applicant_id = 'applicant_id' in headerfields
        has_reg_number = 'reg_number' in headerfields
        if mode == 'create':
            if not has_container_code:
                raise FatalCSVError(
                    "Need at least container_code column!")
            if has_applicant_id:
                raise FatalCSVError(
                    "applicant_id can't be imported in create mode!")
            for field in self.required_fields:
                if field not in headerfields:
                    raise FatalCSVError(
                        "Need at least columns %s for import!" %
                        ', '.join(["'%s'" % x for x in self.required_fields]))
        if mode in ('update', 'remove'):
            if not (has_applicant_id or has_reg_number):
                raise FatalCSVError(
                    "Need at least column reg_number or applicant_id!")
            if has_container_code or has_application_number:
                raise FatalCSVError(
                    "container_code or application_number can't be imported " +
                    "in update or remove mode!")
        # Columns whose name starts with '--' are ignored and therefore
        # may occur several times.
        not_ignored_fields = [x for x in headerfields
                              if not x.startswith('--')]
        if len(set(not_ignored_fields)) < len(not_ignored_fields):
            raise FatalCSVError(
                "Double headers: each column name may only appear once.")
        return True

    def getLocator(self, row):
        """Return the name of the column used to locate the applicant.

        The first of `container_code` (create mode), `applicant_id` and
        `reg_number` (update or remove mode) which holds a real value in
        `row` wins. Returns ``None`` if none of them is set.
        """
        for column in ('container_code', 'applicant_id', 'reg_number'):
            if row.get(column, None) not in (IGNORE_MARKER, None):
                return column
        return None

    def getParent(self, row, site):
        """Return the container of the applicant described by `row`.

        With a `container_code` the container is looked up in
        ``site['applicants']``; otherwise the applicant is searched in
        the applicants catalog and its ``__parent__`` is returned.
        Returns ``None`` if nothing could be found.
        """
        locator = self.getLocator(row)
        if locator == 'container_code':
            return site['applicants'].get(row['container_code'], None)
        if locator in ('applicant_id', 'reg_number'):
            applicant = self._findApplicant(locator, row[locator])
            if applicant is not None:
                return applicant.__parent__
        return None

    def parentsExist(self, row, site):
        """Tell whether the container for `row` can be found."""
        return self.getParent(row, site) is not None

    def getEntry(self, row, site):
        """Return the applicant described by `row` or ``None``.

        With a `container_code` an applicant can only be found if an
        `application_number` is given as well. Otherwise the applicants
        catalog is searched for `applicant_id` or `reg_number`.
        """
        locator = self.getLocator(row)
        if locator == 'container_code':
            if row.get('application_number', None) in (IGNORE_MARKER, None):
                return None
            if not self.parentsExist(row, site):
                return None
            parent = self.getParent(row, site)
            return parent.get(row['application_number'])
        if locator in ('applicant_id', 'reg_number'):
            return self._findApplicant(locator, row[locator])
        return None

    def entryExists(self, row, site):
        """Tell whether the applicant described by `row` exists."""
        return self.getEntry(row, site) is not None

    def addEntry(self, obj, row, site):
        """Add applicant `obj` to its container and note it in the history."""
        parent = self.getParent(row, site)
        parent.addApplicant(obj)
        #parent.__parent__.logger.info(
        #    'Applicant imported: %s' % obj.applicant_id)
        history = IObjectHistory(obj)
        history.addMessage(_('Application record imported'))
        return

    def delEntry(self, row, site):
        """Remove the applicant described by `row` from its container.

        Does nothing if no such applicant can be found.
        """
        applicant = self.getEntry(row, site)
        if applicant is not None:
            parent = applicant.__parent__
            del parent[applicant.application_number]
            #parent.__parent__.logger.info(
            #    'Applicant removed: %s' % applicant.applicant_id)

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        `application_number`, `password` and `state` are handled here and
        removed from `row`; all remaining values are applied by the base
        class. The changes are written to the logger of the parent's
        ``__parent__`` and returned as a string.
        """
        items_changed = ''
        # Remove application_number from row if empty
        if 'application_number' in row and row['application_number'] in (
            None, IGNORE_MARKER):
            row.pop('application_number')

        # Update applicant_id from application_number and container code
        # if application_number is given
        if 'application_number' in row:
            obj.applicant_id = u'%s_%s' % (
                row['container_code'], row['application_number'])
            items_changed += ('%s=%s, ' % ('applicant_id', obj.applicant_id))
            row.pop('application_number')

        # Update password
        if 'password' in row:
            passwd = row.get('password', IGNORE_MARKER)
            if passwd not in ('', IGNORE_MARKER):
                if passwd.startswith('{SSHA}'):
                    # already encrypted password
                    obj.password = passwd
                elif passwd == DELETION_MARKER:
                    obj.password = None
                else:
                    # not yet encrypted password
                    IUserAccount(obj).setPassword(passwd)
                # NOTE(review): the value is logged exactly as given in
                # the row, i.e. also passwords which were not encrypted.
                # Confirm that this is intended.
                items_changed += ('%s=%s, ' % ('password', passwd))
            row.pop('password')

        # Update registration state
        if 'state' in row:
            state = row.get('state', IGNORE_MARKER)
            if state not in (IGNORE_MARKER, ''):
                IWorkflowState(obj).setState(state)
                msg = _("State '${a}' set", mapping = {'a':state})
                history = IObjectHistory(obj)
                history.addMessage(msg)
                items_changed += ('%s=%s, ' % ('state', state))
            row.pop('state')

        # apply other values...
        items_changed += super(ApplicantProcessor, self).updateEntry(
            obj, row, site, filename)

        # Log actions...
        parent = self.getParent(row, site)
        if self.getLocator(row) == 'container_code':
            parent.__parent__.logger.info(
                '%s - %s - imported: %s' % (self.name, filename, items_changed))
        else:
            parent.__parent__.logger.info(
                '%s - %s - updated: %s' % (self.name, filename, items_changed))
        return items_changed

    def getMapping(self, path, headerfields, mode):
        """Get a mapping from CSV file headerfields to actually used fieldnames.

        The first line of the file at `path` provides the raw column
        titles. In remove mode only the `applicant_id` and `reg_number`
        columns are mapped.
        """
        result = dict()
        # Only the header line is needed; close the file right afterwards.
        with open(path, 'rb') as csv_file:
            raw_header = next(csv.reader(csv_file))
        for num, field in enumerate(headerfields):
            if field not in ['applicant_id', 'reg_number'] and mode == 'remove':
                continue
            if field == u'--IGNORE--':
                # Skip ignored columns in failed and finished data files.
                continue
            result[raw_header[num]] = field
        return result

    def checkConversion(self, row, mode='create'):
        """Validates all values in row.

        Returns a tuple ``(errs, inv_errs, conv_dict)`` as delivered by
        the object converter, extended by the checks done here.
        """
        iface = self.iface
        if self.getLocator(row) == 'reg_number' or mode == 'remove':
            iface = self.iface_byregnumber
        converter = IObjectConverter(iface)
        errs, inv_errs, conv_dict = converter.fromStringDict(
            row, self.factory_name, mode=mode)
        cert = conv_dict.get('course1', None)
        if cert is not None and mode in ('create', 'update'):
            # course1 application category must match container's.
            site = grok.getSite()
            parent = self.getParent(row, site)
            if parent is None:
                errs.append(('container', 'not found'))
            elif cert.application_category != parent.application_category:
                errs.append(('course1', 'wrong application category'))
        if 'state' in row and row['state'] not in IMPORTABLE_STATES:
            if row['state'] not in (IGNORE_MARKER, ''):
                errs.append(('state','not allowed'))
            else:
                # state is an attribute of Applicant and must not
                # be changed if empty
                conv_dict['state'] = IGNORE_MARKER
        application_number = row.get('application_number', None)
        if application_number in (IGNORE_MARKER, ''):
            conv_dict['application_number'] = IGNORE_MARKER
        return errs, inv_errs, conv_dict

    def checkUpdateRequirements(self, obj, row, site):
        """Checks requirements the object must fulfill when being updated.

        This method is not used in case of deleting or adding objects.

        Returns error messages as strings in case of requirement
        problems.
        """
        if obj.state == CREATED:
            return 'Applicant is blocked.'
        return None
343
class ApplicantOnlinePaymentProcessor(BatchProcessor):
    """The Applicant Online Payment Processor imports applicant payment tickets.
    The tickets are located in the applicant container.

    The `checkConversion` method checks the format of the payment identifier.
    In create mode it also ensures that the same p_id does not exist
    elsewhere. It must be portal-wide unique.

    When adding a payment ticket, the `addEntry` method checks if a
    payment has already been made. If so, a `DuplicationError` is raised.
    """
    grok.implements(IBatchProcessor)
    grok.provides(IBatchProcessor)
    grok.context(Interface)
    util_name = 'applicantpaymentprocessor'
    grok.name(util_name)

    name = _('ApplicantOnlinePayment Processor')
    iface = IApplicantOnlinePayment
    factory_name = 'waeup.ApplicantOnlinePayment'

    location_fields = ['applicant_id',]

    @property
    def available_fields(self):
        """Sorted list of all column names this processor accepts.

        These are the location fields, the schema fields of `iface`
        and `p_id`, but not `display_item`.
        """
        fields = set(self.location_fields) | set(getFields(self.iface))
        af = sorted(list(fields) + ['p_id',])
        af.remove('display_item')
        return af

    def checkHeaders(self, headerfields, mode='ignore'):
        """Check that `headerfields` are valid for the given `mode`.

        On top of the checks of the base class, a `p_id` column is
        required in update and remove mode; `FatalCSVError` is raised
        if it is missing. Returns ``True`` otherwise.
        """
        super(ApplicantOnlinePaymentProcessor, self).checkHeaders(headerfields)
        if mode in ('update', 'remove') and 'p_id' not in headerfields:
            raise FatalCSVError(
                "Need p_id for import in update and remove modes!")
        return True

    def parentsExist(self, row, site):
        """Tell whether the applicant owning the payment can be found."""
        return self.getParent(row, site) is not None

    def getParent(self, row, site):
        """Return the applicant with the row's `applicant_id` or ``None``.

        The applicant is searched in the applicants catalog.
        """
        applicant_id = row['applicant_id']
        cat = queryUtility(ICatalog, name='applicants_catalog')
        results = list(
            cat.searchResults(applicant_id=(applicant_id, applicant_id)))
        if results:
            return results[0]
        return None

    def getEntry(self, row, site):
        """Return the payment ticket described by `row` or ``None``.

        ``None`` is returned if the applicant can't be found or if the
        row holds no `p_id`.
        """
        applicant = self.getParent(row, site)
        if applicant is None:
            return None
        p_id = row.get('p_id', None)
        if p_id in (None, IGNORE_MARKER):
            return None
        # We can use the hash symbol at the end of p_id in import files
        # to avoid annoying automatic number transformation
        # by Excel or Calc
        p_id = p_id.strip('#')
        return applicant.get(p_id)

    def entryExists(self, row, site):
        """Tell whether the payment ticket described by `row` exists."""
        return self.getEntry(row, site) is not None

    def updateEntry(self, obj, row, site, filename):
        """Update obj to the values given in row.

        The changes are logged and returned as a string, like
        `ApplicantProcessor.updateEntry` does.
        """
        items_changed = super(ApplicantOnlinePaymentProcessor, self).updateEntry(
            obj, row, site, filename)
        applicant = self.getParent(row, site)
        applicant.__parent__.__parent__.logger.info(
            '%s - %s - %s - updated: %s'
            % (self.name, filename, applicant.applicant_id, items_changed))
        return items_changed

    def samePaymentMade(self, applicant, category):
        """Tell whether `applicant` holds a paid ticket of `category`."""
        for key in applicant.keys():
            ticket = applicant[key]
            if ticket.p_state == 'paid' and ticket.p_category == category:
                return True
        return False

    def addEntry(self, obj, row, site):
        """Store payment ticket `obj` in the applicant under its p_id.

        Raises `DuplicationError` if a paid ticket of the same category
        exists already, unless the `p_item` of `obj` is 'Balance'.
        """
        applicant = self.getParent(row, site)
        p_id = row['p_id'].strip('#')
        if obj.p_item != 'Balance' and self.samePaymentMade(
            applicant, obj.p_category):
            applicant.__parent__.__parent__.logger.info(
                '%s - %s - previous update cancelled'
                % (self.name, applicant.applicant_id))
            raise DuplicationError('Payment has already been made.')
        applicant[p_id] = obj
        return

    def delEntry(self, row, site):
        """Remove the payment ticket described by `row` and log the removal.

        Does nothing if no such ticket can be found.
        """
        payment = self.getEntry(row, site)
        applicant = self.getParent(row, site)
        if payment is not None:
            applicant.__parent__.__parent__.logger.info('%s - Payment ticket removed: %s'
                % (applicant.applicant_id, payment.p_id))
            del applicant[payment.p_id]
        return

    def checkConversion(self, row, mode='ignore'):
        """Validates all values in row.

        In addition to the checks of the base class the `p_id` is
        checked: in create mode a missing p_id is generated from the
        current time, in all other modes it is reported as missing.
        A given p_id must be 14 characters long (hash symbols at both
        ends not counted) and, in create mode, must not be found in the
        payments catalog.
        """
        errs, inv_errs, conv_dict = super(
            ApplicantOnlinePaymentProcessor, self).checkConversion(row, mode=mode)
        # We have to check p_id.
        p_id = row.get('p_id', None)
        if p_id in (None, IGNORE_MARKER):
            if mode == 'create':
                timestamp = ("%d" % int(time()*10000))[1:]
                conv_dict['p_id'] = "p%s" % timestamp
            else:
                errs.append(('p_id','missing'))
            return errs, inv_errs, conv_dict
        p_id = p_id.strip('#')
        if len(p_id) != 14:
            errs.append(('p_id','invalid length'))
            return errs, inv_errs, conv_dict
        if mode == 'create':
            cat = getUtility(ICatalog, name='payments_catalog')
            results = list(cat.searchResults(p_id=(p_id, p_id)))
            if results:
                # Each payer id is followed by a blank.
                sids_string = ''.join(
                    ['%s ' % IPayer(payment).id for payment in results])
                errs.append(('p_id','p_id exists in %s' % sids_string))
        return errs, inv_errs, conv_dict
Note: See TracBrowser for help on using the repository browser.