source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12345

Last change on this file since 12345 was 12342, checked in by Henrik Bettermann, 10 years ago

Fix currency keys.

  • Property svn:keywords set to Id
File size: 23.4 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12342 2014-12-30 10:19:47Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
46CUSTOMER_SAMPLE_DATA = open(
47    os.path.join(os.path.dirname(__file__), 'sample_customer_data.csv'),
48    'rb').read()
49
50CUSTOMER_HEADER_FIELDS = CUSTOMER_SAMPLE_DATA.split(
51    '\n')[0].split(',')
52
53CUSTOMER_SAMPLE_DATA_UPDATE = open(
54    os.path.join(os.path.dirname(__file__), 'sample_customer_data_update.csv'),
55    'rb').read()
56
57CUSTOMER_HEADER_FIELDS_UPDATE = CUSTOMER_SAMPLE_DATA_UPDATE.split(
58    '\n')[0].split(',')
59
60CUSTOMER_SAMPLE_DATA_DUPLICATES = open(
61    os.path.join(os.path.dirname(__file__),
62                 'sample_customer_data_duplicates.csv'),
63    'rb').read()
64
65CUSTOMER_HEADER_FIELDS_DUPLICATES = CUSTOMER_SAMPLE_DATA_DUPLICATES.split(
66    '\n')[0].split(',')
67
68DOCUMENT_SAMPLE_DATA = open(
69    os.path.join(os.path.dirname(__file__), 'sample_document_data.csv'),
70    'rb').read()
71
72DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
73    '\n')[0].split(',')
74
75CONTRACT_SAMPLE_DATA = open(
76    os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'),
77    'rb').read()
78
79CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split(
80    '\n')[0].split(',')
81
82class CustomerImportExportSetup(FunctionalTestCase):
83
84    layer = FunctionalLayer
85
86    def setUp(self):
87        super(CustomerImportExportSetup, self).setUp()
88        self.dc_root = tempfile.mkdtemp()
89        self.workdir = tempfile.mkdtemp()
90        app = Company()
91        app['datacenter'].setStoragePath(self.dc_root)
92        self.getRootFolder()['app'] = app
93        self.app = self.getRootFolder()['app']
94        setSite(app)
95        self.logfile = os.path.join(
96            self.app['datacenter'].storage, 'logs', 'customers.log')
97
98        #Add some products
99        self.product = createObject('waeup.Product')
100        self.product.product_id = u'SAM'
101        self.product.title = u'Our Sample Product'
102        self.product.contract_category = u'sample'
103        option = ProductOption()
104        option.title = u'Base Fee'
105        option.fee = Decimal('800.6')
106        option.currency = 'USD'
107        self.product.options = [option,]
108        self.app['products'].addProduct(self.product)
109        return
110
111    def tearDown(self):
112        super(CustomerImportExportSetup, self).tearDown()
113        shutil.rmtree(self.workdir)
114        shutil.rmtree(self.dc_root)
115        clearSite()
116        return
117
118    def setup_for_export(self):
119        customer = Customer()
120        customer.customer_id = u'A111111'
121        self.app['customers'][customer.customer_id] = self.customer = customer
122        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
123        return
124
125    def setup_customer(self, customer):
126        customer.matric_number = u'234'
127        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
128        customer.reg_number = u'123'
129        customer.firstname = u'Anna'
130        customer.lastname = u'Tester'
131        customer.middlename = u'M.'
132        customer.date_of_birth = datetime.date(1981, 2, 4)
133        #customer.sex = 'f'
134        customer.email = 'anna@sample.com'
135        customer.phone = u'+234-123-12345'
136        customer.notice = u'Some notice\nin lines.'
137        customer.nationality = u'NG'
138        # Add document
139        document = createObject('waeup.CustomerSampleDocument')
140        document.title = u'My Document'
141        document.document_id = u'DOC1'
142        customer['documents'].addDocument(document)
143        IWorkflowState(document).setState(VERIFIED)
144        self.document = document
145        # Add contract
146        contract = createObject('waeup.SampleContract')
147        contract.contract_id = u'CON1'
148        contract.product_object = self.product
149        contract.document_object = self.document
150        contract.product_options = self.product.options
151        customer['contracts'].addContract(contract)
152        self.contract = contract
153        return customer
154
155class CustomerProcessorTest(CustomerImportExportSetup):
156
157    layer = FunctionalLayer
158
159    def setUp(self):
160        super(CustomerProcessorTest, self).setUp()
161
162        # Add customer
163        customer = Customer()
164        self.app['customers'].addCustomer(customer)
165        customer = self.setup_customer(customer)
166        notify(grok.ObjectModifiedEvent(customer))
167        self.customer = self.app['customers'][customer.customer_id]
168
169        self.processor = CustomerProcessor()
170        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
171        self.csv_file_update = os.path.join(
172            self.workdir, 'sample_customer_data_update.csv')
173        self.csv_file_duplicates = os.path.join(
174            self.workdir, 'sample_customer_data_duplicates.csv')
175        open(self.csv_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
176        open(self.csv_file_update, 'wb').write(CUSTOMER_SAMPLE_DATA_UPDATE)
177        open(self.csv_file_duplicates, 'wb').write(CUSTOMER_SAMPLE_DATA_DUPLICATES)
178
179    def test_interface(self):
180        # Make sure we fulfill the interface contracts.
181        assert verifyObject(IBatchProcessor, self.processor) is True
182        assert verifyClass(
183            IBatchProcessor, CustomerProcessor) is True
184
185    def test_parentsExist(self):
186        self.assertFalse(self.processor.parentsExist(None, dict()))
187        self.assertTrue(self.processor.parentsExist(None, self.app))
188
189    def test_entryExists(self):
190        assert self.processor.entryExists(
191            dict(customer_id='ID_NONE'), self.app) is False
192        assert self.processor.entryExists(
193            dict(reg_number='123'), self.app) is True
194
195    def test_getParent(self):
196        parent = self.processor.getParent(None, self.app)
197        assert parent is self.app['customers']
198
199    def test_getEntry(self):
200        assert self.processor.getEntry(
201            dict(customer_id='ID_NONE'), self.app) is None
202        assert self.processor.getEntry(
203            dict(customer_id=self.customer.customer_id), self.app) is self.customer
204
205    def test_addEntry(self):
206        new_customer = Customer()
207        self.processor.addEntry(
208            new_customer, dict(), self.app)
209        assert len(self.app['customers'].keys()) == 2
210
211    def test_checkConversion(self):
212        # Make sure we can check conversions and that the cust_id
213        # counter is not raised during such checks.
214        initial_cust_id = self.app['customers']._curr_cust_id
215        errs, inv_errs, conv_dict = self.processor.checkConversion(
216            dict(reg_number='1', state='approved'))
217        self.assertEqual(len(errs),0)
218        # Empty state is allowed
219        errs, inv_errs, conv_dict = self.processor.checkConversion(
220            dict(reg_number='1', state=''))
221        self.assertEqual(len(errs),0)
222        #self.assertTrue(('state', 'no value provided') in errs)
223        errs, inv_errs, conv_dict = self.processor.checkConversion(
224            dict(reg_number='1', state='nonsense'))
225        self.assertEqual(len(errs),1)
226        self.assertTrue(('state', 'not allowed') in errs)
227        new_cust_id = self.app['customers']._curr_cust_id
228        self.assertEqual(initial_cust_id, new_cust_id)
229        return
230
231    def test_checkUpdateRequirements(self):
232        # Make sure that pg customers can't be updated with wrong transition.
233        err = self.processor.checkUpdateRequirements(self.customer,
234            dict(reg_number='1', state='returning'), self.app)
235        self.assertTrue(err is None)
236
237    def test_delEntry(self):
238        assert self.customer.customer_id in self.app['customers'].keys()
239        self.processor.delEntry(
240            dict(reg_number=self.customer.reg_number), self.app)
241        assert self.customer.customer_id not in self.app['customers'].keys()
242
243    def test_import(self):
244        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
245        num, num_warns, fin_file, fail_file = self.processor.doImport(
246            self.csv_file, CUSTOMER_HEADER_FIELDS)
247        self.assertEqual(num_warns,0)
248        self.assertEqual(len(self.app['customers']), 10)
249        self.assertEqual(self.app['customers']['X666666'].reg_number,'1')
250        self.assertEqual(
251            self.app['customers']['X666666'].state, 'requested')
252        # Two new customer_ids have been created.
253        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
254        shutil.rmtree(os.path.dirname(fin_file))
255
256    def test_import_update(self):
257        num, num_warns, fin_file, fail_file = self.processor.doImport(
258            self.csv_file, CUSTOMER_HEADER_FIELDS)
259        shutil.rmtree(os.path.dirname(fin_file))
260        num, num_warns, fin_file, fail_file = self.processor.doImport(
261            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
262        self.assertEqual(num_warns,0)
263        # state has changed
264        self.assertEqual(self.app['customers']['X666666'].state,'approved')
265        # state has not changed
266        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
267        shutil.rmtree(os.path.dirname(fin_file))
268
269    def test_import_duplicate_data(self):
270        num, num_warns, fin_file, fail_file = self.processor.doImport(
271            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
272        content = open(fail_file).read()
273        self.assertEqual(num_warns,2)
274        self.assertEqual(
275            content,
276            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
277            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: Invalid input\r\n'
278            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
279
280            )
281        shutil.rmtree(os.path.dirname(fin_file))
282
283class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
284
285    def setUp(self):
286        super(CustomerSampleDocumentProcessorTest, self).setUp()
287
288        customer = Customer()
289        self.app['customers'].addCustomer(customer)
290        customer = self.setup_customer(customer)
291        notify(grok.ObjectModifiedEvent(customer))
292        self.customer = self.app['customers'][customer.customer_id]
293
294        # Import customers with subobjects
295        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
296        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
297        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
298            customer_file, CUSTOMER_HEADER_FIELDS)
299        shutil.rmtree(os.path.dirname(fin_file))
300
301        self.processor = CustomerSampleDocumentProcessor()
302        self.csv_file = os.path.join(
303            self.workdir, 'sample_document_data.csv')
304        open(self.csv_file, 'wb').write(DOCUMENT_SAMPLE_DATA)
305
306    def test_interface(self):
307        # Make sure we fulfill the interface contracts.
308        assert verifyObject(IBatchProcessor, self.processor) is True
309        assert verifyClass(
310            IBatchProcessor, CustomerSampleDocumentProcessor) is True
311
312    def test_getEntry(self):
313        assert self.processor.getEntry(
314            dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None
315        assert self.processor.getEntry(
316            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
317            self.app) is self.customer['documents']['DOC1']
318
319    def test_delEntry(self):
320        assert self.processor.getEntry(
321            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
322            self.app) is self.customer['documents']['DOC1']
323        self.assertEqual(len(self.customer['documents'].keys()),1)
324        self.processor.delEntry(
325            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
326            self.app)
327        assert self.processor.getEntry(
328            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
329            self.app) is None
330        self.assertEqual(len(self.customer['documents'].keys()),0)
331
332    def test_addEntry(self):
333        self.assertEqual(len(self.customer['documents'].keys()),1)
334        document1 = createObject(u'waeup.CustomerSampleDocument')
335        document1.document_id = u'd234'
336        self.processor.addEntry(
337            document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
338            self.app)
339        self.assertEqual(len(self.customer['documents'].keys()),2)
340        self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')
341
342    def test_checkConversion(self):
343        errs, inv_errs, conv_dict = self.processor.checkConversion(
344            dict(document_id='DOC', class_name='CustomerSampleDocument'))
345        self.assertEqual(len(errs),0)
346        errs, inv_errs, conv_dict = self.processor.checkConversion(
347            dict(document_id='id with spaces', class_name='CustomerSampleDocument'))
348        self.assertEqual(len(errs),1)
349        errs, inv_errs, conv_dict = self.processor.checkConversion(
350            dict(document_id='DOC', class_name='WrongDocument'))
351        self.assertEqual(len(errs),1)
352
353    def test_import(self):
354        num, num_warns, fin_file, fail_file = self.processor.doImport(
355            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
356        fail_file = open(fail_file).read()
357        self.assertEqual(num_warns,1)
358        # document_id must be unique
359        self.assertEqual(fail_file,
360            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
361            'CustomerSampleDocument,2,d5,My stolen doc,'
362            'This object already exists. Skipping.\r\n')
363        document = self.processor.getEntry(dict(reg_number='1',
364            document_id='d3'), self.app)
365        self.assertEqual(
366            self.app['customers']['X666666']['documents']['d3'],
367            document)
368        self.assertEqual(document.document_id, 'd3')
369        document = self.processor.getEntry(dict(reg_number='3',
370            document_id='d5'), self.app)
371        shutil.rmtree(os.path.dirname(fin_file))
372        logcontent = open(self.logfile).read()
373        # Fetch uuid
374        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
375            if len(i) > 10][0]
376         # Logging message from updateEntry
377        self.assertTrue(
378            'INFO - system - Customer Sample Document Processor - '
379            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
380            % docid in logcontent)
381
382    def test_import_update(self):
383        # We perform the same import twice,
384        # the second time in update mode. The number
385        # of warnings must be the same.
386        num, num_warns, fin_file, fail_file = self.processor.doImport(
387            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
388        shutil.rmtree(os.path.dirname(fin_file))
389        num, num_warns, fin_file, fail_file = self.processor.doImport(
390            self.csv_file, DOCUMENT_HEADER_FIELDS,'update')
391        # There is one record without document_id
392        self.assertEqual(num_warns,1)
393        shutil.rmtree(os.path.dirname(fin_file))
394
395    def test_import_remove(self):
396        # We perform the same import twice,
397        # the second time in remove mode. The number
398        # of warnings must be the same.
399        num, num_warns, fin_file, fail_file = self.processor.doImport(
400            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
401        shutil.rmtree(os.path.dirname(fin_file))
402        num, num_warns, fin_file, fail_file = self.processor.doImport(
403            self.csv_file, DOCUMENT_HEADER_FIELDS,'remove')
404        # There is one record without document_id and one duplicate
405        self.assertEqual(num_warns,2)
406        shutil.rmtree(os.path.dirname(fin_file))
407        logcontent = open(self.logfile).read()
408        self.assertTrue(
409            'INFO - system - K1000001 - Document removed: d5'
410            in logcontent)
411
412
413class SampleContractProcessorTest(CustomerImportExportSetup):
414
415    def setUp(self):
416        super(SampleContractProcessorTest, self).setUp()
417
418        customer = Customer()
419        self.app['customers'].addCustomer(customer)
420        customer = self.setup_customer(customer)
421        notify(grok.ObjectModifiedEvent(customer))
422        self.customer = self.app['customers'][customer.customer_id]
423
424        # Import customers with subobjects
425        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
426        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
427        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
428            customer_file, CUSTOMER_HEADER_FIELDS)
429        shutil.rmtree(os.path.dirname(fin_file))
430
431        self.processor = SampleContractProcessor()
432        self.csv_file = os.path.join(
433            self.workdir, 'sample_contract_data.csv')
434        open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA)
435
436    def test_interface(self):
437        # Make sure we fulfill the interface contracts.
438        assert verifyObject(IBatchProcessor, self.processor) is True
439        assert verifyClass(
440            IBatchProcessor, SampleContractProcessor) is True
441
442    def test_getEntry(self):
443        assert self.processor.getEntry(
444            dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
445        assert self.processor.getEntry(
446            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
447            self.app) is self.customer['contracts']['CON1']
448
449    def test_delEntry(self):
450        assert self.processor.getEntry(
451            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
452            self.app) is self.customer['contracts']['CON1']
453        self.assertEqual(len(self.customer['contracts'].keys()),1)
454        self.processor.delEntry(
455            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
456            self.app)
457        assert self.processor.getEntry(
458            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
459            self.app) is None
460        self.assertEqual(len(self.customer['contracts'].keys()),0)
461
462    def test_addEntry(self):
463        self.assertEqual(len(self.customer['contracts'].keys()),1)
464        contract1 = createObject(u'waeup.SampleContract')
465        contract1.contract_id = u'CON2'
466        self.processor.addEntry(
467            contract1, dict(customer_id=self.customer.customer_id, contract_id='CON2'),
468            self.app)
469        self.assertEqual(len(self.customer['contracts'].keys()),2)
470        self.assertEqual(self.customer['contracts']['CON2'].contract_id, 'CON2')
471
472    def test_checkConversion(self):
473        errs, inv_errs, conv_dict = self.processor.checkConversion(
474            dict(contract_id='CON3', class_name='SampleContract',
475                 document_object='DOC1', product_object='SAM'))
476        self.assertEqual(len(errs),0)
477        errs, inv_errs, conv_dict = self.processor.checkConversion(
478            dict(contract_id='id with spaces', class_name='SampleContract'))
479        self.assertEqual(len(errs),1)
480        errs, inv_errs, conv_dict = self.processor.checkConversion(
481            dict(contract_id='CON3', class_name='WrongContract'))
482        self.assertEqual(len(errs),1)
483
484    def test_import(self):
485        num, num_warns, fin_file, fail_file = self.processor.doImport(
486            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
487        self.assertEqual(num_warns,3)
488        # contract_id must be unique
489        fail_file = open(fail_file).read()
490        self.assertEqual(fail_file,
491            'reg_number,contract_id,class_name,product_object,document_object,product_options,--ERRORS--\r\n'
492            '2,c5,SampleContract,SAM,DOC1,[],This object already exists. Skipping.\r\n'
493            '3,c6,SampleContract,SAM,NONEXISTENT,[],document_object: Invalid value\r\n'
494            '3,c7,SampleContract,NONEXISTENT,DOC1,[],product_object: Invalid value\r\n'
495            )
496        contract = self.processor.getEntry(dict(reg_number='1',
497            contract_id='c3'), self.app)
498        self.assertEqual(
499            self.app['customers']['X666666']['contracts']['c3'],
500            contract)
501        self.assertEqual(contract.product_options[0].title, 'Base Fee')
502        self.assertEqual(contract.product_options[0].fee, Decimal('800.6') )
503        self.assertEqual(contract.product_options[0].currency, 'USD')
504        self.assertEqual(contract.contract_id, 'c3')
505        contract = self.processor.getEntry(dict(reg_number='3',
506            contract_id='c5'), self.app)
507        shutil.rmtree(os.path.dirname(fin_file))
508        logcontent = open(self.logfile).read()
509        # Logging message from updateEntry
510        # Fetch uuid
511        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
512            if len(i) > 10][0]
513        self.assertTrue(
514            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
515            'X666666 - %s - updated: product_object=SAM, document_object=DOC1, '
516            'product_options=[]' % conid
517            in logcontent)
518
519    def test_import_update(self):
520        # We perform the same import twice,
521        # the second time in update mode. The number
522        # of warnings must be the same.
523        num, num_warns, fin_file, fail_file = self.processor.doImport(
524            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
525        shutil.rmtree(os.path.dirname(fin_file))
526        num, num_warns, fin_file, fail_file = self.processor.doImport(
527            self.csv_file, CONTRACT_HEADER_FIELDS,'update')
528        # There are one record without contract_id and two
529        # records with wrong object identifier.
530        self.assertEqual(num_warns,3)
531        shutil.rmtree(os.path.dirname(fin_file))
532
533    def test_import_remove(self):
534        # We perform the same import twice,
535        # the second time in remove mode. The number
536        # of warnings must be the same.
537        num, num_warns, fin_file, fail_file = self.processor.doImport(
538            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
539        shutil.rmtree(os.path.dirname(fin_file))
540        num, num_warns, fin_file, fail_file = self.processor.doImport(
541            self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
542        # There are one record without contract_id, one duplicate and two
543        # records with wrong object identifier.
544        self.assertEqual(num_warns,4)
545        shutil.rmtree(os.path.dirname(fin_file))
546        logcontent = open(self.logfile).read()
547        self.assertTrue(
548            'INFO - system - K1000001 - Contract removed: c5'
549            in logcontent)
Note: See TracBrowser for help on using the repository browser.