source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12573

Last change on this file since 12573 was 12537, checked in by Henrik Bettermann, 10 years ago

Email address must be required.

  • Property svn:keywords set to Id
File size: 24.0 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12537 2015-02-01 07:38:12Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
46CUSTOMER_SAMPLE_DATA = open(
47    os.path.join(os.path.dirname(__file__), 'sample_customer_data.csv'),
48    'rb').read()
49
50CUSTOMER_HEADER_FIELDS = CUSTOMER_SAMPLE_DATA.split(
51    '\n')[0].split(',')
52
53CUSTOMER_SAMPLE_DATA_UPDATE = open(
54    os.path.join(os.path.dirname(__file__), 'sample_customer_data_update.csv'),
55    'rb').read()
56
57CUSTOMER_HEADER_FIELDS_UPDATE = CUSTOMER_SAMPLE_DATA_UPDATE.split(
58    '\n')[0].split(',')
59
60CUSTOMER_SAMPLE_DATA_DUPLICATES = open(
61    os.path.join(os.path.dirname(__file__),
62                 'sample_customer_data_duplicates.csv'),
63    'rb').read()
64
65CUSTOMER_HEADER_FIELDS_DUPLICATES = CUSTOMER_SAMPLE_DATA_DUPLICATES.split(
66    '\n')[0].split(',')
67
68DOCUMENT_SAMPLE_DATA = open(
69    os.path.join(os.path.dirname(__file__), 'sample_document_data.csv'),
70    'rb').read()
71
72DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
73    '\n')[0].split(',')
74
75CONTRACT_SAMPLE_DATA = open(
76    os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'),
77    'rb').read()
78
79CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split(
80    '\n')[0].split(',')
81
82class CustomerImportExportSetup(FunctionalTestCase):
83
84    layer = FunctionalLayer
85
86    def setup_customizable_params(self):
87        self._contract_category = u'sample'
88        return
89
90    def setUp(self):
91        super(CustomerImportExportSetup, self).setUp()
92        self.setup_customizable_params()
93        self.dc_root = tempfile.mkdtemp()
94        self.workdir = tempfile.mkdtemp()
95        app = Company()
96        app['datacenter'].setStoragePath(self.dc_root)
97        self.getRootFolder()['app'] = app
98        self.app = self.getRootFolder()['app']
99        setSite(app)
100        self.logfile = os.path.join(
101            self.app['datacenter'].storage, 'logs', 'customers.log')
102
103        #Add some products
104        self.product = createObject('waeup.Product')
105        self.product.product_id = u'SAM'
106        self.product.title = u'Our Sample Product'
107        self.product.contract_category = self._contract_category
108        self.product.valid_from = datetime.date(2015, 12, 4)
109        self.product.tc_dict = {'en': u'Hello world'}
110        option = ProductOption()
111        option.title = u'Base Fee'
112        option.fee = Decimal('800.6')
113        option.currency = 'USD'
114        self.product.options = [option,]
115        self.app['products'].addProduct(self.product)
116        return
117
118    def tearDown(self):
119        super(CustomerImportExportSetup, self).tearDown()
120        shutil.rmtree(self.workdir)
121        shutil.rmtree(self.dc_root)
122        clearSite()
123        return
124
125    def setup_for_export(self):
126        customer = Customer()
127        customer.customer_id = u'A111111'
128        self.app['customers'][customer.customer_id] = self.customer = customer
129        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
130        return
131
132    def setup_customer(self, customer):
133        customer.matric_number = u'234'
134        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
135        customer.reg_number = u'123'
136        customer.firstname = u'Anna'
137        customer.lastname = u'Tester'
138        customer.middlename = u'M.'
139        customer.email = u'bb@bb.bb'
140        customer.date_of_birth = datetime.date(1981, 2, 4)
141        #customer.sex = 'f'
142        customer.email = u'anna@sample.com'
143        customer.phone = u'+234-123-12345'
144        customer.notice = u'Some notice\nin lines.'
145        customer.nationality = u'NG'
146        # Add document
147        document = createObject('waeup.CustomerSampleDocument')
148        document.title = u'My Document'
149        document.document_id = u'DOC1'
150        customer['documents'].addDocument(document)
151        IWorkflowState(document).setState(VERIFIED)
152        self.document = document
153        # Add contract
154        contract = createObject('waeup.SampleContract')
155        contract.contract_id = u'CON1'
156        contract.product_object = self.product
157        contract.document_object = self.document
158        contract.product_options = self.product.options
159        contract.tc_dict = {'en': u'Hello world'}
160        customer['contracts'].addContract(contract)
161        self.contract = contract
162        return customer
163
164class CustomerProcessorTest(CustomerImportExportSetup):
165
166    layer = FunctionalLayer
167
168    def setUp(self):
169        super(CustomerProcessorTest, self).setUp()
170
171        # Add customer
172        customer = Customer()
173        self.app['customers'].addCustomer(customer)
174        customer = self.setup_customer(customer)
175        notify(grok.ObjectModifiedEvent(customer))
176        self.customer = self.app['customers'][customer.customer_id]
177
178        self.processor = CustomerProcessor()
179        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
180        self.csv_file_update = os.path.join(
181            self.workdir, 'sample_customer_data_update.csv')
182        self.csv_file_duplicates = os.path.join(
183            self.workdir, 'sample_customer_data_duplicates.csv')
184        open(self.csv_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
185        open(self.csv_file_update, 'wb').write(CUSTOMER_SAMPLE_DATA_UPDATE)
186        open(self.csv_file_duplicates, 'wb').write(CUSTOMER_SAMPLE_DATA_DUPLICATES)
187
188    def test_interface(self):
189        # Make sure we fulfill the interface contracts.
190        assert verifyObject(IBatchProcessor, self.processor) is True
191        assert verifyClass(
192            IBatchProcessor, CustomerProcessor) is True
193
194    def test_parentsExist(self):
195        self.assertFalse(self.processor.parentsExist(None, dict()))
196        self.assertTrue(self.processor.parentsExist(None, self.app))
197
198    def test_entryExists(self):
199        assert self.processor.entryExists(
200            dict(customer_id='ID_NONE'), self.app) is False
201        assert self.processor.entryExists(
202            dict(reg_number='123'), self.app) is True
203
204    def test_getParent(self):
205        parent = self.processor.getParent(None, self.app)
206        assert parent is self.app['customers']
207
208    def test_getEntry(self):
209        assert self.processor.getEntry(
210            dict(customer_id='ID_NONE'), self.app) is None
211        assert self.processor.getEntry(
212            dict(customer_id=self.customer.customer_id), self.app) is self.customer
213
214    def test_addEntry(self):
215        new_customer = Customer()
216        self.processor.addEntry(
217            new_customer, dict(), self.app)
218        assert len(self.app['customers'].keys()) == 2
219
220    def test_checkConversion(self):
221        # Make sure we can check conversions and that the cust_id
222        # counter is not raised during such checks.
223        initial_cust_id = self.app['customers']._curr_cust_id
224        errs, inv_errs, conv_dict = self.processor.checkConversion(
225            dict(reg_number='1', state='approved'))
226        self.assertEqual(len(errs),0)
227        # Empty state is allowed
228        errs, inv_errs, conv_dict = self.processor.checkConversion(
229            dict(reg_number='1', state=''))
230        self.assertEqual(len(errs),0)
231        #self.assertTrue(('state', 'no value provided') in errs)
232        errs, inv_errs, conv_dict = self.processor.checkConversion(
233            dict(reg_number='1', state='nonsense'))
234        self.assertEqual(len(errs),1)
235        self.assertTrue(('state', 'not allowed') in errs)
236        new_cust_id = self.app['customers']._curr_cust_id
237        self.assertEqual(initial_cust_id, new_cust_id)
238        return
239
240    def test_checkUpdateRequirements(self):
241        # Make sure that pg customers can't be updated with wrong transition.
242        err = self.processor.checkUpdateRequirements(self.customer,
243            dict(reg_number='1', state='returning'), self.app)
244        self.assertTrue(err is None)
245
246    def test_delEntry(self):
247        assert self.customer.customer_id in self.app['customers'].keys()
248        self.processor.delEntry(
249            dict(reg_number=self.customer.reg_number), self.app)
250        assert self.customer.customer_id not in self.app['customers'].keys()
251
252    def test_import(self):
253        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
254        num, num_warns, fin_file, fail_file = self.processor.doImport(
255            self.csv_file, CUSTOMER_HEADER_FIELDS)
256        self.assertEqual(num_warns,0)
257        self.assertEqual(len(self.app['customers']), 10)
258        self.assertEqual(self.app['customers']['X666666'].reg_number,'1')
259        self.assertEqual(
260            self.app['customers']['X666666'].state, 'requested')
261        # Two new customer_ids have been created.
262        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
263        shutil.rmtree(os.path.dirname(fin_file))
264
265    def test_import_update(self):
266        num, num_warns, fin_file, fail_file = self.processor.doImport(
267            self.csv_file, CUSTOMER_HEADER_FIELDS)
268        shutil.rmtree(os.path.dirname(fin_file))
269        num, num_warns, fin_file, fail_file = self.processor.doImport(
270            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
271        self.assertEqual(num_warns,0)
272        # state has changed
273        self.assertEqual(self.app['customers']['X666666'].state,'approved')
274        # state has not changed
275        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
276        shutil.rmtree(os.path.dirname(fin_file))
277
278    def test_import_duplicate_data(self):
279        num, num_warns, fin_file, fail_file = self.processor.doImport(
280            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
281        content = open(fail_file).read()
282        self.assertEqual(num_warns,3)
283        self.assertEqual(
284            content,
285            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
286            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,cc@aa.ng,reg_number: Invalid input\r\n'
287            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,dd@aa.ng,This object already exists. Skipping.\r\n'
288            '4,,Berta,Pieri,m,1234,,1991-01-07,A234567,aa@aa.ng,email: Invalid input\r\n'
289            )
290        shutil.rmtree(os.path.dirname(fin_file))
291
292class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
293
294    def setUp(self):
295        super(CustomerSampleDocumentProcessorTest, self).setUp()
296
297        customer = Customer()
298        self.app['customers'].addCustomer(customer)
299        customer = self.setup_customer(customer)
300        notify(grok.ObjectModifiedEvent(customer))
301        self.customer = self.app['customers'][customer.customer_id]
302
303        # Import customers with subobjects
304        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
305        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
306        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
307            customer_file, CUSTOMER_HEADER_FIELDS)
308        shutil.rmtree(os.path.dirname(fin_file))
309
310        self.processor = CustomerSampleDocumentProcessor()
311        self.csv_file = os.path.join(
312            self.workdir, 'sample_document_data.csv')
313        open(self.csv_file, 'wb').write(DOCUMENT_SAMPLE_DATA)
314
315    def test_interface(self):
316        # Make sure we fulfill the interface contracts.
317        assert verifyObject(IBatchProcessor, self.processor) is True
318        assert verifyClass(
319            IBatchProcessor, CustomerSampleDocumentProcessor) is True
320
321    def test_getEntry(self):
322        assert self.processor.getEntry(
323            dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None
324        assert self.processor.getEntry(
325            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
326            self.app) is self.customer['documents']['DOC1']
327
328    def test_delEntry(self):
329        assert self.processor.getEntry(
330            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
331            self.app) is self.customer['documents']['DOC1']
332        self.assertEqual(len(self.customer['documents'].keys()),1)
333        self.processor.delEntry(
334            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
335            self.app)
336        assert self.processor.getEntry(
337            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
338            self.app) is None
339        self.assertEqual(len(self.customer['documents'].keys()),0)
340
341    def test_addEntry(self):
342        self.assertEqual(len(self.customer['documents'].keys()),1)
343        document1 = createObject(u'waeup.CustomerSampleDocument')
344        document1.document_id = u'd234'
345        self.processor.addEntry(
346            document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
347            self.app)
348        self.assertEqual(len(self.customer['documents'].keys()),2)
349        self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')
350
351    def test_checkConversion(self):
352        errs, inv_errs, conv_dict = self.processor.checkConversion(
353            dict(document_id='DOC', class_name='CustomerSampleDocument'))
354        self.assertEqual(len(errs),0)
355        errs, inv_errs, conv_dict = self.processor.checkConversion(
356            dict(document_id='id with spaces', class_name='CustomerSampleDocument'))
357        self.assertEqual(len(errs),1)
358        errs, inv_errs, conv_dict = self.processor.checkConversion(
359            dict(document_id='DOC', class_name='WrongDocument'))
360        self.assertEqual(len(errs),1)
361
362    def test_import(self):
363        num, num_warns, fin_file, fail_file = self.processor.doImport(
364            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
365        fail_file = open(fail_file).read()
366        self.assertEqual(num_warns,1)
367        # document_id must be unique
368        self.assertEqual(fail_file,
369            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
370            'CustomerSampleDocument,2,d5,My stolen doc,'
371            'This object already exists. Skipping.\r\n')
372        document = self.processor.getEntry(dict(reg_number='1',
373            document_id='d3'), self.app)
374        self.assertEqual(
375            self.app['customers']['X666666']['documents']['d3'],
376            document)
377        self.assertEqual(document.document_id, 'd3')
378        document = self.processor.getEntry(dict(reg_number='3',
379            document_id='d5'), self.app)
380        shutil.rmtree(os.path.dirname(fin_file))
381        logcontent = open(self.logfile).read()
382        # Fetch uuid
383        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
384            if len(i) > 10][0]
385         # Logging message from updateEntry
386        self.assertTrue(
387            'INFO - system - Customer Sample Document Processor - '
388            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
389            % docid in logcontent)
390
391    def test_import_update(self):
392        # We perform the same import twice,
393        # the second time in update mode. The number
394        # of warnings must be the same.
395        num, num_warns, fin_file, fail_file = self.processor.doImport(
396            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
397        shutil.rmtree(os.path.dirname(fin_file))
398        num, num_warns, fin_file, fail_file = self.processor.doImport(
399            self.csv_file, DOCUMENT_HEADER_FIELDS,'update')
400        # There is one record without document_id
401        self.assertEqual(num_warns,1)
402        shutil.rmtree(os.path.dirname(fin_file))
403
404    def test_import_remove(self):
405        # We perform the same import twice,
406        # the second time in remove mode. The number
407        # of warnings must be the same.
408        num, num_warns, fin_file, fail_file = self.processor.doImport(
409            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
410        shutil.rmtree(os.path.dirname(fin_file))
411        num, num_warns, fin_file, fail_file = self.processor.doImport(
412            self.csv_file, DOCUMENT_HEADER_FIELDS,'remove')
413        # There is one record without document_id and one duplicate
414        self.assertEqual(num_warns,2)
415        shutil.rmtree(os.path.dirname(fin_file))
416        logcontent = open(self.logfile).read()
417        self.assertTrue(
418            'INFO - system - K1000001 - Document removed: d5'
419            in logcontent)
420
421
422class SampleContractProcessorTest(CustomerImportExportSetup):
423
424    def setUp(self):
425        super(SampleContractProcessorTest, self).setUp()
426
427        customer = Customer()
428        self.app['customers'].addCustomer(customer)
429        customer = self.setup_customer(customer)
430        notify(grok.ObjectModifiedEvent(customer))
431        self.customer = self.app['customers'][customer.customer_id]
432
433        # Import customers with subobjects
434        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
435        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
436        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
437            customer_file, CUSTOMER_HEADER_FIELDS)
438        shutil.rmtree(os.path.dirname(fin_file))
439
440        self.processor = SampleContractProcessor()
441        self.csv_file = os.path.join(
442            self.workdir, 'sample_contract_data.csv')
443        open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA)
444
445    def test_interface(self):
446        # Make sure we fulfill the interface contracts.
447        assert verifyObject(IBatchProcessor, self.processor) is True
448        assert verifyClass(
449            IBatchProcessor, SampleContractProcessor) is True
450
451    def test_getEntry(self):
452        assert self.processor.getEntry(
453            dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
454        assert self.processor.getEntry(
455            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
456            self.app) is self.customer['contracts']['CON1']
457
458    def test_delEntry(self):
459        assert self.processor.getEntry(
460            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
461            self.app) is self.customer['contracts']['CON1']
462        self.assertEqual(len(self.customer['contracts'].keys()),1)
463        self.processor.delEntry(
464            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
465            self.app)
466        assert self.processor.getEntry(
467            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
468            self.app) is None
469        self.assertEqual(len(self.customer['contracts'].keys()),0)
470
471    def test_addEntry(self):
472        self.assertEqual(len(self.customer['contracts'].keys()),1)
473        contract1 = createObject(u'waeup.SampleContract')
474        contract1.contract_id = u'CON2'
475        self.processor.addEntry(
476            contract1, dict(customer_id=self.customer.customer_id, contract_id='CON2'),
477            self.app)
478        self.assertEqual(len(self.customer['contracts'].keys()),2)
479        self.assertEqual(self.customer['contracts']['CON2'].contract_id, 'CON2')
480
481    def test_checkConversion(self):
482        errs, inv_errs, conv_dict = self.processor.checkConversion(
483            dict(contract_id='CON3', class_name='SampleContract',
484                 document_object='DOC1', product_object='SAM'))
485        self.assertEqual(len(errs),0)
486        errs, inv_errs, conv_dict = self.processor.checkConversion(
487            dict(contract_id='id with spaces', class_name='SampleContract'))
488        self.assertEqual(len(errs),1)
489        errs, inv_errs, conv_dict = self.processor.checkConversion(
490            dict(contract_id='CON3', class_name='WrongContract'))
491        self.assertEqual(len(errs),1)
492
493    def test_import(self):
494        num, num_warns, fin_file, fail_file = self.processor.doImport(
495            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
496        self.assertEqual(num_warns,3)
497        # contract_id must be unique
498        fail_file = open(fail_file).read()
499        self.assertEqual(fail_file,
500            'reg_number,contract_id,class_name,product_object,tc_dict,document_object,product_options,--ERRORS--\r\n'
501            '2,c5,SampleContract,SAM,{},DOC1,[],This object already exists. Skipping.\r\n'
502            '3,c6,SampleContract,SAM,{},NONEXISTENT,[],document_object: Invalid value\r\n'
503            '3,c7,SampleContract,NONEXISTENT,{},DOC1,[],product_object: Invalid value\r\n'
504            )
505        contract = self.processor.getEntry(dict(reg_number='1',
506            contract_id='c3'), self.app)
507        self.assertEqual(
508            self.app['customers']['X666666']['contracts']['c3'],
509            contract)
510        self.assertEqual(contract.product_options[0].title, 'Base Fee')
511        self.assertEqual(contract.product_options[0].fee, Decimal('800.6') )
512        self.assertEqual(contract.product_options[0].currency, 'USD')
513        self.assertEqual(contract.contract_id, 'c3')
514        self.assertEqual(contract.tc_dict, u"{'en':'Hello World'}")
515        contract = self.processor.getEntry(dict(reg_number='3',
516            contract_id='c5'), self.app)
517        shutil.rmtree(os.path.dirname(fin_file))
518        logcontent = open(self.logfile).read()
519        # Logging message from updateEntry
520        # Fetch uuid
521        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
522            if len(i) > 10][0]
523        self.assertTrue(
524            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
525            'X666666 - %s - updated: product_object=SAM, tc_dict={}, document_object=DOC1, '
526            'product_options=[]' % conid
527            in logcontent)
528
529    def test_import_update(self):
530        # We perform the same import twice,
531        # the second time in update mode. The number
532        # of warnings must be the same.
533        num, num_warns, fin_file, fail_file = self.processor.doImport(
534            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
535        shutil.rmtree(os.path.dirname(fin_file))
536        num, num_warns, fin_file, fail_file = self.processor.doImport(
537            self.csv_file, CONTRACT_HEADER_FIELDS,'update')
538        # There are one record without contract_id and two
539        # records with wrong object identifier.
540        self.assertEqual(num_warns,3)
541        shutil.rmtree(os.path.dirname(fin_file))
542
543    def test_import_remove(self):
544        # We perform the same import twice,
545        # the second time in remove mode. The number
546        # of warnings must be the same.
547        num, num_warns, fin_file, fail_file = self.processor.doImport(
548            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
549        shutil.rmtree(os.path.dirname(fin_file))
550        num, num_warns, fin_file, fail_file = self.processor.doImport(
551            self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
552        # There are one record without contract_id, one duplicate and two
553        # records with wrong object identifier.
554        self.assertEqual(num_warns,4)
555        shutil.rmtree(os.path.dirname(fin_file))
556        logcontent = open(self.logfile).read()
557        self.assertTrue(
558            'INFO - system - K1000001 - Contract removed: c5'
559            in logcontent)
Note: See TracBrowser for help on using the repository browser.