source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12535

Last change on this file since 12535 was 12521, checked in by Henrik Bettermann, 10 years ago

Use TextLineChoice field for customer emails and ensure that email addresses are unique.

  • Property svn:keywords set to Id
File size: 23.9 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12521 2015-01-30 07:48:39Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
def _read_sample(filename):
    """Load a sample CSV file stored next to this test module.

    Returns a tuple ``(data, header_fields)``: the raw file content and
    the comma-separated field names found on its first line.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    # Close the file deterministically instead of leaving an anonymous
    # file object to be cleaned up by the garbage collector.
    with open(path, 'rb') as fobj:
        data = fobj.read()
    # The content is split with ``str`` separators, i.e. it is handled
    # as a plain (byte) string as read from the file.
    return data, data.split('\n')[0].split(',')


CUSTOMER_SAMPLE_DATA, CUSTOMER_HEADER_FIELDS = _read_sample(
    'sample_customer_data.csv')

CUSTOMER_SAMPLE_DATA_UPDATE, CUSTOMER_HEADER_FIELDS_UPDATE = _read_sample(
    'sample_customer_data_update.csv')

(CUSTOMER_SAMPLE_DATA_DUPLICATES,
 CUSTOMER_HEADER_FIELDS_DUPLICATES) = _read_sample(
    'sample_customer_data_duplicates.csv')

DOCUMENT_SAMPLE_DATA, DOCUMENT_HEADER_FIELDS = _read_sample(
    'sample_document_data.csv')

CONTRACT_SAMPLE_DATA, CONTRACT_HEADER_FIELDS = _read_sample(
    'sample_contract_data.csv')

81
class CustomerImportExportSetup(FunctionalTestCase):
    """Common fixture for the customer batch processor tests.

    Creates a ``Company`` site backed by a temporary datacenter storage,
    registers one sample product and offers helpers that populate a
    customer with a document and a contract.
    """

    layer = FunctionalLayer

    def setup_customizable_params(self):
        """Set the contract category used for the sample product."""
        self._contract_category = u'sample'

    def setUp(self):
        """Create temporary directories, the site and a sample product."""
        super(CustomerImportExportSetup, self).setUp()
        self.setup_customizable_params()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)
        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'customers.log')

        # Register a sample product carrying a single fee option.
        product = createObject('waeup.Product')
        product.product_id = u'SAM'
        product.title = u'Our Sample Product'
        product.contract_category = self._contract_category
        product.valid_from = datetime.date(2015, 12, 4)
        product.tc_dict = {'en': u'Hello world'}
        base_fee = ProductOption()
        base_fee.title = u'Base Fee'
        base_fee.fee = Decimal('800.6')
        base_fee.currency = 'USD'
        product.options = [base_fee]
        self.product = product
        self.app['products'].addProduct(product)

    def tearDown(self):
        """Remove the temporary directories and unset the site."""
        super(CustomerImportExportSetup, self).tearDown()
        for directory in (self.workdir, self.dc_root):
            shutil.rmtree(directory)
        clearSite()

    def setup_for_export(self):
        """Store a customer with id ``A111111`` and set the output path."""
        new_customer = Customer()
        new_customer.customer_id = u'A111111'
        self.app['customers'][new_customer.customer_id] = new_customer
        self.customer = new_customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        """Fill `customer` with sample data, a document and a contract.

        The created document and contract are also stored as
        ``self.document`` and ``self.contract``.  Returns `customer`.
        """
        customer.matric_number = u'234'
        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
        customer.reg_number = u'123'
        customer.firstname = u'Anna'
        customer.lastname = u'Tester'
        customer.middlename = u'M.'
        customer.date_of_birth = datetime.date(1981, 2, 4)
        # customer.sex is not set here.
        customer.email = u'anna@sample.com'
        customer.phone = u'+234-123-12345'
        customer.notice = u'Some notice\nin lines.'
        customer.nationality = u'NG'

        # Attach a document and move it to the VERIFIED workflow state.
        doc = createObject('waeup.CustomerSampleDocument')
        doc.title = u'My Document'
        doc.document_id = u'DOC1'
        customer['documents'].addDocument(doc)
        IWorkflowState(doc).setState(VERIFIED)
        self.document = doc

        # Attach a contract referring to the sample product and the
        # document created above.
        con = createObject('waeup.SampleContract')
        con.contract_id = u'CON1'
        con.product_object = self.product
        con.document_object = self.document
        con.product_options = self.product.options
        con.tc_dict = {'en': u'Hello world'}
        customer['contracts'].addContract(con)
        self.contract = con
        return customer

162
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the ``CustomerProcessor`` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        """Add one customer and write the sample CSV files to workdir."""
        super(CustomerProcessorTest, self).setUp()

        # Add customer
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Write the files inside ``with`` blocks so that they are flushed
        # and closed before any test hands them to the processor.
        samples = (
            (self.csv_file, CUSTOMER_SAMPLE_DATA),
            (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
            (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES),
            )
        for path, data in samples:
            with open(path, 'wb') as fobj:
                fobj.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(
            self.processor.entryExists(
                dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id),
                self.app) is self.customer)

    def test_addEntry(self):
        # The customer added in setUp plus the new one.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        # An unknown state is rejected.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # checkUpdateRequirements must not report an error (i.e. it
        # returns None) for this row.
        err = self.processor.checkUpdateRequirements(
            self.customer,
            dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        self.assertTrue(
            self.customer.customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertFalse(
            self.customer.customer_id in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Import in the processor's default mode first, then apply the
        # update file in 'update' mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        with open(fail_file) as fobj:
            content = fobj.read()
        self.assertEqual(num_warns, 3)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,cc@aa.ng,reg_number: Invalid input\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,dd@aa.ng,This object already exists. Skipping.\r\n'
            '4,,Berta,Pieri,m,1234,,1991-01-07,A234567,aa@aa.ng,email: Invalid input\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))

290
class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the ``CustomerSampleDocumentProcessor`` batch processor."""

    def setUp(self):
        """Add one customer, import sample customers, write document CSV."""
        super(CustomerSampleDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import the sample customers the document rows refer to.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        # Close the file before the processor reads it.
        with open(customer_file, 'wb') as fobj:
            fobj.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerSampleDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fobj:
            fobj.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(
                IBatchProcessor, CustomerSampleDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', document_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is self.customer['documents']['DOC1'])

    def test_delEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is self.customer['documents']['DOC1'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = u'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        # A document_id containing spaces yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='id with spaces',
                 class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        # An unknown class_name yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        with open(fail_file) as fobj:
            fail_content = fobj.read()
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        self.assertEqual(
            fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,'
            'This object already exists. Skipping.\r\n')
        document = self.processor.getEntry(
            dict(reg_number='1', document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # NOTE(review): the result of this lookup was never checked; the
        # call is kept so that the test still fails if the lookup raises.
        self.processor.getEntry(
            dict(reg_number='3', document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        # Fetch uuid
        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
            if len(i) > 10][0]
        # Logging message from updateEntry
        expected = (
            'INFO - system - Customer Sample Document Processor - '
            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
            % docid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id
        self.assertEqual(num_warns, 1)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)

419
420
class SampleContractProcessorTest(CustomerImportExportSetup):
    """Tests for the ``SampleContractProcessor`` batch processor."""

    def setUp(self):
        """Add one customer, import sample customers, write contract CSV."""
        super(SampleContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import the sample customers the contract rows refer to.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        # Close the file before the processor reads it.
        with open(customer_file, 'wb') as fobj:
            fobj.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = SampleContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fobj:
            fobj.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, SampleContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', contract_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is self.customer['contracts']['CON1'])

    def test_delEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is self.customer['contracts']['CON1'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = u'CON2'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='CON2'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['CON2'].contract_id, 'CON2')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='SampleContract',
                 document_object='DOC1', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        # A contract_id containing spaces yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='id with spaces', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        # An unknown class_name yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        with open(fail_file) as fobj:
            fail_content = fobj.read()
        self.assertEqual(
            fail_content,
            'reg_number,contract_id,class_name,product_object,tc_dict,document_object,product_options,--ERRORS--\r\n'
            '2,c5,SampleContract,SAM,{},DOC1,[],This object already exists. Skipping.\r\n'
            '3,c6,SampleContract,SAM,{},NONEXISTENT,[],document_object: Invalid value\r\n'
            '3,c7,SampleContract,NONEXISTENT,{},DOC1,[],product_object: Invalid value\r\n'
            )
        contract = self.processor.getEntry(
            dict(reg_number='1', contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.product_options[0].title, 'Base Fee')
        self.assertEqual(contract.product_options[0].fee, Decimal('800.6'))
        self.assertEqual(contract.product_options[0].currency, 'USD')
        self.assertEqual(contract.contract_id, 'c3')
        self.assertEqual(contract.tc_dict, u"{'en':'Hello World'}")
        # NOTE(review): the result of this lookup was never checked; the
        # call is kept so that the test still fails if the lookup raises.
        self.processor.getEntry(
            dict(reg_number='3', contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        # Logging message from updateEntry
        # Fetch uuid
        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
            if len(i) > 10][0]
        expected = (
            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
            'X666666 - %s - updated: product_object=SAM, tc_dict={}, document_object=DOC1, '
            'product_options=[]' % conid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There is one record without contract_id and there are two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 3)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There is one record without contract_id, one duplicate and
        # there are two records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fobj:
            logcontent = fobj.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.