source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12655

Last change on this file since 12655 was 12633, checked in by Henrik Bettermann, 10 years ago

Copy over valid_from and valid_to from products when creating contract objects.

Fix ContractProcessorBase. Really allow importing tc_dict, valid_from, title and valid_to.

  • Property svn:keywords set to Id
File size: 24.2 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12633 2015-02-27 17:39:46Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
def _read_sample_file(filename):
    """Return the raw content of a sample file located next to this module.

    The file is opened in binary mode inside a ``with`` block, so the
    file handle is closed again before the content is returned.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as sample_file:
        return sample_file.read()


def _header_fields(csv_data):
    """Return the comma-separated values of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Raw CSV sample data and the column names taken from their first line.
CUSTOMER_SAMPLE_DATA = _read_sample_file('sample_customer_data.csv')
CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample_file(
    'sample_customer_data_update.csv')
CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample_file(
    'sample_customer_data_duplicates.csv')
CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample_file('sample_document_data.csv')
DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample_file('sample_contract_data.csv')
CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
81
class CustomerImportExportSetup(FunctionalTestCase):
    """Common fixture for customer import and export tests.

    Creates a `Company` site with its own datacenter storage path and a
    sample product, and offers helpers which populate customers with a
    document and a contract.
    """

    layer = FunctionalLayer

    def setup_customizable_params(self):
        # Hook called from setUp(); defines the contract category
        # which is assigned to the sample product.
        self._contract_category = u'sample'

    def setUp(self):
        super(CustomerImportExportSetup, self).setUp()
        self.setup_customizable_params()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)
        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'customers.log')

        # Add some products
        product = self.product = createObject('waeup.Product')
        for attr, value in (
                ('product_id', u'SAM'),
                ('title', u'Our Sample Product'),
                ('contract_category', self._contract_category),
                ('valid_from', datetime.date(2015, 12, 4)),
                ('tc_dict', {'en': u'Hello world'}),
                ):
            setattr(product, attr, value)
        base_fee = ProductOption()
        base_fee.title = u'Base Fee'
        base_fee.fee = Decimal('800.6')
        base_fee.currency = 'USD'
        product.options = [base_fee]
        self.app['products'].addProduct(product)

    def tearDown(self):
        super(CustomerImportExportSetup, self).tearDown()
        # Remove both temporary directories created in setUp().
        for directory in (self.workdir, self.dc_root):
            shutil.rmtree(directory)
        clearSite()

    def setup_for_export(self):
        # Store a customer with a fixed id and define the export target.
        exported = Customer()
        exported.customer_id = u'A111111'
        self.app['customers'][exported.customer_id] = exported
        self.customer = exported
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        # Fill in the base data, then attach a verified document and a
        # contract. The customer passed in is returned again.
        # 'email' is assigned twice; the later value overrides the
        # earlier one.
        for attr, value in (
                ('matric_number', u'234'),
                ('perm_address', u'Customerroad 21\nLagos 123456\n'),
                ('reg_number', u'123'),
                ('firstname', u'Anna'),
                ('lastname', u'Tester'),
                ('middlename', u'M.'),
                ('email', u'bb@bb.bb'),
                ('date_of_birth', datetime.date(1981, 2, 4)),
                # ('sex', 'f'),
                ('email', u'anna@sample.com'),
                ('phone', u'+234-123-12345'),
                ('notice', u'Some notice\nin lines.'),
                ('nationality', u'NG'),
                ):
            setattr(customer, attr, value)

        # Add document
        sample_doc = createObject('waeup.CustomerSampleDocument')
        sample_doc.title = u'My Document'
        sample_doc.document_id = u'DOC1'
        customer['documents'].addDocument(sample_doc)
        IWorkflowState(sample_doc).setState(VERIFIED)
        self.document = sample_doc

        # Add contract
        sample_contract = createObject('waeup.SampleContract')
        sample_contract.contract_id = u'CON1'
        sample_contract.product_object = self.product
        sample_contract.document_object = self.document
        sample_contract.product_options = self.product.options
        sample_contract.tc_dict = {'en': u'Hello world'}
        sample_contract.valid_from = datetime.date(2014, 2, 4)
        sample_contract.valid_to = datetime.date(2014, 12, 4)
        customer['contracts'].addContract(sample_contract)
        self.contract = sample_contract
        return customer
166
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the batch processor which imports customers."""

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerProcessorTest, self).setUp()

        # Add customer
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Copy the sample data into the working directory. Each file is
        # closed explicitly so that the data is flushed before any import.
        for path, data in (
                (self.csv_file, CUSTOMER_SAMPLE_DATA),
                (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
                (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES),
                ):
            with open(path, 'wb') as csv_file:
                csv_file.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(self.processor.entryExists(
            dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(self.processor.entryExists(
            dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id),
            self.app) is self.customer)

    def test_addEntry(self):
        # The fixture customer plus the newly added one.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        # An unknown state is rejected.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # For the fixture customer and this row no error is reported,
        # i.e. checkUpdateRequirements returns None.
        err = self.processor.checkUpdateRequirements(
            self.customer, dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        self.assertTrue(
            self.customer.customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertTrue(
            self.customer.customer_id not in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        # Read the file of failed rows and close it right away.
        with open(fail_file) as failed:
            content = failed.read()
        self.assertEqual(num_warns, 3)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,cc@aa.ng,reg_number: Invalid input\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,dd@aa.ng,This object already exists. Skipping.\r\n'
            '4,,Berta,Pieri,m,1234,,1991-01-07,A234567,aa@aa.ng,email: Invalid input\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
294
class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the batch processor which imports customer sample
    documents.
    """

    def setUp(self):
        super(CustomerSampleDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The sample file is closed
        # explicitly so that the data is flushed before the import.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as csv_file:
            csv_file.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerSampleDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as csv_file:
            csv_file.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(
                IBatchProcessor, CustomerSampleDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id='ID_NONE', document_id='nonsense'),
            self.app) is None)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app) is self.customer['documents']['DOC1'])

    def test_delEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app) is self.customer['documents']['DOC1'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = u'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='id with spaces',
                 class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        # Read the file of failed rows and close it right away.
        with open(fail_file) as failed:
            fail_content = failed.read()
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        self.assertEqual(
            fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,'
            'This object already exists. Skipping.\r\n')
        document = self.processor.getEntry(
            dict(reg_number='1', document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        document = self.processor.getEntry(
            dict(reg_number='3', document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as log:
            logcontent = log.read()
        # Fetch uuid
        docid = [
            i for i in self.app['customers']['X666666']['documents'].keys()
            if len(i) > 10][0]
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Customer Sample Document Processor - '
            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
            % docid in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id
        self.assertEqual(num_warns, 1)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as log:
            logcontent = log.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
423
424
class SampleContractProcessorTest(CustomerImportExportSetup):
    """Tests for the batch processor which imports sample contracts."""

    def setUp(self):
        super(SampleContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The sample file is closed
        # explicitly so that the data is flushed before the import.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as csv_file:
            csv_file.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = SampleContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as csv_file:
            csv_file.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, SampleContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id='ID_NONE', contract_id='nonsense'),
            self.app) is None)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app) is self.customer['contracts']['CON1'])

    def test_delEntry(self):
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app) is self.customer['contracts']['CON1'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app)
        self.assertTrue(self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = u'CON2'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='CON2'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['CON2'].contract_id, 'CON2')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='SampleContract',
                 document_object='DOC1', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='id with spaces', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        # Read the file of failed rows and close it right away.
        with open(fail_file) as failed:
            fail_content = failed.read()
        self.assertEqual(
            fail_content,
            'reg_number,valid_from,contract_id,title,class_name,valid_to,product_object,tc_dict,document_object,product_options,--ERRORS--\r\n'
            '2,,c5,,SampleContract,,SAM,,DOC1,[],This object already exists. Skipping.\r\n'
            '3,,c6,,SampleContract,,SAM,,NONEXISTENT,[],document_object: Invalid value\r\n'
            '3,,c7,,SampleContract,,NONEXISTENT,,DOC1,[],product_object: Invalid value\r\n'
            )
        contract = self.processor.getEntry(
            dict(reg_number='1', contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.product_options[0].title, 'Base Fee')
        self.assertEqual(contract.product_options[0].fee, Decimal('800.6'))
        self.assertEqual(contract.product_options[0].currency, 'USD')
        self.assertEqual(contract.contract_id, 'c3')
        self.assertEqual(contract.title, u'Nice Contract')
        self.assertEqual(contract.tc_dict, {'en': 'Hello World'})
        contract = self.processor.getEntry(
            dict(reg_number='3', contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as log:
            logcontent = log.read()
        # Logging message from updateEntry
        # Fetch uuid
        conid = [
            i for i in self.app['customers']['X666666']['contracts'].keys()
            if len(i) > 10][0]
        self.assertTrue(
            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
            'X666666 - %s - updated: valid_from=, title=, valid_to=, product_object=SAM, tc_dict=, document_object=DOC1, '
            'product_options=[]' % conid
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There are one record without contract_id and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 3)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as log:
            logcontent = log.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.