Changeset 12125


Ignore:
Timestamp:
3 Dec 2014, 12:18:21 (10 years ago)
Author:
Henrik Bettermann
Message:

Add batch processor for contracts.

Location:
main/waeup.ikoba/trunk/src/waeup/ikoba/customers
Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py

    r12124 r12125  
    4141from waeup.ikoba.interfaces import MessageFactory as _
    4242from waeup.ikoba.documents.utils import generate_document_id
     43from waeup.ikoba.customers.utils import generate_contract_id
    4344from waeup.ikoba.customers.interfaces import (
    4445    ICustomer, ICustomerUpdateByRegNo,
     
    357358        raw_header = reader.next()
    358359        for num, field in enumerate(headerfields):
    359             if field not in ['customer_id', 'reg_number', 'document_id', 'code', 'level'
     360            if field not in ['customer_id', 'reg_number', 'document_id',
     361                             'code', 'level', 'contract_id'
    360362                             ] and mode == 'remove':
    361363                continue
     
    555557                errs.append(('contract_id','id exists'))
    556558        else:
    557             if not contract_id.startswith('a'):
     559            if not contract_id.startswith('c'):
    558560                errs.append(('contract_id','invalid format'))
    559561        return errs, inv_errs, conv_dict
  • main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py

    r12124 r12125  
    3636from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount
    3737from waeup.ikoba.customers.batching import (
    38     CustomerProcessor, CustomerDocumentProcessor)
     38    CustomerProcessor, CustomerDocumentProcessor, ContractProcessor)
    3939from waeup.ikoba.customers.customer import Customer
    4040from waeup.ikoba.customers.documents import CustomerSampleDocument
     
    6868
    6969DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
     70    '\n')[0].split(',')
     71
     72CONTRACT_SAMPLE_DATA = open(
     73    os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'),
     74    'rb').read()
     75
     76CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split(
    7077    '\n')[0].split(',')
    7178
     
    8390        self.app = self.getRootFolder()['app']
    8491        setSite(app)
    85 
    8692        self.logfile = os.path.join(
    8793            self.app['datacenter'].storage, 'logs', 'customers.log')
     94
     95        #Add some products
     96        self.product = createObject('waeup.Product')
     97        self.product.product_id = u'SAM'
     98        self.product.title = u'Our Samle Product'
     99        self.product.contract_category = u'sample'
     100        self.app['products'].addProduct(self.product)
    88101        return
    89102
     
    103116
    104117    def setup_customer(self, customer):
    105         # set predictable values for `customer`
    106118        customer.matric_number = u'234'
    107119        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
     
    116128        customer.notice = u'Some notice\nin lines.'
    117129        customer.nationality = u'NG'
    118 
    119         document = CustomerSampleDocument()
     130        # Add document
     131        document = createObject('waeup.CustomerSampleDocument')
    120132        document.title = u'My Document'
    121133        customer['documents'].addDocument(document)
     134        self.document = document
     135        # Add contract
     136        contract = createObject('waeup.SampleContract')
     137        contract.title = u'My Contract'
     138        customer['contracts'].addContract(contract)
     139        self.contract = contract
    122140        return customer
    123141
     
    255273        super(CustomerDocumentProcessorTest, self).setUp()
    256274
    257         # Add customer with document
    258275        customer = Customer()
    259         customer.firstname = u'Anna'
    260         customer.lastname = u'Tester'
    261         customer.reg_number = u'123'
    262276        self.app['customers'].addCustomer(customer)
     277        customer = self.setup_customer(customer)
     278        notify(grok.ObjectModifiedEvent(customer))
    263279        self.customer = self.app['customers'][customer.customer_id]
    264         document = createObject(u'waeup.CustomerSampleDocument')
    265         document.document_id = 'd120'
    266         self.customer['documents'][document.document_id] = document
    267280
    268281        # Import customers with subobjects
     
    288301            dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None
    289302        assert self.processor.getEntry(
    290             dict(customer_id=self.customer.customer_id, document_id='d120'),
    291             self.app) is self.customer['documents']['d120']
     303            dict(customer_id=self.customer.customer_id, document_id='d101'),
     304            self.app) is self.customer['documents']['d101']
    292305
    293306    def test_delEntry(self):
    294307        assert self.processor.getEntry(
    295             dict(customer_id=self.customer.customer_id, document_id='d120'),
    296             self.app) is self.customer['documents']['d120']
     308            dict(customer_id=self.customer.customer_id, document_id='d101'),
     309            self.app) is self.customer['documents']['d101']
    297310        self.assertEqual(len(self.customer['documents'].keys()),1)
    298311        self.processor.delEntry(
    299             dict(customer_id=self.customer.customer_id, document_id='d120'),
     312            dict(customer_id=self.customer.customer_id, document_id='d101'),
    300313            self.app)
    301314        assert self.processor.getEntry(
    302             dict(customer_id=self.customer.customer_id, document_id='d120'),
     315            dict(customer_id=self.customer.customer_id, document_id='d101'),
    303316            self.app) is None
    304317        self.assertEqual(len(self.customer['documents'].keys()),0)
     
    307320        self.assertEqual(len(self.customer['documents'].keys()),1)
    308321        document1 = createObject(u'waeup.CustomerSampleDocument')
    309         document1.document_id = 'p234'
     322        document1.document_id = 'd234'
    310323        self.processor.addEntry(
    311             document1, dict(customer_id=self.customer.customer_id, document_id='p234'),
     324            document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
    312325            self.app)
    313326        self.assertEqual(len(self.customer['documents'].keys()),2)
    314         self.assertEqual(self.customer['documents']['p234'].document_id, 'p234')
     327        self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')
    315328        document2 = createObject(u'waeup.CustomerSampleDocument')
    316329        document1.document_id = 'nonsense'
     
    318331    def test_checkConversion(self):
    319332        errs, inv_errs, conv_dict = self.processor.checkConversion(
    320             dict(document_id='d1266236341955', class_name='CustomerSampleDocument'))
     333            dict(document_id='d126', class_name='CustomerSampleDocument'))
    321334        self.assertEqual(len(errs),0)
    322335        errs, inv_errs, conv_dict = self.processor.checkConversion(
    323336            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
    324337        self.assertEqual(len(errs),1)
    325         timestamp = ("%d" % int(time()*10000))[1:]
    326         document_id = "d%s" % timestamp
    327         errs, inv_errs, conv_dict = self.processor.checkConversion(
    328             dict(document_id=document_id, class_name='CustomerSampleDocument'))
    329         self.assertEqual(len(errs),0)
    330         errs, inv_errs, conv_dict = self.processor.checkConversion(
    331             dict(document_id=document_id, class_name='WrongDocument'))
     338        errs, inv_errs, conv_dict = self.processor.checkConversion(
     339            dict(document_id='d127', class_name='WrongDocument'))
     340        self.assertEqual(len(errs),1)
     341        errs, inv_errs, conv_dict = self.processor.checkConversion(
     342            dict(document_id='', class_name='CustomerSampleDocument',
     343            mode='update'))
    332344        self.assertEqual(len(errs),1)
    333345
     
    387399            'INFO - system - K1000001 - Document removed: d5'
    388400            in logcontent)
     401
     402
     403class ContractProcessorTest(CustomerImportExportSetup):
     404
     405    def setUp(self):
     406        super(ContractProcessorTest, self).setUp()
     407
     408        customer = Customer()
     409        self.app['customers'].addCustomer(customer)
     410        customer = self.setup_customer(customer)
     411        notify(grok.ObjectModifiedEvent(customer))
     412        self.customer = self.app['customers'][customer.customer_id]
     413
     414        # Import customers with subobjects
     415        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
     416        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
     417        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
     418            customer_file, CUSTOMER_HEADER_FIELDS)
     419        shutil.rmtree(os.path.dirname(fin_file))
     420
     421        self.processor = ContractProcessor()
     422        self.csv_file = os.path.join(
     423            self.workdir, 'sample_contract_data.csv')
     424        open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA)
     425
     426    def test_interface(self):
     427        # Make sure we fulfill the interface contracts.
     428        assert verifyObject(IBatchProcessor, self.processor) is True
     429        assert verifyClass(
     430            IBatchProcessor, ContractProcessor) is True
     431
     432    def test_getEntry(self):
     433        assert self.processor.getEntry(
     434            dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
     435        assert self.processor.getEntry(
     436            dict(customer_id=self.customer.customer_id, contract_id='c101'),
     437            self.app) is self.customer['contracts']['c101']
     438
     439    def test_delEntry(self):
     440        assert self.processor.getEntry(
     441            dict(customer_id=self.customer.customer_id, contract_id='c101'),
     442            self.app) is self.customer['contracts']['c101']
     443        self.assertEqual(len(self.customer['contracts'].keys()),1)
     444        self.processor.delEntry(
     445            dict(customer_id=self.customer.customer_id, contract_id='c101'),
     446            self.app)
     447        assert self.processor.getEntry(
     448            dict(customer_id=self.customer.customer_id, contract_id='c101'),
     449            self.app) is None
     450        self.assertEqual(len(self.customer['contracts'].keys()),0)
     451
     452    def test_addEntry(self):
     453        self.assertEqual(len(self.customer['contracts'].keys()),1)
     454        contract1 = createObject(u'waeup.SampleContract')
     455        contract1.contract_id = 'c234'
     456        self.processor.addEntry(
     457            contract1, dict(customer_id=self.customer.customer_id, contract_id='c234'),
     458            self.app)
     459        self.assertEqual(len(self.customer['contracts'].keys()),2)
     460        self.assertEqual(self.customer['contracts']['c234'].contract_id, 'c234')
     461        contract2 = createObject(u'waeup.SampleContract')
     462        contract1.contract_id = 'nonsense'
     463
     464    def test_checkConversion(self):
     465        errs, inv_errs, conv_dict = self.processor.checkConversion(
     466            dict(contract_id='c126', class_name='SampleContract'))
     467        self.assertEqual(len(errs),0)
     468        errs, inv_errs, conv_dict = self.processor.checkConversion(
     469            dict(contract_id='nonsense', class_name='SampleContract'))
     470        self.assertEqual(len(errs),1)
     471        errs, inv_errs, conv_dict = self.processor.checkConversion(
     472            dict(contract_id='c127', class_name='WrongContract'))
     473        self.assertEqual(len(errs),1)
     474
     475    def test_import(self):
     476        num, num_warns, fin_file, fail_file = self.processor.doImport(
     477            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
     478        self.assertEqual(num_warns,1)
     479        # contract_id must be unique
     480        fail_file = open(fail_file).read()
     481        self.assertEqual(fail_file,
     482            'class_name,reg_number,contract_id,title,--ERRORS--\r\n'
     483            'SampleContract,2,c5,My stolen contract,contract_id: id exists\r\n')
     484        contract = self.processor.getEntry(dict(reg_number='1',
     485            contract_id='c3'), self.app)
     486        self.assertEqual(
     487            self.app['customers']['X666666']['contracts']['c3'],
     488            contract)
     489        self.assertEqual(contract.contract_id, 'c3')
     490        contract = self.processor.getEntry(dict(reg_number='3',
     491            contract_id='c5'), self.app)
     492        shutil.rmtree(os.path.dirname(fin_file))
     493        logcontent = open(self.logfile).read()
     494        # Logging message from updateEntry
     495        self.assertTrue(
     496            'INFO - system - Contract Processor - '
     497            'sample_contract_data - X666666 - updated: '
     498            'contract_id=c3, title=My first contract'
     499            in logcontent)
     500
     501    def test_import_update(self):
     502        # We perform the same import twice,
     503        # the second time in update mode. The number
     504        # of warnings must be the same.
     505        num, num_warns, fin_file, fail_file = self.processor.doImport(
     506            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
     507        shutil.rmtree(os.path.dirname(fin_file))
     508        num, num_warns, fin_file, fail_file = self.processor.doImport(
     509            self.csv_file, CONTRACT_HEADER_FIELDS,'update')
     510        # There is one record without contract_id and one duplicate
     511        self.assertEqual(num_warns,2)
     512        shutil.rmtree(os.path.dirname(fin_file))
     513
     514    def test_import_remove(self):
     515        # We perform the same import twice,
     516        # the second time in remove mode. The number
     517        # of warnings must be the same.
     518        num, num_warns, fin_file, fail_file = self.processor.doImport(
     519            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
     520        shutil.rmtree(os.path.dirname(fin_file))
     521        num, num_warns, fin_file, fail_file = self.processor.doImport(
     522            self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
     523        # There is one record without contract_id and one duplicate
     524        self.assertEqual(num_warns,2)
     525        shutil.rmtree(os.path.dirname(fin_file))
     526        logcontent = open(self.logfile).read()
     527        self.assertTrue(
     528            'INFO - system - K1000001 - Contract removed: c5'
     529            in logcontent)
Note: See TracChangeset for help on using the changeset viewer.