source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12820

Last change on this file since 12820 was 12812, checked in by Henrik Bettermann, 10 years ago

Skip empty rows produced by spreadsheet software.

  • Property svn:keywords set to Id
File size: 24.3 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12812 2015-03-23 11:32:12Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from decimal import Decimal
28from time import time
29from zope.event import notify
30from zope.component import createObject, queryUtility
31from zope.component.hooks import setSite, clearSite
32from zope.catalog.interfaces import ICatalog
33from zope.interface.verify import verifyClass, verifyObject
34from hurry.workflow.interfaces import IWorkflowState
35
36from waeup.ikoba.app import Company
37from waeup.ikoba.interfaces import (
38    IBatchProcessor, FatalCSVError, IUserAccount, VERIFIED)
39from waeup.ikoba.customers.batching import (
40    CustomerProcessor, CustomerSampleDocumentProcessor, SampleContractProcessor)
41from waeup.ikoba.customers.customer import Customer
42from waeup.ikoba.customers.documents import CustomerSampleDocument
43from waeup.ikoba.products.productoptions import ProductOption
44from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
45
def _read_sample(filename):
    """Return the raw content of a sample file located next to this module.

    The file is read in binary mode and closed as soon as its content has
    been fetched, so that no file handle stays open after import.
    """
    path = os.path.join(os.path.dirname(__file__), filename)
    with open(path, 'rb') as fd:
        return fd.read()


def _header_fields(csv_data):
    """Return the comma-separated fields of the first line of `csv_data`."""
    return csv_data.split('\n')[0].split(',')


# Sample CSV data (and the respective header fields) used by the
# processor tests below.
CUSTOMER_SAMPLE_DATA = _read_sample('sample_customer_data.csv')

CUSTOMER_HEADER_FIELDS = _header_fields(CUSTOMER_SAMPLE_DATA)

CUSTOMER_SAMPLE_DATA_UPDATE = _read_sample('sample_customer_data_update.csv')

CUSTOMER_HEADER_FIELDS_UPDATE = _header_fields(CUSTOMER_SAMPLE_DATA_UPDATE)

CUSTOMER_SAMPLE_DATA_DUPLICATES = _read_sample(
    'sample_customer_data_duplicates.csv')

CUSTOMER_HEADER_FIELDS_DUPLICATES = _header_fields(
    CUSTOMER_SAMPLE_DATA_DUPLICATES)

DOCUMENT_SAMPLE_DATA = _read_sample('sample_document_data.csv')

DOCUMENT_HEADER_FIELDS = _header_fields(DOCUMENT_SAMPLE_DATA)

CONTRACT_SAMPLE_DATA = _read_sample('sample_contract_data.csv')

CONTRACT_HEADER_FIELDS = _header_fields(CONTRACT_SAMPLE_DATA)
81
class CustomerImportExportSetup(FunctionalTestCase):
    """Shared fixture for the customer import/export tests.

    `setUp` creates a `Company` site whose datacenter stores its files in
    a temporary directory, a second temporary working directory and one
    sample product. The helper methods populate customers with a
    document and a contract.
    """

    layer = FunctionalLayer

    def setup_customizable_params(self):
        """Set parameters which derived test cases may override."""
        self._contract_category = u'sample'

    def setUp(self):
        """Create the site, the temporary directories and a product."""
        super(CustomerImportExportSetup, self).setUp()
        self.setup_customizable_params()
        self.dc_root = tempfile.mkdtemp()
        self.workdir = tempfile.mkdtemp()
        app = Company()
        app['datacenter'].setStoragePath(self.dc_root)
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']
        setSite(app)
        self.logfile = os.path.join(
            self.app['datacenter'].storage, 'logs', 'customers.log')

        # Add a product with a single option.
        self.product = createObject('waeup.Product')
        self.product.product_id = u'SAM'
        self.product.title = u'Our Sample Product'
        self.product.contract_category = self._contract_category
        self.product.valid_from = datetime.date(2015, 12, 4)
        self.product.tc_dict = {'en': u'Hello world'}
        option = ProductOption()
        option.title = u'Base Fee'
        option.fee = Decimal('800.6')
        option.currency = 'USD'
        self.product.options = [option]
        self.app['products'].addProduct(self.product)

    def tearDown(self):
        """Remove the temporary directories and clear the site."""
        super(CustomerImportExportSetup, self).tearDown()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()

    def setup_for_export(self):
        """Store a customer with a fixed id and define the output path."""
        customer = Customer()
        customer.customer_id = u'A111111'
        self.app['customers'][customer.customer_id] = customer
        self.customer = customer
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')

    def setup_customer(self, customer):
        """Fill `customer` with sample data, a document and a contract.

        The created document and contract are also stored as
        `self.document` and `self.contract`. Returns the customer passed
        in.
        """
        customer.matric_number = u'234'
        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
        customer.reg_number = u'123'
        customer.firstname = u'Anna'
        customer.lastname = u'Tester'
        customer.middlename = u'M.'
        customer.date_of_birth = datetime.date(1981, 2, 4)
        customer.email = u'anna@sample.com'
        customer.phone = u'+234-123-12345'
        customer.notice = u'Some notice\nin lines.'
        customer.nationality = u'NG'
        # Add a document and put it into the VERIFIED workflow state.
        document = createObject('waeup.CustomerSampleDocument')
        document.title = u'My Document'
        document.document_id = u'DOC1'
        customer['documents'].addDocument(document)
        IWorkflowState(document).setState(VERIFIED)
        self.document = document
        # Add a contract which refers to the product and the document.
        contract = createObject('waeup.SampleContract')
        contract.contract_id = u'CON1'
        contract.product_object = self.product
        contract.document_object = self.document
        contract.product_options = self.product.options
        contract.tc_dict = {'en': u'Hello world'}
        contract.valid_from = datetime.date(2014, 2, 4)
        contract.valid_to = datetime.date(2014, 12, 4)

        customer['contracts'].addContract(contract)
        self.contract = contract
        return customer
166
class CustomerProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerProcessor` batch processor."""

    layer = FunctionalLayer

    def setUp(self):
        """Add one customer and write the sample CSV files to workdir."""
        super(CustomerProcessorTest, self).setUp()

        # Add customer
        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        self.processor = CustomerProcessor()
        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        self.csv_file_update = os.path.join(
            self.workdir, 'sample_customer_data_update.csv')
        self.csv_file_duplicates = os.path.join(
            self.workdir, 'sample_customer_data_duplicates.csv')
        # Write the sample data; each file is closed right after writing.
        for path, data in (
                (self.csv_file, CUSTOMER_SAMPLE_DATA),
                (self.csv_file_update, CUSTOMER_SAMPLE_DATA_UPDATE),
                (self.csv_file_duplicates, CUSTOMER_SAMPLE_DATA_DUPLICATES)):
            with open(path, 'wb') as fd:
                fd.write(data)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, CustomerProcessor) is True)

    def test_parentsExist(self):
        self.assertFalse(self.processor.parentsExist(None, dict()))
        self.assertTrue(self.processor.parentsExist(None, self.app))

    def test_entryExists(self):
        self.assertTrue(
            self.processor.entryExists(
                dict(customer_id='ID_NONE'), self.app) is False)
        self.assertTrue(
            self.processor.entryExists(
                dict(reg_number='123'), self.app) is True)

    def test_getParent(self):
        parent = self.processor.getParent(None, self.app)
        self.assertTrue(parent is self.app['customers'])

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE'), self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id),
                self.app) is self.customer)

    def test_addEntry(self):
        # The customer added in setUp plus the new one.
        new_customer = Customer()
        self.processor.addEntry(
            new_customer, dict(), self.app)
        self.assertEqual(len(self.app['customers'].keys()), 2)

    def test_checkConversion(self):
        # Make sure we can check conversions and that the cust_id
        # counter is not raised during such checks.
        initial_cust_id = self.app['customers']._curr_cust_id
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='approved'))
        self.assertEqual(len(errs), 0)
        # Empty state is allowed
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state=''))
        self.assertEqual(len(errs), 0)
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(reg_number='1', state='nonsense'))
        self.assertEqual(len(errs), 1)
        self.assertTrue(('state', 'not allowed') in errs)
        new_cust_id = self.app['customers']._curr_cust_id
        self.assertEqual(initial_cust_id, new_cust_id)

    def test_checkUpdateRequirements(self):
        # No error is expected to be reported for this row.
        err = self.processor.checkUpdateRequirements(self.customer,
            dict(reg_number='1', state='returning'), self.app)
        self.assertTrue(err is None)

    def test_delEntry(self):
        self.assertTrue(
            self.customer.customer_id in self.app['customers'].keys())
        self.processor.delEntry(
            dict(reg_number=self.customer.reg_number), self.app)
        self.assertFalse(
            self.customer.customer_id in self.app['customers'].keys())

    def test_import(self):
        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        self.assertEqual(num_warns, 0)
        self.assertEqual(len(self.app['customers']), 10)
        # Three empty rows have been skipped.
        self.assertEqual(num, 12)
        self.assertEqual(self.app['customers']['X666666'].reg_number, '1')
        self.assertEqual(
            self.app['customers']['X666666'].state, 'requested')
        # Two new customer_ids have been created.
        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_update(self):
        # Import in create mode first, then update the same customers.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
        self.assertEqual(num_warns, 0)
        # state has changed
        self.assertEqual(self.app['customers']['X666666'].state, 'approved')
        # state has not changed
        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_duplicate_data(self):
        # Rows which could not be imported end up in the fail file,
        # each with an error message appended.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
        with open(fail_file) as fd:
            content = fd.read()
        self.assertEqual(num_warns, 3)
        self.assertEqual(
            content,
            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,cc@aa.ng,reg_number: Invalid input\r\n'
            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,dd@aa.ng,This object already exists. Skipping.\r\n'
            '4,,Berta,Pieri,m,1234,,1991-01-07,A234567,aa@aa.ng,email: Invalid input\r\n'
            )
        shutil.rmtree(os.path.dirname(fin_file))
296
class CustomerSampleDocumentProcessorTest(CustomerImportExportSetup):
    """Tests for the `CustomerSampleDocumentProcessor` batch processor."""

    def setUp(self):
        """Add one customer, import the sample customers and write the
        sample document CSV file to workdir.
        """
        super(CustomerSampleDocumentProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = CustomerSampleDocumentProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_document_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(DOCUMENT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(
                IBatchProcessor, CustomerSampleDocumentProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', document_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     document_id='DOC1'),
                self.app) is self.customer['documents']['DOC1'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, document_id='DOC1')
        self.assertTrue(
            self.processor.getEntry(row, self.app)
            is self.customer['documents']['DOC1'])
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, document_id='DOC1'),
            self.app)
        self.assertTrue(self.processor.getEntry(row, self.app) is None)
        self.assertEqual(len(self.customer['documents'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['documents'].keys()), 1)
        document1 = createObject(u'waeup.CustomerSampleDocument')
        document1.document_id = u'd234'
        self.processor.addEntry(
            document1,
            dict(customer_id=self.customer.customer_id, document_id='d234'),
            self.app)
        self.assertEqual(len(self.customer['documents'].keys()), 2)
        self.assertEqual(
            self.customer['documents']['d234'].document_id, 'd234')

    def test_checkConversion(self):
        # A valid row yields no errors.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 0)
        # A document_id containing spaces yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='id with spaces',
                 class_name='CustomerSampleDocument'))
        self.assertEqual(len(errs), 1)
        # An unexpected class_name yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(document_id='DOC', class_name='WrongDocument'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(num_warns, 1)
        # document_id must be unique
        self.assertEqual(fail_content,
            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
            'CustomerSampleDocument,2,d5,My stolen doc,'
            'This object already exists. Skipping.\r\n')
        document = self.processor.getEntry(dict(reg_number='1',
            document_id='d3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['documents']['d3'],
            document)
        self.assertEqual(document.document_id, 'd3')
        # NOTE(review): the result of this lookup is not checked.
        document = self.processor.getEntry(dict(reg_number='3',
            document_id='d5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Fetch uuid
        docid = [i for i in self.app['customers']['X666666']['documents'].keys()
            if len(i) > 10][0]
        # Logging message from updateEntry
        expected = (
            'INFO - system - Customer Sample Document Processor - '
            'sample_document_data - X666666 - %s - updated: title=My 4th doc'
            % docid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'update')
        # There is one record without document_id
        self.assertEqual(num_warns, 1)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, DOCUMENT_HEADER_FIELDS, 'remove')
        # There is one record without document_id and one duplicate
        self.assertEqual(num_warns, 2)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Document removed: d5'
            in logcontent)
425
426
class SampleContractProcessorTest(CustomerImportExportSetup):
    """Tests for the `SampleContractProcessor` batch processor."""

    def setUp(self):
        """Add one customer, import the sample customers and write the
        sample contract CSV file to workdir.
        """
        super(SampleContractProcessorTest, self).setUp()

        customer = Customer()
        self.app['customers'].addCustomer(customer)
        customer = self.setup_customer(customer)
        notify(grok.ObjectModifiedEvent(customer))
        self.customer = self.app['customers'][customer.customer_id]

        # Import customers with subobjects
        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
        with open(customer_file, 'wb') as fd:
            fd.write(CUSTOMER_SAMPLE_DATA)
        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
            customer_file, CUSTOMER_HEADER_FIELDS)
        shutil.rmtree(os.path.dirname(fin_file))

        self.processor = SampleContractProcessor()
        self.csv_file = os.path.join(
            self.workdir, 'sample_contract_data.csv')
        with open(self.csv_file, 'wb') as fd:
            fd.write(CONTRACT_SAMPLE_DATA)

    def test_interface(self):
        # Make sure we fulfill the interface contracts.
        self.assertTrue(
            verifyObject(IBatchProcessor, self.processor) is True)
        self.assertTrue(
            verifyClass(IBatchProcessor, SampleContractProcessor) is True)

    def test_getEntry(self):
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id='ID_NONE', contract_id='nonsense'),
                self.app) is None)
        self.assertTrue(
            self.processor.getEntry(
                dict(customer_id=self.customer.customer_id,
                     contract_id='CON1'),
                self.app) is self.customer['contracts']['CON1'])

    def test_delEntry(self):
        row = dict(customer_id=self.customer.customer_id, contract_id='CON1')
        self.assertTrue(
            self.processor.getEntry(row, self.app)
            is self.customer['contracts']['CON1'])
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        self.processor.delEntry(
            dict(customer_id=self.customer.customer_id, contract_id='CON1'),
            self.app)
        self.assertTrue(self.processor.getEntry(row, self.app) is None)
        self.assertEqual(len(self.customer['contracts'].keys()), 0)

    def test_addEntry(self):
        self.assertEqual(len(self.customer['contracts'].keys()), 1)
        contract1 = createObject(u'waeup.SampleContract')
        contract1.contract_id = u'CON2'
        self.processor.addEntry(
            contract1,
            dict(customer_id=self.customer.customer_id, contract_id='CON2'),
            self.app)
        self.assertEqual(len(self.customer['contracts'].keys()), 2)
        self.assertEqual(
            self.customer['contracts']['CON2'].contract_id, 'CON2')

    def test_checkConversion(self):
        # A valid row yields no errors.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='SampleContract',
                 document_object='DOC1', product_object='SAM'))
        self.assertEqual(len(errs), 0)
        # A contract_id containing spaces yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='id with spaces', class_name='SampleContract'))
        self.assertEqual(len(errs), 1)
        # An unexpected class_name yields one error.
        errs, inv_errs, conv_dict = self.processor.checkConversion(
            dict(contract_id='CON3', class_name='WrongContract'))
        self.assertEqual(len(errs), 1)

    def test_import(self):
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        self.assertEqual(num_warns, 3)
        # contract_id must be unique
        with open(fail_file) as fd:
            fail_content = fd.read()
        self.assertEqual(fail_content,
            'reg_number,valid_from,contract_id,title,class_name,valid_to,product_object,tc_dict,document_object,product_options,--ERRORS--\r\n'
            '2,,c5,,SampleContract,,SAM,,DOC1,[],This object already exists. Skipping.\r\n'
            '3,,c6,,SampleContract,,SAM,,NONEXISTENT,[],document_object: Invalid value\r\n'
            '3,,c7,,SampleContract,,NONEXISTENT,,DOC1,[],product_object: Invalid value\r\n'
            )
        contract = self.processor.getEntry(dict(reg_number='1',
            contract_id='c3'), self.app)
        self.assertEqual(
            self.app['customers']['X666666']['contracts']['c3'],
            contract)
        self.assertEqual(contract.product_options[0].title, 'Base Fee')
        self.assertEqual(contract.product_options[0].fee, Decimal('800.6'))
        self.assertEqual(contract.product_options[0].currency, 'USD')
        self.assertEqual(contract.contract_id, 'c3')
        self.assertEqual(contract.title, u'Nice Contract')
        self.assertEqual(contract.tc_dict, {'en': 'Hello World'})
        # NOTE(review): the result of this lookup is not checked.
        contract = self.processor.getEntry(dict(reg_number='3',
            contract_id='c5'), self.app)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        # Logging message from updateEntry
        # Fetch uuid
        conid = [i for i in self.app['customers']['X666666']['contracts'].keys()
            if len(i) > 10][0]
        expected = (
            'INFO - system - Customer Sample Contract Processor - sample_contract_data - '
            'X666666 - %s - updated: valid_from=, title=, valid_to=, product_object=SAM, tc_dict=, document_object=DOC1, '
            'product_options=[]' % conid)
        self.assertTrue(expected in logcontent)

    def test_import_update(self):
        # We perform the same import twice,
        # the second time in update mode. The number
        # of warnings must be the same.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'update')
        # There are one record without contract_id and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 3)
        shutil.rmtree(os.path.dirname(fin_file))

    def test_import_remove(self):
        # We perform the same import twice,
        # the second time in remove mode.
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'create')
        shutil.rmtree(os.path.dirname(fin_file))
        num, num_warns, fin_file, fail_file = self.processor.doImport(
            self.csv_file, CONTRACT_HEADER_FIELDS, 'remove')
        # There are one record without contract_id, one duplicate and two
        # records with wrong object identifier.
        self.assertEqual(num_warns, 4)
        shutil.rmtree(os.path.dirname(fin_file))
        with open(self.logfile) as fd:
            logcontent = fd.read()
        self.assertTrue(
            'INFO - system - K1000001 - Contract removed: c5'
            in logcontent)
Note: See TracBrowser for help on using the repository browser.