Ignore:
Timestamp:
3 Dec 2014, 10:25:50 (10 years ago)
Author:
Henrik Bettermann
Message:
 
Location:
main/waeup.ikoba/trunk/src/waeup/ikoba/customers
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py

    r12097 r12123  
    555555                errs.append(('contract_id','id exists'))
    556556        else:
    557             if not contract_id.startswith('a'):
     557            if not contract_id.startswith('c'):
    558558                errs.append(('contract_id','invalid format'))
    559559        return errs, inv_errs, conv_dict
  • main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py

    r12057 r12123  
    3636from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount
    3737from waeup.ikoba.customers.batching import (
    38     CustomerProcessor, CustomerDocumentProcessor)
     38    CustomerProcessor, CustomerDocumentProcessor, ContractProcessor)
    3939from waeup.ikoba.customers.customer import Customer
    4040from waeup.ikoba.customers.documents import CustomerSampleDocument
     
    6868
    6969DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
     70    '\n')[0].split(',')
     71
     72CONTRACT_SAMPLE_DATA = open(
     73    os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'),
     74    'rb').read()
     75
     76CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split(
    7077    '\n')[0].split(',')
    7178
     
    8390        self.app = self.getRootFolder()['app']
    8491        setSite(app)
    85 
    8692        self.logfile = os.path.join(
    8793            self.app['datacenter'].storage, 'logs', 'customers.log')
     94
     95        # Add some products
     96        self.product = createObject('waeup.Product')
     97        self.product.product_id = u'SAM'
     98        self.product.title = u'Our Samle Product'
     99        self.product.contract_category = u'sample'
     100        self.app['products'].addProduct(self.product)
     101
     102        # Add customer
     103        customer = Customer()
     104        customer.firstname = u'Anna'
     105        customer.lastname = u'Tester'
     106        customer.reg_number = u'123'
     107        customer.date_of_birth = datetime.date(1981, 2, 4)
     108        customer.email = 'anna@sample.com'
     109        customer.phone = u'+234-123-12345'
     110        customer.notice = u'Some notice\nin lines.'
     111        customer.nationality = u'NG'
     112        self.app['customers'].addCustomer(customer)
     113        self.customer = customer
     114        #notify(grok.ObjectModifiedEvent(customer))
     115
     116        # Add document
     117        document = createObject(u'waeup.CustomerSampleDocument')
     118        document.document_id = 'd120'
     119        document.title = u'My Document'
     120        customer['documents'].addDocument(document)
    88121        return
    89122
     
    128161    def setUp(self):
    129162        super(CustomerProcessorTest, self).setUp()
    130 
    131         # Add customer with subobjects
    132         customer = Customer()
    133         self.app['customers'].addCustomer(customer)
    134         customer = self.setup_customer(customer)
    135         notify(grok.ObjectModifiedEvent(customer))
    136         self.customer = self.app['customers'][customer.customer_id]
    137 
    138163        self.processor = CustomerProcessor()
    139164        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
     
    255280        super(CustomerDocumentProcessorTest, self).setUp()
    256281
    257         # Add customer with document
    258         customer = Customer()
    259         customer.firstname = u'Anna'
    260         customer.lastname = u'Tester'
    261         customer.reg_number = u'123'
    262         self.app['customers'].addCustomer(customer)
    263         self.customer = self.app['customers'][customer.customer_id]
    264         document = createObject(u'waeup.CustomerSampleDocument')
    265         document.document_id = 'd120'
    266         self.customer['documents'][document.document_id] = document
    267 
    268282        # Import customers with subobjects
    269283        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
     
    307321        self.assertEqual(len(self.customer['documents'].keys()),1)
    308322        document1 = createObject(u'waeup.CustomerSampleDocument')
    309         document1.document_id = 'p234'
     323        document1.document_id = 'd234'
    310324        self.processor.addEntry(
    311             document1, dict(customer_id=self.customer.customer_id, document_id='p234'),
     325            document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
    312326            self.app)
    313327        self.assertEqual(len(self.customer['documents'].keys()),2)
    314         self.assertEqual(self.customer['documents']['p234'].document_id, 'p234')
     328        self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')
    315329        document2 = createObject(u'waeup.CustomerSampleDocument')
    316330        document1.document_id = 'nonsense'
     
    318332    def test_checkConversion(self):
    319333        errs, inv_errs, conv_dict = self.processor.checkConversion(
    320             dict(document_id='d1266236341955', class_name='CustomerSampleDocument'))
     334            dict(document_id='d126', class_name='CustomerSampleDocument'))
    321335        self.assertEqual(len(errs),0)
    322336        errs, inv_errs, conv_dict = self.processor.checkConversion(
    323337            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
    324338        self.assertEqual(len(errs),1)
    325         timestamp = ("%d" % int(time()*10000))[1:]
    326         document_id = "d%s" % timestamp
    327         errs, inv_errs, conv_dict = self.processor.checkConversion(
    328             dict(document_id=document_id, class_name='CustomerSampleDocument'))
    329         self.assertEqual(len(errs),0)
    330         errs, inv_errs, conv_dict = self.processor.checkConversion(
    331             dict(document_id=document_id, class_name='WrongDocument'))
     339        errs, inv_errs, conv_dict = self.processor.checkConversion(
     340            dict(document_id='d127', class_name='WrongDocument'))
    332341        self.assertEqual(len(errs),1)
    333342
     
    387396            'INFO - system - K1000001 - Document removed: d5'
    388397            in logcontent)
     398
     399
     400class ContractProcessorTest(CustomerImportExportSetup):
     401
     402    def setUp(self):
     403        super(ContractProcessorTest, self).setUp()
     404
     405        # Import customers with subobjects
     406        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
     407        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
     408        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
     409            customer_file, CUSTOMER_HEADER_FIELDS)
     410        shutil.rmtree(os.path.dirname(fin_file))
     411
     412        self.processor = ContractProcessor()
     413        self.csv_file = os.path.join(
     414            self.workdir, 'sample_contract_data.csv')
     415        open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA)
     416
     417    def test_interface(self):
     418        # Make sure we fulfill the interface contracts.
     419        assert verifyObject(IBatchProcessor, self.processor) is True
     420        assert verifyClass(
     421            IBatchProcessor, ContractProcessor) is True
     422
     423    def test_getEntry(self):
     424        assert self.processor.getEntry(
     425            dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
     426        assert self.processor.getEntry(
     427            dict(customer_id=self.customer.customer_id, contract_id='c120'),
     428            self.app) is self.customer['contracts']['c120']
     429
     430    def test_delEntry(self):
     431        assert self.processor.getEntry(
     432            dict(customer_id=self.customer.customer_id, contract_id='c120'),
     433            self.app) is self.customer['contracts']['c120']
     434        self.assertEqual(len(self.customer['contracts'].keys()),1)
     435        self.processor.delEntry(
     436            dict(customer_id=self.customer.customer_id, contract_id='c120'),
     437            self.app)
     438        assert self.processor.getEntry(
     439            dict(customer_id=self.customer.customer_id, contract_id='c120'),
     440            self.app) is None
     441        self.assertEqual(len(self.customer['contracts'].keys()),0)
     442
     443    def test_addEntry(self):
     444        self.assertEqual(len(self.customer['contracts'].keys()),1)
     445        contract1 = createObject(u'waeup.SampleContract')
     446        contract1.contract_id = 'c234'
     447        self.processor.addEntry(
     448            contract1, dict(customer_id=self.customer.customer_id, contract_id='c234'),
     449            self.app)
     450        self.assertEqual(len(self.customer['contracts'].keys()),2)
     451        self.assertEqual(self.customer['contracts']['c234'].contract_id, 'c234')
     452        contract2 = createObject(u'waeup.SampleContract')
     453        contract1.contract_id = 'nonsense'
     454
     455    def test_checkConversion(self):
     456        errs, inv_errs, conv_dict = self.processor.checkConversion(
     457            dict(contract_id='c126', class_name='SampleContract'))
     458        self.assertEqual(len(errs),0)
     459        errs, inv_errs, conv_dict = self.processor.checkConversion(
     460            dict(contract_id='nonsense', class_name='SampleContract'))
     461        self.assertEqual(len(errs),1)
     462        errs, inv_errs, conv_dict = self.processor.checkConversion(
     463            dict(contract_id='c127', class_name='WrongContract'))
     464        self.assertEqual(len(errs),1)
     465
     466    def xxtest_import(self):
     467        num, num_warns, fin_file, fail_file = self.processor.doImport(
     468            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
     469        self.assertEqual(num_warns,1)
     470        # contract_id must be unique
     471        fail_file = open(fail_file).read()
     472        self.assertEqual(fail_file,
     473            'class_name,reg_number,contract_id,title,--ERRORS--\r\n'
     474            'SampleContract,2,d5,My stolen doc,contract_id: id exists\r\n')
     475        contract = self.processor.getEntry(dict(reg_number='1',
     476            contract_id='d3'), self.app)
     477        self.assertEqual(
     478            self.app['customers']['X666666']['contracts']['d3'],
     479            contract)
     480        self.assertEqual(contract.contract_id, 'd3')
     481        contract = self.processor.getEntry(dict(reg_number='3',
     482            contract_id='d5'), self.app)
     483        shutil.rmtree(os.path.dirname(fin_file))
     484        logcontent = open(self.logfile).read()
     485        # Logging message from updateEntry
     486        self.assertTrue(
     487            'INFO - system - Contract Processor - '
     488            'sample_contract_data - X666666 - updated: '
     489            'contract_id=d3, title=My first doc'
     490            in logcontent)
     491
     492    def xxtest_import_update(self):
     493        # We perform the same import twice,
     494        # the second time in update mode. The number
     495        # of warnings must be the same.
     496        num, num_warns, fin_file, fail_file = self.processor.doImport(
     497            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
     498        shutil.rmtree(os.path.dirname(fin_file))
     499        num, num_warns, fin_file, fail_file = self.processor.doImport(
     500            self.csv_file, CONTRACT_HEADER_FIELDS,'update')
     501        # There is one record without contract_id and one duplicate
     502        self.assertEqual(num_warns,2)
     503        shutil.rmtree(os.path.dirname(fin_file))
     504
     505    def xxtest_import_remove(self):
     506        # We perform the same import twice,
     507        # the second time in remove mode. The number
     508        # of warnings must be the same.
     509        num, num_warns, fin_file, fail_file = self.processor.doImport(
     510            self.csv_file, CONTRACT_HEADER_FIELDS,'create')
     511        shutil.rmtree(os.path.dirname(fin_file))
     512        num, num_warns, fin_file, fail_file = self.processor.doImport(
     513            self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
     514        # There is one record without contract_id and one duplicate
     515        self.assertEqual(num_warns,2)
     516        shutil.rmtree(os.path.dirname(fin_file))
     517        logcontent = open(self.logfile).read()
     518        self.assertTrue(
     519            'INFO - system - K1000001 - Contract removed: d5'
     520            in logcontent)
Note: See TracChangeset for help on using the changeset viewer.