source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12333

Last change on this file since 12333 was 12333, checked in by Henrik Bettermann, 10 years ago

Add interface for contract batch processor.

  • Property svn:keywords set to Id
File size: 23.5 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12333 2014-12-29 06:10:50Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
46CUSTOMER_SAMPLE_DATA = open(
47    os.path.join(os.path.dirname(__file__), 'sample_customer_data.csv'),
48    'rb').read()
49
50CUSTOMER_HEADER_FIELDS = CUSTOMER_SAMPLE_DATA.split(
51    '\n')[0].split(',')
52
53CUSTOMER_SAMPLE_DATA_UPDATE = open(
54    os.path.join(os.path.dirname(__file__), 'sample_customer_data_update.csv'),
55    'rb').read()
56
57CUSTOMER_HEADER_FIELDS_UPDATE = CUSTOMER_SAMPLE_DATA_UPDATE.split(
58    '\n')[0].split(',')
59
60CUSTOMER_SAMPLE_DATA_DUPLICATES = open(
61    os.path.join(os.path.dirname(__file__),
62                 'sample_customer_data_duplicates.csv'),
63    'rb').read()
64
65CUSTOMER_HEADER_FIELDS_DUPLICATES = CUSTOMER_SAMPLE_DATA_DUPLICATES.split(
66    '\n')[0].split(',')
67
68DOCUMENT_SAMPLE_DATA = open(
69    os.path.join(os.path.dirname(__file__), 'sample_document_data.csv'),
70    'rb').read()
71
72DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
73    '\n')[0].split(',')
74
75CONTRACT_SAMPLE_DATA = open(
76    os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'),
77    'rb').read()
78
79CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split(
80    '\n')[0].split(',')
81
82class CustomerImportExportSetup(FunctionalTestCase):
83
84    layer = FunctionalLayer
85
86    def setUp(self):
87        super(CustomerImportExportSetup, self).setUp()
88        self.dc_root = tempfile.mkdtemp()
89        self.workdir = tempfile.mkdtemp()
90        app = Company()
91        app['datacenter'].setStoragePath(self.dc_root)
92        self.getRootFolder()['app'] = app
93        self.app = self.getRootFolder()['app']
94        setSite(app)
95        self.logfile = os.path.join(
96            self.app['datacenter'].storage, 'logs', 'customers.log')
97
98        #Add some products
99        self.product = createObject('waeup.Product')
100        self.product.product_id = u'SAM'
101        self.product.title = u'Our Samle Product'
102        self.product.contract_category = u'sample'
103        option = ProductOption()
104        option.title = u'Base Fee'
105        option.fee = Decimal('800.6')
106        option.currency = 'usd'
107        self.product.options = [option,]
108        self.app['products'].addProduct(self.product)
109        return
110
111    def tearDown(self):
112        super(CustomerImportExportSetup, self).tearDown()
113        shutil.rmtree(self.workdir)
114        shutil.rmtree(self.dc_root)
115        clearSite()
116        return
117
118    def setup_for_export(self):
119        customer = Customer()
120        customer.customer_id = u'A111111'
121        self.app['customers'][customer.customer_id] = self.customer = customer
122        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
123        return
124
125    def setup_customer(self, customer):
126        customer.matric_number = u'234'
127        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
128        customer.reg_number = u'123'
129        customer.firstname = u'Anna'
130        customer.lastname = u'Tester'
131        customer.middlename = u'M.'
132        customer.date_of_birth = datetime.date(1981, 2, 4)
133        #customer.sex = 'f'
134        customer.email = 'anna@sample.com'
135        customer.phone = u'+234-123-12345'
136        customer.notice = u'Some notice\nin lines.'
137        customer.nationality = u'NG'
138        # Add document
139        document = createObject('waeup.CustomerSampleDocument')
140        document.title = u'My Document'
141        document.document_id = u'DOC1'
142        customer['documents'].addDocument(document)
143        IWorkflowState(document).setState(VERIFIED)
144        self.document = document
145        # Add contract
146        contract = createObject('waeup.SampleContract')
147        contract.title = u'My Contract'
148        contract.contract_id = u'CON1'
149        contract.product_object = self.product
150        contract.document_object = self.document
151        contract.product_options = self.product.options
152        customer['contracts'].addContract(contract)
153        self.contract = contract
154        return customer
155
156class CustomerProcessorTest(CustomerImportExportSetup):
157
158    layer = FunctionalLayer
159
160    def setUp(self):
161        super(CustomerProcessorTest, self).setUp()
162
163        # Add customer
164        customer = Customer()
165        self.app['customers'].addCustomer(customer)
166        customer = self.setup_customer(customer)
167        notify(grok.ObjectModifiedEvent(customer))
168        self.customer = self.app['customers'][customer.customer_id]
169
170        self.processor = CustomerProcessor()
171        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
172        self.csv_file_update = os.path.join(
173            self.workdir, 'sample_customer_data_update.csv')
174        self.csv_file_duplicates = os.path.join(
175            self.workdir, 'sample_customer_data_duplicates.csv')
176        open(self.csv_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
177        open(self.csv_file_update, 'wb').write(CUSTOMER_SAMPLE_DATA_UPDATE)
178        open(self.csv_file_duplicates, 'wb').write(CUSTOMER_SAMPLE_DATA_DUPLICATES)
179
180    def test_interface(self):
181        # Make sure we fulfill the interface contracts.
182        assert verifyObject(IBatchProcessor, self.processor) is True
183        assert verifyClass(
184            IBatchProcessor, CustomerProcessor) is True
185
186    def test_parentsExist(self):
187        self.assertFalse(self.processor.parentsExist(None, dict()))
188        self.assertTrue(self.processor.parentsExist(None, self.app))
189
190    def test_entryExists(self):
191        assert self.processor.entryExists(
192            dict(customer_id='ID_NONE'), self.app) is False
193        assert self.processor.entryExists(
194            dict(reg_number='123'), self.app) is True
195
196    def test_getParent(self):
197        parent = self.processor.getParent(None, self.app)
198        assert parent is self.app['customers']
199
200    def test_getEntry(self):
201        assert self.processor.getEntry(
202            dict(customer_id='ID_NONE'), self.app) is None
203        assert self.processor.getEntry(
204            dict(customer_id=self.customer.customer_id), self.app) is self.customer
205
206    def test_addEntry(self):
207        new_customer = Customer()
208        self.processor.addEntry(
209            new_customer, dict(), self.app)
210        assert len(self.app['customers'].keys()) == 2
211
212    def test_checkConversion(self):
213        # Make sure we can check conversions and that the cust_id
214        # counter is not raised during such checks.
215        initial_cust_id = self.app['customers']._curr_cust_id
216        errs, inv_errs, conv_dict = self.processor.checkConversion(
217            dict(reg_number='1', state='approved'))
218        self.assertEqual(len(errs),0)
219        # Empty state is allowed
220        errs, inv_errs, conv_dict = self.processor.checkConversion(
221            dict(reg_number='1', state=''))
222        self.assertEqual(len(errs),0)
223        #self.assertTrue(('state', 'no value provided') in errs)
224        errs, inv_errs, conv_dict = self.processor.checkConversion(
225            dict(reg_number='1', state='nonsense'))
226        self.assertEqual(len(errs),1)
227        self.assertTrue(('state', 'not allowed') in errs)
228        new_cust_id = self.app['customers']._curr_cust_id
229        self.assertEqual(initial_cust_id, new_cust_id)
230        return
231
232    def test_checkUpdateRequirements(self):
233        # Make sure that pg customers can't be updated with wrong transition.
234        err = self.processor.checkUpdateRequirements(self.customer,
235            dict(reg_number='1', state='returning'), self.app)
236        self.assertTrue(err is None)
237
238    def test_delEntry(self):
239        assert self.customer.customer_id in self.app['customers'].keys()
240        self.processor.delEntry(
241            dict(reg_number=self.customer.reg_number), self.app)
242        assert self.customer.customer_id not in self.app['customers'].keys()
243
244    def test_import(self):
245        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
246        num, num_warns, fin_file, fail_file = self.processor.doImport(
247            self.csv_file, CUSTOMER_HEADER_FIELDS)
248        self.assertEqual(num_warns,0)
249        self.assertEqual(len(self.app['customers']), 10)
250        self.assertEqual(self.app['customers']['X666666'].reg_number,'1')
251        self.assertEqual(
252            self.app['customers']['X666666'].state, 'requested')
253        # Two new customer_ids have been created.
254        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
255        shutil.rmtree(os.path.dirname(fin_file))
256
257    def test_import_update(self):
258        num, num_warns, fin_file, fail_file = self.processor.doImport(
259            self.csv_file, CUSTOMER_HEADER_FIELDS)
260        shutil.rmtree(os.path.dirname(fin_file))
261        num, num_warns, fin_file, fail_file = self.processor.doImport(
262            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
263        self.assertEqual(num_warns,0)
264        # state has changed
265        self.assertEqual(self.app['customers']['X666666'].state,'approved')
266        # state has not changed
267        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
268        shutil.rmtree(os.path.dirname(fin_file))
269
270    def test_import_duplicate_data(self):
271        num, num_warns, fin_file, fail_file = self.processor.doImport(
272            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
273        content = open(fail_file).read()
274        self.assertEqual(num_warns,2)
275        self.assertEqual(
276            content,
277            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
278            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: Invalid input\r\n'
279            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
280
281            )
282        shutil.rmtree(os.path.dirname(fin_file))
283
284class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
285
286    def setUp(self):
287        super(CustomerSampleDocumentProcessorTest, self).setUp()
288
289        customer = Customer()
290        self.app['customers'].addCustomer(customer)
291        customer = self.setup_customer(customer)
292        notify(grok.ObjectModifiedEvent(customer))
293        self.customer = self.app['customers'][customer.customer_id]
294
295        # Import customers with subobjects
296        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
297        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
298        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
299            customer_file, CUSTOMER_HEADER_FIELDS)
300        shutil.rmtree(os.path.dirname(fin_file))
301
302        self.processor = CustomerSampleDocumentProcessor()
303        self.csv_file = os.path.join(
304            self.workdir, 'sample_document_data.csv')
305        open(self.csv_file, 'wb').write(DOCUMENT_SAMPLE_DATA)
306
307    def test_interface(self):
308        # Make sure we fulfill the interface contracts.
309        assert verifyObject(IBatchProcessor, self.processor) is True
310        assert verifyClass(
311            IBatchProcessor, CustomerSampleDocumentProcessor) is True
312
313    def test_getEntry(self):
314        assert self.processor.getEntry(
315            dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None
316        assert self.processor.getEntry(
317            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
318            self.app) is self.customer['documents']['DOC1']
319
320    def test_delEntry(self):
321        assert self.processor.getEntry(
322            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
323            self.app) is self.customer['documents']['DOC1']
324        self.assertEqual(len(self.customer['documents'].keys()),1)
325        self.processor.delEntry(
326            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
327            self.app)
328        assert self.processor.getEntry(
329            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
330            self.app) is None
331        self.assertEqual(len(self.customer['documents'].keys()),0)
332
333    def test_addEntry(self):
334        self.assertEqual(len(self.customer['documents'].keys()),1)
335        document1 = createObject(u'waeup.CustomerSampleDocument')
336        document1.document_id = u'd234'
337        self.processor.addEntry(
338            document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
339            self.app)
340        self.assertEqual(len(self.customer['documents'].keys()),2)
341        self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')
342
343    def test_checkConversion(self):
344        errs, inv_errs, conv_dict = self.processor.checkConversion(
345            dict(document_id='DOC', class_name='CustomerSampleDocument'))
346        self.assertEqual(len(errs),0)
347        errs, inv_errs, conv_dict = self.processor.checkConversion(
348            dict(document_id='id with spaces', class_name='CustomerSampleDocument'))
349        self.assertEqual(len(errs),1)
350        errs, inv_errs, conv_dict = self.processor.checkConversion(
351            dict(document_id='DOC', class_name='WrongDocument'))
352        self.assertEqual(len(errs),1)
353
354    def test_import(self):
355        num, num_warns, fin_file, fail_file = self.processor.doImport(
356            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
357        fail_file = open(fail_file).read()
358        self.assertEqual(num_warns,1)
359        # document_id must be unique
360        self.assertEqual(fail_file,
361            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
362            'CustomerSampleDocument,2,d5,My stolen doc,'
363            'This object already exists. Skipping.\r\n')
364        document = self.processor.getEntry(dict(reg_number='1',
365            document_id='d3'), self.app)
366        self.assertEqual(
367            self.app['customers']['X666666']['documents']['d3'],
368            document)
369        self.assertEqual(document.document_id, 'd3')
370        document = self.processor.getEntry(dict(reg_number='3',
371            document_id='d5'), self.app)
372        shutil.rmtree(os.path.dirname(fin_file))
373        logcontent = open(self.logfile).read()
374        # Fetch uuid
375        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
376            if len(i) > 10][0]
377         # Logging message from updateEntry
378        self.assertTrue(
379            'INFO - system - Customer Sample Document Processor - '
380            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
381            % docid in logcontent)
382
383    def test_import_update(self):
384        # We perform the same import twice,
385        # the second time in update mode. The number
386        # of warnings must be the same.
387        num, num_warns, fin_file, fail_file = self.processor.doImport(
388            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
389        shutil.rmtree(os.path.dirname(fin_file))
390        num, num_warns, fin_file, fail_file = self.processor.doImport(
391            self.csv_file, DOCUMENT_HEADER_FIELDS,'update')
392        # There is one record without document_id
393        self.assertEqual(num_warns,1)
394        shutil.rmtree(os.path.dirname(fin_file))
395
396    def test_import_remove(self):
397        # We perform the same import twice,
398        # the second time in remove mode. The number
399        # of warnings must be the same.
400        num, num_warns, fin_file, fail_file = self.processor.doImport(
401            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
402        shutil.rmtree(os.path.dirname(fin_file))
403        num, num_warns, fin_file, fail_file = self.processor.doImport(
404            self.csv_file, DOCUMENT_HEADER_FIELDS,'remove')
405        # There is one record without document_id and one duplicate
406        self.assertEqual(num_warns,2)
407        shutil.rmtree(os.path.dirname(fin_file))
408        logcontent = open(self.logfile).read()
409        self.assertTrue(
410            'INFO - system - K1000001 - Document removed: d5'
411            in logcontent)
412
413
414class SampleContractProcessorTest(CustomerImportExportSetup):
415
416    def setUp(self):
417        super(SampleContractProcessorTest, self).setUp()
418
419        customer = Customer()
420        self.app['customers'].addCustomer(customer)
421        customer = self.setup_customer(customer)
422        notify(grok.ObjectModifiedEvent(customer))
423        self.customer = self.app['customers'][customer.customer_id]
424
425        # Import customers with subobjects
426        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
427        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
428        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
429            customer_file, CUSTOMER_HEADER_FIELDS)
430        shutil.rmtree(os.path.dirname(fin_file))
431
432        self.processor = SampleContractProcessor()
433        self.csv_file = os.path.join(
434            self.workdir, 'sample_contract_data.csv')
435        open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA)
436
437    def test_interface(self):
438        # Make sure we fulfill the interface contracts.
439        assert verifyObject(IBatchProcessor, self.processor) is True
440        assert verifyClass(
441            IBatchProcessor, SampleContractProcessor) is True
442
443    def test_getEntry(self):
444        assert self.processor.getEntry(
445            dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
446        assert self.processor.getEntry(
447            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
448            self.app) is self.customer['contracts']['CON1']
449
450    def test_delEntry(self):
451        assert self.processor.getEntry(
452            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
453            self.app) is self.customer['contracts']['CON1']
454        self.assertEqual(len(self.customer['contracts'].keys()),1)
455        self.processor.delEntry(
456            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
457            self.app)
458        assert self.processor.getEntry(
459            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
460            self.app) is None
461        self.assertEqual(len(self.customer['contracts'].keys()),0)
462
463    def test_addEntry(self):
464        self.assertEqual(len(self.customer['contracts'].keys()),1)
465        contract1 = createObject(u'waeup.SampleContract')
466        contract1.contract_id = u'CON2'
467        self.processor.addEntry(
468            contract1, dict(customer_id=self.customer.customer_id, contract_id='CON2'),
469            self.app)
470        self.assertEqual(len(self.customer['contracts'].keys()),2)
471        self.assertEqual(self.customer['contracts']['CON2'].contract_id, 'CON2')
472
473    def test_checkConversion(self):
474        errs, inv_errs, conv_dict = self.processor.checkConversion(
475            dict(contract_id='CON3', class_name='SampleContract',
476                 document_object='DOC1', product_object='SAM'))
477        self.assertEqual(len(errs),0)
478        errs, inv_errs, conv_dict = self.processor.checkConversion(
479            dict(contract_id='id with spaces', class_name='SampleContract'))
480        self.assertEqual(len(errs),1)
481        errs, inv_errs, conv_dict = self.processor.checkConversion(
482            dict(contract_id='CON3', class_name='WrongContract'))
483        self.assertEqual(len(errs),1)
484
485    def test_import(self):
486        num, num_warns, fin_file, fail_file = self.processor.doImport(
487            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
488        self.assertEqual(num_warns,3)
489        # contract_id must be unique
490        fail_file = open(fail_file).read()
491        self.assertEqual(fail_file,
492            'reg_number,contract_id,title,class_name,product_object,document_object,product_options,--ERRORS--\r\n'
493            '2,c5,My stolen contract,SampleContract,SAM,DOC1,[],This object already exists. Skipping.\r\n'
494            '3,c6,My 5th contract,SampleContract,SAM,NONEXISTENT,[],document_object: Invalid value\r\n'
495            '3,c7,My sixt contract,SampleContract,NONEXISTENT,DOC1,[],product_object: Invalid value\r\n'
496            )
497        contract = self.processor.getEntry(dict(reg_number='1',
498            contract_id='c3'), self.app)
499        self.assertEqual(
500            self.app['customers']['X666666']['contracts']['c3'],
501            contract)
502        self.assertEqual(contract.product_options[0].title, 'Base Fee')
503        self.assertEqual(contract.product_options[0].fee, Decimal('800.6') )
504        self.assertEqual(contract.product_options[0].currency, 'usd')
505        self.assertEqual(contract.contract_id, 'c3')
506        contract = self.processor.getEntry(dict(reg_number='3',
507            contract_id='c5'), self.app)
508        shutil.rmtree(os.path.dirname(fin_file))
509        logcontent = open(self.logfile).read()
510        # Logging message from updateEntry
511        # Fetch uuid
512        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
513            if len(i) > 10][0]
514        self.assertTrue(
515            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
516            'X666666 - %s - updated: title=My 4th contract, '
517            'product_object=SAM, document_object=DOC1' % conid
518            in logcontent)
519
520    def test_import_update(self):
521        # We perform the same import twice,
522        # the second time in update mode. The number
523        # of warnings must be the same.
524        num, num_warns, fin_file, fail_file = self.processor.doImport(
525            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
526        shutil.rmtree(os.path.dirname(fin_file))
527        num, num_warns, fin_file, fail_file = self.processor.doImport(
528            self.csv_file, CONTRACT_HEADER_FIELDS,'update')
529        # There are one record without contract_id and two
530        # records with wrong object identifier.
531        self.assertEqual(num_warns,3)
532        shutil.rmtree(os.path.dirname(fin_file))
533
534    def test_import_remove(self):
535        # We perform the same import twice,
536        # the second time in remove mode. The number
537        # of warnings must be the same.
538        num, num_warns, fin_file, fail_file = self.processor.doImport(
539            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
540        shutil.rmtree(os.path.dirname(fin_file))
541        num, num_warns, fin_file, fail_file = self.processor.doImport(
542            self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
543        # There are one record without contract_id, one duplicate and two
544        # records with wrong object identifier.
545        self.assertEqual(num_warns,4)
546        shutil.rmtree(os.path.dirname(fin_file))
547        logcontent = open(self.logfile).read()
548        self.assertTrue(
549            'INFO - system - K1000001 - Contract removed: c5'
550            in logcontent)
Note: See TracBrowser for help on using the repository browser.