source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12386

Last change on this file since 12386 was 12366, checked in by Henrik Bettermann, 10 years ago

Extend test.

  • Property svn:keywords set to Id
File size: 23.7 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12366 2015-01-02 10:24:02Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
def _read_sample_file(filename):
    """Return the raw content of `filename` stored next to this module.

    The file is opened in binary mode and closed explicitly before
    returning, so no open handle is left behind at import time.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(csv_data):
    """Return the comma-separated names of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Raw CSV sample data and the respective header fields used by the
# processor tests below.
CUSTOMER_SAMPLE_DATA = _read_sample_file('sample_customer_data.csv')

CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample_file(
    'sample_customer_data_update.csv')

CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample_file(
    'sample_customer_data_duplicates.csv')

CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample_file('sample_document_data.csv')

DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample_file('sample_contract_data.csv')

CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)

81
class CustomerImportExportSetup(FunctionalTestCase):
    """Shared fixture for the customer import/export tests.

    `setUp` installs a `Company` as site, points its datacenter to a
    temporary directory and registers one sample product. The helper
    methods populate customers for the individual test cases.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerImportExportSetup, self).setUp()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)
        self.logfile = os.path.join(
            self.app['datacenter'].storage, 'logs', 'customers.log')

        # Register a sample product carrying a single fee option.
        sample = createObject('waeup.Product')
        self.product = sample
        sample.product_id = u'SAM'
        sample.title = u'Our Sample Product'
        sample.contract_category = u'sample'
        sample.valid_from = datetime.date(2015, 12, 4)
        sample.tc_dict = {'en': u'Hello world'}
        base_fee = ProductOption()
        base_fee.title = u'Base Fee'
        base_fee.fee = Decimal('800.6')
        base_fee.currency = 'USD'
        sample.options = [base_fee]
        self.app['products'].addProduct(sample)

    def tearDown(self):
        super(CustomerImportExportSetup, self).tearDown()
        # Remove both temporary directories created in setUp.
        for tmpdir in (self.workdir, self.dc_root):
            shutil.rmtree(tmpdir)
        clearSite()

    def setup_for_export(self):
        # Store a bare customer with a fixed id and remember the path
        # of the export output file.
        new_customer = Customer()
        new_customer.customer_id = u'A111111'
        self.app['customers'][new_customer.customer_id] = new_customer
        self.customer = new_customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        # Fill in base data, then attach one verified document and one
        # contract which refers to that document and the sample product.
        customer.matric_number = u'234'
        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
        customer.reg_number = u'123'
        customer.firstname = u'Anna'
        customer.lastname = u'Tester'
        customer.middlename = u'M.'
        customer.date_of_birth = datetime.date(1981, 2, 4)
        customer.email = 'anna@sample.com'
        customer.phone = u'+234-123-12345'
        customer.notice = u'Some notice\nin lines.'
        customer.nationality = u'NG'

        # Document
        sample_doc = createObject('waeup.CustomerSampleDocument')
        sample_doc.title = u'My Document'
        sample_doc.document_id = u'DOC1'
        customer['documents'].addDocument(sample_doc)
        IWorkflowState(sample_doc).setState(VERIFIED)
        self.document = sample_doc

        # Contract
        sample_con = createObject('waeup.SampleContract')
        sample_con.contract_id = u'CON1'
        sample_con.product_object = self.product
        sample_con.document_object = self.document
        sample_con.product_options = self.product.options
        sample_con.tc_dict = {'en': u'Hello world'}
        customer['contracts'].addContract(sample_con)
        self.contract = sample_con
        return customer

157
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerProcessor` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerProcessorTest, self).setUp()

        # Add customer
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Write the sample files through explicitly closed handles, so
        # the data is guaranteed to be on disk before an import reads it.
        for path, data in (
                (self.csv_file, CUSTOMER_SAMPLE_DATA),
                (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
                (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES)):
            with open(path, 'wb') as fd:
                fd.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        assert verifyObject(IBatchProcessor, self.processor) is True
        assert verifyClass(
            IBatchProcessor, CustomerProcessor) is True

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        assert self.processor.entryExists(
            dict(customer_id='ID_NONE'), self.app) is False
        assert self.processor.entryExists(
            dict(reg_number='123'), self.app) is True

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        assert parent is self.app['customers']

    def test_getEntry(self):
        assert self.processor.getEntry(
            dict(customer_id='ID_NONE'), self.app) is None
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id), self.app) is self.customer

    def test_addEntry(self):
        # The customer added in setUp plus the new one.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        assert len(self.app['customers'].keys()) == 2

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs),0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs),0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs),1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)
        return

    def test_checkUpdateRequirements(self):
        # For this row no error message is expected, i.e. the
        # requirements check returns None.
        err = self.processor.checkUpdateRequirements(self.customer,
            dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        assert self.customer.customer_id in self.app['customers'].keys()
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        assert self.customer.customer_id not in self.app['customers'].keys()

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns,0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number,'1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Import in create mode first, then update the same customers.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns,0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state,'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        # Read the file of failed rows and close it again right away.
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns,2)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: Invalid input\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))

285
class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerSampleDocumentProcessor` batch processor."""

    def setUp(self):
        super(CustomerSampleDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The file is closed before
        # the import starts, so all data is flushed to disk.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerSampleDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        assert verifyObject(IBatchProcessor, self.processor) is True
        assert verifyClass(
            IBatchProcessor, CustomerSampleDocumentProcessor) is True

    def test_getEntry(self):
        assert self.processor.getEntry(
            dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app) is self.customer['documents']['DOC1']

    def test_delEntry(self):
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app) is self.customer['documents']['DOC1']
        self.assertEqual(len(self.customer['documents'].keys()),1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app)
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app) is None
        self.assertEqual(len(self.customer['documents'].keys()),0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()),1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = u'd234'
        self.processor.addEntry(
            document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()),2)
        self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs),0)
        # An id containing spaces yields one error ...
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='id with spaces', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs),1)
        # ... and so does a class name which does not match.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='WrongDocument'))
        self.assertEqual(len(errs),1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(num_warns,1)
        # document_id must be unique
        self.assertEqual(fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,'
            'This object already exists. Skipping.\r\n')
        document = self.processor.getEntry(dict(reg_number='1',
            document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # NOTE(review): the result of this lookup is not asserted;
        # presumably a check is missing here -- TODO confirm.
        document = self.processor.getEntry(dict(reg_number='3',
            document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Fetch uuid
        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
            if len(i) > 10][0]
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Customer Sample Document Processor - '
            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
            % docid in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS,'update')
        # There is one record without document_id
        self.assertEqual(num_warns,1)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS,'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns,2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)

414
415
class SampleContractProcessorTest(CustomerImportExportSetup):
    """Tests for the `SampleContractProcessor` batch processor."""

    def setUp(self):
        super(SampleContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The file is closed before
        # the import starts, so all data is flushed to disk.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = SampleContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        assert verifyObject(IBatchProcessor, self.processor) is True
        assert verifyClass(
            IBatchProcessor, SampleContractProcessor) is True

    def test_getEntry(self):
        assert self.processor.getEntry(
            dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app) is self.customer['contracts']['CON1']

    def test_delEntry(self):
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app) is self.customer['contracts']['CON1']
        self.assertEqual(len(self.customer['contracts'].keys()),1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app)
        assert self.processor.getEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app) is None
        self.assertEqual(len(self.customer['contracts'].keys()),0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()),1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = u'CON2'
        self.processor.addEntry(
            contract1, dict(customer_id=self.customer.customer_id, contract_id='CON2'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()),2)
        self.assertEqual(self.customer['contracts']['CON2'].contract_id, 'CON2')

    def test_checkConversion(self):
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='SampleContract',
                 document_object='DOC1', product_object='SAM'))
        self.assertEqual(len(errs),0)
        # An id containing spaces yields one error ...
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='id with spaces', class_name='SampleContract'))
        self.assertEqual(len(errs),1)
        # ... and so does a class name which does not match.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='WrongContract'))
        self.assertEqual(len(errs),1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
        self.assertEqual(num_warns,3)
        # contract_id must be unique
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(fail_content,
            'reg_number,contract_id,class_name,product_object,tc_dict,document_object,product_options,--ERRORS--\r\n'
            '2,c5,SampleContract,SAM,{},DOC1,[],This object already exists. Skipping.\r\n'
            '3,c6,SampleContract,SAM,{},NONEXISTENT,[],document_object: Invalid value\r\n'
            '3,c7,SampleContract,NONEXISTENT,{},DOC1,[],product_object: Invalid value\r\n'
            )
        contract = self.processor.getEntry(dict(reg_number='1',
            contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.product_options[0].title, 'Base Fee')
        self.assertEqual(contract.product_options[0].fee, Decimal('800.6') )
        self.assertEqual(contract.product_options[0].currency, 'USD')
        self.assertEqual(contract.contract_id, 'c3')
        self.assertEqual(contract.tc_dict, u"{'en':'Hello World'}")
        # NOTE(review): the result of this lookup is not asserted;
        # presumably a check is missing here -- TODO confirm.
        contract = self.processor.getEntry(dict(reg_number='3',
            contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        # Fetch uuid
        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
            if len(i) > 10][0]
        self.assertTrue(
            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
            'X666666 - %s - updated: product_object=SAM, tc_dict={}, document_object=DOC1, '
            'product_options=[]' % conid
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS,'update')
        # There is one record without contract_id and there are two
        # records with wrong object identifier.
        self.assertEqual(num_warns,3)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
        # There is one record without contract_id, one duplicate and
        # there are two records with wrong object identifier.
        self.assertEqual(num_warns,4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.