source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12253

Last change on this file since 12253 was 12250, checked in by Henrik Bettermann, 10 years ago

Some repairs of document and contract batch processors (work in progress!).

  • Property svn:keywords set to Id
File size: 23.3 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12250 2014-12-16 09:02:13Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from time import time
28from zope.event import notify
29from zope.component import createObject, queryUtility
30from zope.component.hooks import setSite, clearSite
31from zope.catalog.interfaces import ICatalog
32from zope.interface.verify import verifyClass, verifyObject
33from hurry.workflow.interfaces import IWorkflowState
34
35from waeup.ikoba.app import Company
36from waeup.ikoba.interfaces import (
37    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
38from waeup.ikoba.customers.batching import (
39    CustomerProcessor, CustomerDocumentProcessor, ContractProcessor)
40from waeup.ikoba.customers.customer import Customer
41from waeup.ikoba.customers.documents import CustomerSampleDocument
42from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
43
def _read_sample(filename):
    """Return the raw content of a sample file stored next to this module.

    The file is opened in binary mode and is closed again as soon as its
    content has been read, so no file handle is left open at import time.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(data):
    """Return the comma-separated column names of the first line of `data`."""
    return data.split('\n')[0].split(',')


# Raw CSV content of each sample file and the column names taken from its
# first line. The column names are passed to `doImport` as header fields.
CUSTOMER_SAMPLE_DATA = _read_sample('sample_customer_data.csv')
CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample('sample_customer_data_update.csv')
CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample(
    'sample_customer_data_duplicates.csv')
CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample('sample_document_data.csv')
DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample('sample_contract_data.csv')
CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
79
class CustomerImportExportSetup(FunctionalTestCase):
    """Shared fixture for the customer import/export tests.

    `setUp` creates a `Company` site whose datacenter stores its data in a
    temporary directory, a second temporary work directory and one sample
    product. `tearDown` removes both directories again.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerImportExportSetup, self).setUp()
        # Temporary locations: datacenter storage and a scratch directory.
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()

        # Create the site, store it in the root folder and make it the
        # active site.
        company = Company()
        company['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = company
        self.app = self.getRootFolder()['app']
        setSite(company)

        # Log file inspected by the import tests.
        storage = self.app['datacenter'].storage
        self.logfile = os.path.join(storage, 'logs', 'customers.log')

        # Register one sample product.
        sample_product = createObject('waeup.Product')
        self.product = sample_product
        sample_product.product_id = u'SAM'
        sample_product.title = u'Our Samle Product'
        sample_product.contract_category = u'sample'
        self.app['products'].addProduct(sample_product)
        return

    def tearDown(self):
        super(CustomerImportExportSetup, self).tearDown()
        # Remove the temporary directories created in setUp.
        for directory in (self.workdir, self.dc_root):
            shutil.rmtree(directory)
        clearSite()
        return

    def setup_for_export(self):
        """Store a customer with a fixed id and set the export file path."""
        new_customer = Customer()
        new_customer.customer_id = u'A111111'
        self.app['customers'][new_customer.customer_id] = new_customer
        self.customer = new_customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
        return

    def setup_customer(self, customer):
        """Fill `customer` with sample data, one document and one contract.

        The created document and contract are also stored as
        `self.document` and `self.contract`. Returns the customer passed in.
        """
        # Attributes are set in this order; `sex` is not set here.
        base_data = (
            ('matric_number', u'234'),
            ('perm_address', u'Customerroad 21\nLagos 123456\n'),
            ('reg_number', u'123'),
            ('firstname', u'Anna'),
            ('lastname', u'Tester'),
            ('middlename', u'M.'),
            ('date_of_birth', datetime.date(1981, 2, 4)),
            ('email', 'anna@sample.com'),
            ('phone', u'+234-123-12345'),
            ('notice', u'Some notice\nin lines.'),
            ('nationality', u'NG'),
            )
        for attr_name, attr_value in base_data:
            setattr(customer, attr_name, attr_value)

        # Attach a document and move it into the VERIFIED workflow state.
        sample_document = createObject('waeup.CustomerSampleDocument')
        sample_document.title = u'My Document'
        customer['documents'].addDocument(sample_document)
        IWorkflowState(sample_document).setState(VERIFIED)
        self.document = sample_document

        # Attach a contract referring to the sample product and the
        # document created above.
        sample_contract = createObject('waeup.SampleContract')
        sample_contract.title = u'My Contract'
        sample_contract.product_object = self.product
        sample_contract.document_object = self.document
        customer['contracts'].addContract(sample_contract)
        self.contract = sample_contract
        return customer
145
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerProcessor` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerProcessorTest, self).setUp()

        # Add customer with subobjects
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Copy the sample data into the work directory. The `with` block
        # guarantees that each file is flushed and closed before a test
        # hands it to the processor.
        for path, data in (
                (self.csv_file, CUSTOMER_SAMPLE_DATA),
                (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
                (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES)):
            with open(path, 'wb') as fd:
                fd.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(
            self.processor.entryExists(
                dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id),
                self.app) is self.customer)

    def test_addEntry(self):
        # One customer was added in setUp, the second one is added here.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        # An unknown state is rejected
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)
        return

    def test_checkUpdateRequirements(self):
        # For this row checkUpdateRequirements reports no error, i.e. it
        # returns None.
        # NOTE(review): the former comment claimed that a wrong transition
        # is rejected here, which contradicts the assertion -- confirm the
        # intended behaviour for state='returning'.
        err = self.processor.checkUpdateRequirements(
            self.customer,
            dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        customer_id = self.customer.customer_id
        self.assertTrue(customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertTrue(customer_id not in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        # Read the file with the rejected rows and close it right away.
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 2)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: reg_number\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
273
class CustomerDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerDocumentProcessor` batch processor."""

    def setUp(self):
        super(CustomerDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The sample file is closed
        # before the processor reads it.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', document_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='d101'),
                self.app) is self.customer['documents']['d101'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, document_id='d101')
        self.assertTrue(
            self.processor.getEntry(row, self.app)
            is self.customer['documents']['d101'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        # delEntry gets its own dict, as in a real import row.
        self.processor.delEntry(dict(row), self.app)
        self.assertTrue(self.processor.getEntry(row, self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(self.app._curr_doc_id, 102)
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = 'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')
        # _curr_doc_id hasn't changed
        self.assertEqual(self.app._curr_doc_id, 102)

    def test_checkConversion(self):
        self.assertEqual(self.app._curr_doc_id, 102)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='d126', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='d127', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='', class_name='CustomerSampleDocument',
                 mode='update'))
        self.assertEqual(len(errs), 1)
        # _curr_doc_id hasn't changed
        self.assertEqual(self.app._curr_doc_id, 102)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(
            fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,document_id: id exists\r\n')
        document = self.processor.getEntry(
            dict(reg_number='1', document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # The result of this lookup is not checked; the call is kept
        # unchanged from the original test.
        document = self.processor.getEntry(
            dict(reg_number='3', document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - CustomerDocument Processor - '
            'sample_document_data - X666666 - updated: '
            'document_id=d3, title=My first doc'
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
409
410
class ContractProcessorTest(CustomerImportExportSetup):
    """Tests for the `ContractProcessor` batch processor."""

    def setUp(self):
        super(ContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects. The sample file is closed
        # before the processor reads it.
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = ContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, ContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', contract_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='c101'),
                self.app) is self.customer['contracts']['c101'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, contract_id='c101')
        self.assertTrue(
            self.processor.getEntry(row, self.app)
            is self.customer['contracts']['c101'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        # delEntry gets its own dict, as in a real import row.
        self.processor.delEntry(dict(row), self.app)
        self.assertTrue(self.processor.getEntry(row, self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(self.app._curr_con_id, 102)
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = 'c234'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='c234'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['c234'].contract_id, 'c234')
        # _curr_con_id hasn't changed
        self.assertEqual(self.app._curr_con_id, 102)

    def test_checkConversion(self):
        self.assertEqual(self.app._curr_con_id, 102)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='c126', class_name='SampleContract',
                 document_object='d101', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='nonsense', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='c127', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)
        # _curr_con_id hasn't changed
        self.assertEqual(self.app._curr_con_id, 102)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(
            fail_content,
            'reg_number,contract_id,title,class_name,product_object,'
            'document_object,--ERRORS--\r\n'
            '2,c5,My stolen contract,SampleContract,SAM,d101,'
            'contract_id: id exists\r\n'
            '3,c6,My 5th contract,SampleContract,SAM,NONEXISTENT,'
            'document_object: Invalid value\r\n'
            '3,c7,My sixt contract,SampleContract,NONEXISTENT,d101,'
            'product_object: Invalid value\r\n')
        contract = self.processor.getEntry(
            dict(reg_number='1', contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.contract_id, 'c3')
        # The result of this lookup is not checked; the call is kept
        # unchanged from the original test.
        contract = self.processor.getEntry(
            dict(reg_number='3', contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        self.assertTrue(
            'INFO - system - Contract Processor - '
            'sample_contract_data - X666666 - updated: '
            'contract_id=c3, title=My first contract'
            in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There is one record without contract_id, one duplicate and two
        # records with a wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There is one record without contract_id, one duplicate and two
        # records with a wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.