source: main/waeup.kofa/trunk/src/waeup/kofa/applicants/tests/test_batching.py

Last change on this file was 17772, checked in by Henrik Bettermann, 4 months ago

Add ApplicantRefereeReportProcessor.

  • Property svn:keywords set to Id
File size: 29.0 KB
Line 
1## $Id: test_batching.py 17772 2024-05-13 08:31:58Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Unit tests for applicants-related data processors.
19"""
20import datetime
21import os
22import pytz
23import shutil
24import tempfile
25import unittest
26import grok
27from time import time
28from hurry.workflow.interfaces import IWorkflowState
29from zope.component.hooks import setSite, clearSite
30from zope.component import createObject
31from zope.interface.verify import verifyClass, verifyObject
32from zope.event import notify
33
34from waeup.kofa.app import University
35from waeup.kofa.applicants.batching import (
36    ApplicantsContainerProcessor, ApplicantProcessor,
37    ApplicantOnlinePaymentProcessor,
38    ApplicantRefereeReportProcessor)
39from waeup.kofa.applicants.container import ApplicantsContainer
40from waeup.kofa.applicants.applicant import Applicant
41from waeup.kofa.university.faculty import Faculty
42from waeup.kofa.university.department import Department
43from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
44from waeup.kofa.interfaces import (
45    IBatchProcessor, IUserAccount, DuplicationError)
46from waeup.kofa.applicants.workflow import CREATED
47
48
# Sample data we can use in tests...
def _read_sample(filename):
    """Return the raw content of `filename` located next to this module.

    The file is opened in binary mode and closed again right after
    reading, so no file handle stays open once the module is imported.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fobj:
        return fobj.read()


def _header_fields(csv_data):
    """Return the comma-separated fields of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Applicants containers: CSV content and its header fields
APPS_CONTAINER_SAMPLE_DATA = _read_sample('sample_container_data.csv')
APPS_CONTAINER_HEADER_FIELDS = _header_fields(APPS_CONTAINER_SAMPLE_DATA)

# The same for applicants
APPLICANT_SAMPLE_DATA = _read_sample('sample_applicant_data.csv')
FAULTY_APPLICANT_SAMPLE_DATA = _read_sample(
    'sample_faulty_applicant_data.csv')
APPLICANT_HEADER_FIELDS = _header_fields(APPLICANT_SAMPLE_DATA)

# Two update files for applicants, each with its own header
APPLICANT_SAMPLE_DATA_UPDATE = _read_sample(
    'sample_applicant_data_update.csv')
APPLICANT_SAMPLE_DATA_UPDATE2 = _read_sample(
    'sample_applicant_data_update2.csv')
APPLICANT_HEADER_FIELDS_UPDATE = _header_fields(
    APPLICANT_SAMPLE_DATA_UPDATE)
APPLICANT_HEADER_FIELDS_UPDATE2 = _header_fields(
    APPLICANT_SAMPLE_DATA_UPDATE2)

# Applicant payments
PAYMENT_SAMPLE_DATA = _read_sample('sample_payment_data.csv')
PAYMENT_HEADER_FIELDS = _header_fields(PAYMENT_SAMPLE_DATA)

# Referee reports
REPORT_SAMPLE_DATA = _read_sample('sample_report_data.csv')
REPORT_HEADER_FIELDS = _header_fields(REPORT_SAMPLE_DATA)
97
class ApplicantsContainerProcessorTest(FunctionalTestCase):
    """Tests for the batch processor of applicants containers.

    Every test runs against a fresh `University` site that already holds
    one applicants container with code ``dp2011``.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create the site, the sample container and the CSV input file."""
        super(ApplicantsContainerProcessorTest, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        self.container = ApplicantsContainer()
        self.container.code = u'dp2011'
        self.app['applicants']['dp2011'] = self.container

        self.processor = ApplicantsContainerProcessor()
        self.workdir = tempfile.mkdtemp()
        self.csv_file = os.path.join(self.workdir, 'sampledata.csv')
        # Close the file explicitly so the data is flushed before import.
        with open(self.csv_file, 'wb') as csv_fobj:
            csv_fobj.write(APPS_CONTAINER_SAMPLE_DATA)
        setSite(self.app)
        return

    def tearDown(self):
        """Remove the temporary directories and unset the site."""
        super(ApplicantsContainerProcessorTest, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        return

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, ApplicantsContainerProcessor)
            is True)

    def test_parentsExist(self):
        self.assertTrue(
            self.processor.parentsExist(None, dict()) is False)
        self.assertTrue(
            self.processor.parentsExist(None, self.app) is True)

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(code='REG_NONE'), self.app) is False)
        self.assertTrue(
            self.processor.entryExists(
                dict(code='dp2011'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['applicants'])

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(code='REG_NONE'), self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(code='dp2011'), self.app) is self.container)

    def test_addEntry(self):
        self.processor.addEntry(
            'New application', dict(code='dp2012'), self.app)
        self.assertEqual(
            self.app['applicants']['dp2012'], 'New application')

    def test_delEntry(self):
        self.processor.delEntry(dict(code='dp2011'), self.app)
        self.assertTrue('dp2011' not in self.app['applicants'].keys())

    def test_import(self):
        # Do a real import
        # see local sample_container_data.csv file for input
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, APPS_CONTAINER_HEADER_FIELDS)
        container = self.app['applicants'].get('app2017', None)
        container2 = self.app['applicants'].get('app2018', None)
        self.assertTrue(container is not None)
        self.assertTrue(container2 is not None)

        # check attributes
        self.assertEqual(container.code, u'app2017')
        self.assertEqual(container.title, u'General Studies')
        self.assertEqual(container.prefix, u'app')
        self.assertEqual(container.year, 2017)
        self.assertEqual(container.application_category, 'basic')
        self.assertEqual(
            container.description,
            u'This text can been seen by anonymous users.\n'
            u'>>de<<\nDieser Text kann von anonymen Benutzern '
            u'gelesen werden.')
        self.assertEqual(container.startdate,
                         datetime.datetime(2012, 3, 1, 0, 0, tzinfo=pytz.utc))
        self.assertEqual(container.enddate,
                         datetime.datetime(2012, 4, 25, 0, 0, tzinfo=pytz.utc))
        # The import result files live in a directory we have to remove.
        shutil.rmtree(os.path.dirname(fin_file))
193
class ApplicantImportExportSetup(FunctionalTestCase):
    """Shared fixture for applicant import and export tests.

    Provides a `University` site with an applicants container
    ``dp2011``, one faculty/department holding the certificates
    ``CERT1`` and ``CERT2``, and a single applicant.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Build the sample site and all objects the tests rely on."""
        super(ApplicantImportExportSetup, self).setUp()

        # Each test gets its own site and its own datacenter storage.
        site = University()
        self.dc_root = tempfile.mkdtemp()
        site['datacenter'].setStoragePath(self.dc_root)

        # Store the site in the ZODB right after creation: catalogs and
        # other local utilities are not set up before that step.
        self.getRootFolder()['app'] = site
        self.app = self.getRootFolder()['app']
        # Some of the setup code below might call grok.getSite() and
        # should get the new site then.
        setSite(site)

        # Applicants container
        dp2011 = ApplicantsContainer()
        dp2011.code = u'dp2011'
        dp2011.application_category = u'basic'
        self.container = dp2011
        self.app['applicants']['dp2011'] = dp2011

        # University structure: one faculty with one department
        self.app['faculties']['fac1'] = Faculty()
        self.app['faculties']['fac1']['dep1'] = Department()
        certificates = self.app['faculties']['fac1']['dep1'].certificates

        # First certificate shares the container's application category
        cert1 = createObject('waeup.Certificate')
        cert1.code = 'CERT1'
        cert1.application_category = 'basic'
        cert1.start_level = 100
        cert1.end_level = 500
        self.certificate = cert1
        certificates.addCertificate(cert1)

        # Second certificate belongs to a different application category
        cert2 = createObject('waeup.Certificate')
        cert2.code = 'CERT2'
        cert2.application_category = 'xyz'
        self.certificate2 = cert2
        certificates.addCertificate(cert2)

        # One applicant stored in the container
        anna = Applicant()
        anna.firstname = u'Anna'
        anna.lastname = u'Tester'
        applicants = self.app['applicants']['dp2011']
        applicants.addApplicant(anna)
        self.application_number = anna.application_number
        self.applicant = applicants[self.application_number]

        self.workdir = tempfile.mkdtemp()
        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'applicants.log')

    def tearDown(self):
        """Remove the temporary directories and unset the site."""
        super(ApplicantImportExportSetup, self).tearDown()
        for tmp_dir in (self.workdir, self.dc_root):
            shutil.rmtree(tmp_dir)
        clearSite()
257
class ApplicantProcessorTest(ApplicantImportExportSetup):
    """Tests for the batch processor of applicants.

    On top of the inherited fixture the four applicant sample CSV files
    are written to the working directory of each test.
    """

    layer = FunctionalLayer

    def setUp(self):
        """Create the processor and write the sample CSV files."""
        super(ApplicantProcessorTest, self).setUp()
        self.processor = ApplicantProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_applicant_data.csv')
        self.csv_file_faulty = os.path.join(self.workdir,
                                            'faulty_applicant_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_applicant_data_update.csv')
        self.csv_file_update2 = os.path.join(
            self.workdir, 'sample_applicant_data_update2.csv')
        # Write each file inside a `with` block so that it is closed
        # (and flushed) before any test starts importing it.
        for path, content in (
                (self.csv_file, APPLICANT_SAMPLE_DATA),
                (self.csv_file_faulty, FAULTY_APPLICANT_SAMPLE_DATA),
                (self.csv_file_update, APPLICANT_SAMPLE_DATA_UPDATE),
                (self.csv_file_update2, APPLICANT_SAMPLE_DATA_UPDATE2)):
            with open(path, 'wb') as csv_fobj:
                csv_fobj.write(content)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, ApplicantProcessor) is True)

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(container_code='dp2011', application_number='999'),
                self.app) is False)

    def test_getEntry(self):
        applicant = self.processor.getEntry(
            dict(container_code='dp2011',
                 application_number=self.application_number), self.app)
        self.assertEqual(applicant.applicant_id, self.applicant.applicant_id)

    def test_addEntry(self):
        new_applicant = Applicant()
        self.processor.addEntry(
            new_applicant, dict(container_code='dp2011'), self.app)
        # The fixture applicant plus the one just added.
        self.assertEqual(len(self.app['applicants']['dp2011'].keys()), 2)

    def test_delEntry(self):
        self.assertTrue(
            self.application_number in
            self.app['applicants']['dp2011'].keys())
        self.processor.delEntry(
            dict(container_code='dp2011',
                 application_number=self.application_number), self.app)
        self.assertTrue(
            self.application_number not in
            self.app['applicants']['dp2011'].keys())

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, APPLICANT_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        keys = self.app['applicants']['dp2011'].keys()
        self.assertEqual(len(keys), 5)
        container = self.app['applicants']['dp2011']
        self.assertEqual(
            container.__implemented__.__name__,
            'waeup.kofa.applicants.container.ApplicantsContainer')
        applicant = container[keys[0]]
        self.assertEqual(
            applicant.__implemented__.__name__,
            'waeup.kofa.applicants.applicant.Applicant')
        with open(self.logfile) as log_fobj:
            logcontent = log_fobj.read()
        # Logging message from updateEntry,
        # create applicant with given application_number
        self.assertTrue(
            'Applicant Processor - sample_applicant_data - imported: '
            'applicant_id=dp2011_1234, password=mypwd1, '
            'reg_number=1001, firstname=Aaren, middlename=Peter, lastname=Pieri, '
            'sex=m, course1=CERT1, date_of_birth=1990-01-02, email=xx@yy.zz' in
            logcontent)
        # create applicant with random application_number which is
        # not shown in the log file
        self.assertTrue(
            'Applicant Processor - sample_applicant_data - imported: '
            'reg_number=1003, firstname=Aaren, '
            'middlename=Alfons, lastname=Berson, sex=m, course1=CERT1, '
            'date_of_birth=1990-01-04, email=xx@yy.zz' in
            logcontent)
        # Logging message from handle_applicant_transition_event
        self.assertTrue(
            'dp2011_1234 - Application initialized' in
            logcontent)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_faulty(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_faulty, APPLICANT_HEADER_FIELDS)
        # we cannot import data with faulty dates. A date is faulty
        # when in format xx/yy/zzzz as we cannot say whether it is
        # meant as dd/mm/yyyy or mm/dd/yyyy. We therefore require yyyy-mm-dd
        for applicant in self.app['applicants']['dp2011'].values():
            if applicant.date_of_birth == datetime.date(1990, 1, 2):
                self.fail(
                    'Wrong birthdate of imported applicant '
                    '(1990-01-02, should be: 1990-02-01)')
        self.assertEqual(num_warns, 4)
        with open(fail_file, 'rb') as fail_fobj:
            fail_contents = fail_fobj.read()
        # CERT2 is in wrong category ...
        self.assertTrue('course1: wrong application category' in fail_contents)
        # ... and CERT3 does not exist.
        self.assertTrue('course1: Invalid value' in fail_contents)
        # Conversion checking already fails because Mister or Miss No's
        # container does not exist.
        self.assertTrue('no@yy.zz,container: not found' in fail_contents)
        self.assertTrue('nobody@yy.zz,container: not found' in fail_contents)
        shutil.rmtree(os.path.dirname(fail_file))
        return

    def test_import_update(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, APPLICANT_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, APPLICANT_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # The middlename import value was None.
        # Confirm that middlename has not been deleted.
        container = self.app['applicants']['dp2011']
        self.assertEqual(container['1234'].middlename, 'Peter')
        # state of Pieri has not changed
        self.assertEqual(container['1234'].state, 'initialized')
        # state of Finau has changed
        self.assertEqual(container['2345'].state, 'admitted')
        # password of Pieri has been set
        self.assertTrue(
            IUserAccount(container['1234']).checkPassword('mypwd1'))
        # password of Finau is still unset
        self.assertEqual(IUserAccount(container['2345']).password, None)
        # password of Simon was encrypted already
        self.assertTrue(
            IUserAccount(container['4567']).checkPassword('mypwd1'))
        # reg_number of Finau has changed
        self.assertEqual(container['2345'].reg_number, '6666')
        with open(self.logfile) as log_fobj:
            logcontent = log_fobj.read()
        # Logging message from updateEntry,
        # reg_number is locator
        self.assertTrue(
            'Applicant Processor - sample_applicant_data_update - updated: '
            'reg_number=1001, firstname=Aaren' in
            logcontent)
        # applicant_id is locator
        self.assertTrue(
            'Applicant Processor - sample_applicant_data_update - updated: '
            'state=admitted, reg_number=6666, '
            'firstname=Alfons, applicant_id=dp2011_2345' in
            logcontent)
        shutil.rmtree(os.path.dirname(fin_file))

        # Now we import another file which clears all middlename attributes
        # and uses the new reg_number as locator. This test also checks
        # if the catalog has been informed about the reg_no change and if
        # applicants in state created are really blocked.
        IWorkflowState(container['4567']).setState(CREATED)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update2, APPLICANT_HEADER_FIELDS_UPDATE2, 'update')
        with open(fail_file) as fail_fobj:
            failcontent = fail_fobj.read()
        self.assertTrue('Applicant is blocked' in failcontent)
        self.assertEqual(num_warns, 1)
        # Middlename is cleared.
        self.assertTrue(container['1234'].middlename is None)
        # Firstname of applicant in state created isn't changed.
        self.assertEqual(container['4567'].firstname, 'Simon')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, APPLICANT_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, APPLICANT_HEADER_FIELDS_UPDATE, 'remove')
        self.assertEqual(num_warns, 0)
        with open(self.logfile) as log_fobj:
            logcontent = log_fobj.read()
        # Logging message from handle_applicant_transition_event
        self.assertTrue(
            'dp2011_1234 - Application record removed' in
            logcontent)
        shutil.rmtree(os.path.dirname(fin_file))
436
class PaymentProcessorTest(ApplicantImportExportSetup):
    """Tests for the batch processor of applicant online payments.

    In addition to the inherited fixture a second applicant
    (``self.applicant2``, id ``dp2011_1234``) is created which owns one
    paid payment ticket ``p120``.
    """

    def setUp(self):
        """Add a second applicant with one paid payment ticket."""
        super(PaymentProcessorTest, self).setUp()

        applicant = Applicant()
        applicant.firstname = u'Anna2'
        applicant.lastname = u'Tester'
        applicant.applicant_id = u'dp2011_1234'
        self.app['applicants']['dp2011'].addApplicant(applicant)
        payment = createObject(u'waeup.ApplicantOnlinePayment')
        payment.p_id = 'p120'
        payment.p_session = 2012
        payment.p_category = 'application'
        payment.p_state = 'paid'
        applicant['p120'] = payment
        self.applicant2 = applicant
        self.processor = ApplicantOnlinePaymentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_payment_data.csv')
        # Close the file explicitly so the data is flushed before import.
        with open(self.csv_file, 'wb') as csv_fobj:
            csv_fobj.write(PAYMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, ApplicantOnlinePaymentProcessor)
            is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id='ID_NONE', p_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, p_id='p120'),
                self.app) is self.applicant2['p120'])
        # A p_id with a trailing '#' locates the same ticket.
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, p_id='p120#'),
                self.app) is self.applicant2['p120'])

    def test_delEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, p_id='p120'),
                self.app) is self.applicant2['p120'])
        self.assertEqual(len(self.applicant2.keys()), 1)
        self.processor.delEntry(
            dict(applicant_id=self.applicant2.applicant_id, p_id='p120'),
            self.app)
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, p_id='p120'),
                self.app) is None)
        # Check the applicant whose ticket was removed. The fixture
        # applicant `self.applicant` never had a ticket, so checking
        # it could not detect a failed removal.
        self.assertEqual(len(self.applicant2.keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.applicant2.keys()), 1)
        self.processor.delEntry(
            dict(applicant_id=self.applicant2.applicant_id, p_id='p120'),
            self.app)
        payment1 = createObject(u'waeup.ApplicantOnlinePayment')
        payment1.p_category = 'application'
        payment1.p_id = 'p234'
        self.processor.addEntry(
            payment1, dict(applicant_id=self.applicant2.applicant_id, p_id='p234'),
            self.app)
        self.assertEqual(len(self.applicant2.keys()), 1)
        self.assertEqual(self.applicant2['p234'].p_id, 'p234')
        # Same payment must not exist.
        payment1.p_state = 'paid'
        payment2 = createObject(u'waeup.ApplicantOnlinePayment')
        payment2.p_id = 'p456'
        payment2.p_category = 'application'
        self.assertRaises(
            DuplicationError, self.processor.addEntry, payment2,
            dict(applicant_id=self.applicant2.applicant_id, p_id='p456'), self.app)
        # But we can add a ticket with another p_category.
        payment2.p_category = 'app_balance'
        self.processor.addEntry(
            payment2, dict(applicant_id=self.applicant2.applicant_id, p_id='p456'),
            self.app)
        self.assertEqual(len(self.applicant2.keys()), 2)
        self.assertEqual(self.applicant2['p456'].p_id, 'p456')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(p_id='<IGNORE>'), mode='create')
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(p_id='<IGNORE>'), mode='update')
        self.assertEqual(len(errs), 1)
        self.assertEqual(errs[0], ('p_id', u'missing'))
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(p_id='p1266236341955'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(p_id='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertEqual(errs[0], ('p_id', u'invalid length'))
        # Build a p_id from the current time.
        timestamp = ("%d" % int(time()*10000))[1:]
        p_id = "p%s" % timestamp
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(p_id=p_id))
        self.assertEqual(len(errs), 0)
        # A p_id which is already in use is rejected in create mode.
        dup_payment = createObject(u'waeup.ApplicantOnlinePayment')
        dup_payment.p_id = 'p1266236341955'
        self.applicant2[dup_payment.p_id] = dup_payment
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(p_id='p1266236341955'), mode='create')
        self.assertEqual(len(errs), 1)
        self.assertEqual(errs[0], ('p_id', u'p_id exists in dp2011_1234 '))

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, PAYMENT_HEADER_FIELDS,'create')
        self.assertEqual(num_warns, 2)
        with open(fail_file) as fail_fobj:
            fail_contents = fail_fobj.read()
        self.assertTrue('Payment has already been made' in fail_contents)
        # Remove the result directory of this first import as well.
        shutil.rmtree(os.path.dirname(fail_file))
        self.processor.delEntry(
            dict(applicant_id=self.applicant2.applicant_id, p_id='p120'),
            self.app)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, PAYMENT_HEADER_FIELDS,'create')
        # Both records can be imported now.
        self.assertEqual(num_warns, 0)
        # One with imported and known p_id ...
        payment = self.processor.getEntry(dict(applicant_id='dp2011_1234',
            p_id='p1266236341955'), self.app)
        self.assertEqual(payment.p_id, 'p1266236341955')
        cdate = payment.creation_date.strftime("%Y-%m-%d %H:%M:%S")
        # Ooooh, still the old problem, see
        # http://mail.dzug.org/mailman/archives/zope/2006-August/001153.html.
        # WAT is interpreted as GMT-1 and not GMT+1
        self.assertEqual(cdate, '2010-11-25 21:16:33')
        self.assertEqual(str(payment.creation_date.tzinfo), 'UTC')
        # ... the other one with generated p_id.
        p_id_failed = self.applicant2.keys()[1]
        payment_failed = self.processor.getEntry(dict(applicant_id='dp2011_1234',
            p_id=p_id_failed), self.app)
        self.assertEqual(payment_failed.p_state, 'failed')
        self.assertEqual(payment_failed.amount_auth, 10500.1)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as log_fobj:
            logcontent = log_fobj.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - ApplicantOnlinePayment Processor - dp2011_1234 - '
            'previous update cancelled' in logcontent)
        self.assertTrue(
            'INFO - system - ApplicantOnlinePayment Processor - '
            'sample_payment_data - dp2011_1234 - updated: p_id=p1266236341955, '
            'creation_date=2010-11-25 21:16:33.757000+00:00, '
            'r_amount_approved=19500.1, p_category=application, '
            'amount_auth=19500.1, p_session=2015, p_state=paid' in logcontent)
        self.assertTrue(
            'INFO - system - ApplicantOnlinePayment Processor - '
            'sample_payment_data - dp2011_1234 - updated: p_id=%s, '
            'creation_date=2011-11-25 21:16:33.757000+00:00, '
            'r_amount_approved=19500.1, p_category=application, '
            'amount_auth=10500.1, p_session=2016, p_state=failed'
            % p_id_failed in logcontent)
593
class RefereeReportProcessorTest(ApplicantImportExportSetup):
    """Tests for the batch processor of applicant referee reports.

    In addition to the inherited fixture a second applicant
    (``self.applicant2``, id ``dp2011_1234``) is created which owns one
    referee report ``r120``.
    """

    def setUp(self):
        """Add a second applicant with one referee report."""
        super(RefereeReportProcessorTest, self).setUp()

        applicant = Applicant()
        applicant.firstname = u'Anna2'
        applicant.lastname = u'Tester'
        applicant.applicant_id = u'dp2011_1234'
        self.app['applicants']['dp2011'].addApplicant(applicant)
        report = createObject(u'waeup.ApplicantRefereeReport')
        report.r_id = 'r120'
        # NOTE(review): the three p_* attributes below look like
        # leftovers copied from the payment fixture -- confirm whether
        # a referee report needs them at all.
        report.p_session = 2012
        report.p_category = 'application'
        report.p_state = 'paid'
        applicant['r120'] = report
        self.applicant2 = applicant
        self.processor = ApplicantRefereeReportProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_report_data.csv')
        # Close the file explicitly so the data is flushed before import.
        with open(self.csv_file, 'wb') as csv_fobj:
            csv_fobj.write(REPORT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, ApplicantRefereeReportProcessor)
            is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id='ID_NONE', r_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, r_id='r120'),
                self.app) is self.applicant2['r120'])

    def test_delEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, r_id='r120'),
                self.app) is self.applicant2['r120'])
        self.assertEqual(len(self.applicant2.keys()), 1)
        self.processor.delEntry(
            dict(applicant_id=self.applicant2.applicant_id, r_id='r120'),
            self.app)
        self.assertTrue(
            self.processor.getEntry(
                dict(applicant_id=self.applicant2.applicant_id, r_id='r120'),
                self.app) is None)
        # Check the applicant whose report was removed. The fixture
        # applicant `self.applicant` never had a report, so checking
        # it could not detect a failed removal.
        self.assertEqual(len(self.applicant2.keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.applicant2.keys()), 1)
        self.processor.delEntry(
            dict(applicant_id=self.applicant2.applicant_id, r_id='r120'),
            self.app)
        self.assertEqual(len(self.applicant2.keys()), 0)
        report1 = createObject(u'waeup.ApplicantRefereeReport')
        report1.r_id = 'r234'
        self.processor.addEntry(
            report1, dict(applicant_id=self.applicant2.applicant_id, r_id='r234'),
            self.app)
        self.assertEqual(len(self.applicant2.keys()), 1)
        self.assertEqual(self.applicant2['r234'].r_id, 'r234')

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, REPORT_HEADER_FIELDS,'create')
        self.assertEqual(num_warns, 0)
        report = self.processor.getEntry(dict(applicant_id='dp2011_1234',
            r_id='r122'), self.app)
        self.assertEqual(report.r_id, 'r122')
        cdate = report.creation_date.strftime("%Y-%m-%d %H:%M:%S")
        self.assertEqual(cdate, '2024-05-12 08:34:46')
        self.assertEqual(str(report.creation_date.tzinfo), 'UTC')
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as log_fobj:
            logcontent = log_fobj.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - ApplicantRefereeReport Processor - '
            'sample_report_data - dp2011_1234 - updated: name=John Doe, '
            'email_pref=aa@aa.aa, creation_date=2024-05-12 08:34:46.515874+00:00, '
            'phone=--1234, r_id=r122, report=Very nice student., '
            'email=xx@xx.xx' in logcontent)
Note: See TracBrowser for help on using the repository browser.