Ignore:
Timestamp:
3 Dec 2014, 10:49:52 (10 years ago)
Author:
Henrik Bettermann
Message:

Revert changes from last revision. This was an accidental commit.

Location:
main/waeup.ikoba/trunk/src/waeup/ikoba/customers
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.ikoba/trunk/src/waeup/ikoba/customers/batching.py

    r12123 r12124  
    555555                errs.append(('contract_id','id exists'))
    556556        else:
    557             if not contract_id.startswith('c'):
     557            if not contract_id.startswith('a'):
    558558                errs.append(('contract_id','invalid format'))
    559559        return errs, inv_errs, conv_dict
  • main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py

    r12123 r12124  
    3636from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount
    3737from waeup.ikoba.customers.batching import (
    38     CustomerProcessor, CustomerDocumentProcessor, ContractProcessor)
     38    CustomerProcessor, CustomerDocumentProcessor)
    3939from waeup.ikoba.customers.customer import Customer
    4040from waeup.ikoba.customers.documents import CustomerSampleDocument
     
    6868
    6969DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
    70     '\n')[0].split(',')
    71 
    72 CONTRACT_SAMPLE_DATA = open(
    73     os.path.join(os.path.dirname(__file__), 'sample_contract_data.csv'),
    74     'rb').read()
    75 
    76 CONTRACT_HEADER_FIELDS = CONTRACT_SAMPLE_DATA.split(
    7770    '\n')[0].split(',')
    7871
     
    9083        self.app = self.getRootFolder()['app']
    9184        setSite(app)
     85
    9286        self.logfile = os.path.join(
    9387            self.app['datacenter'].storage, 'logs', 'customers.log')
    94 
    95         # Add some products
    96         self.product = createObject('waeup.Product')
    97         self.product.product_id = u'SAM'
    98         self.product.title = u'Our Samle Product'
    99         self.product.contract_category = u'sample'
    100         self.app['products'].addProduct(self.product)
    101 
    102         # Add customer
    103         customer = Customer()
    104         customer.firstname = u'Anna'
    105         customer.lastname = u'Tester'
    106         customer.reg_number = u'123'
    107         customer.date_of_birth = datetime.date(1981, 2, 4)
    108         customer.email = 'anna@sample.com'
    109         customer.phone = u'+234-123-12345'
    110         customer.notice = u'Some notice\nin lines.'
    111         customer.nationality = u'NG'
    112         self.app['customers'].addCustomer(customer)
    113         self.customer = customer
    114         #notify(grok.ObjectModifiedEvent(customer))
    115 
    116         # Add document
    117         document = createObject(u'waeup.CustomerSampleDocument')
    118         document.document_id = 'd120'
    119         document.title = u'My Document'
    120         customer['documents'].addDocument(document)
    12188        return
    12289
     
    161128    def setUp(self):
    162129        super(CustomerProcessorTest, self).setUp()
     130
     131        # Add customer with subobjects
     132        customer = Customer()
     133        self.app['customers'].addCustomer(customer)
     134        customer = self.setup_customer(customer)
     135        notify(grok.ObjectModifiedEvent(customer))
     136        self.customer = self.app['customers'][customer.customer_id]
     137
    163138        self.processor = CustomerProcessor()
    164139        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
     
    280255        super(CustomerDocumentProcessorTest, self).setUp()
    281256
     257        # Add customer with document
     258        customer = Customer()
     259        customer.firstname = u'Anna'
     260        customer.lastname = u'Tester'
     261        customer.reg_number = u'123'
     262        self.app['customers'].addCustomer(customer)
     263        self.customer = self.app['customers'][customer.customer_id]
     264        document = createObject(u'waeup.CustomerSampleDocument')
     265        document.document_id = 'd120'
     266        self.customer['documents'][document.document_id] = document
     267
    282268        # Import customers with subobjects
    283269        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
     
    321307        self.assertEqual(len(self.customer['documents'].keys()),1)
    322308        document1 = createObject(u'waeup.CustomerSampleDocument')
    323         document1.document_id = 'd234'
     309        document1.document_id = 'p234'
    324310        self.processor.addEntry(
    325             document1, dict(customer_id=self.customer.customer_id, document_id='d234'),
     311            document1, dict(customer_id=self.customer.customer_id, document_id='p234'),
    326312            self.app)
    327313        self.assertEqual(len(self.customer['documents'].keys()),2)
    328         self.assertEqual(self.customer['documents']['d234'].document_id, 'd234')
     314        self.assertEqual(self.customer['documents']['p234'].document_id, 'p234')
    329315        document2 = createObject(u'waeup.CustomerSampleDocument')
    330316        document1.document_id = 'nonsense'
     
    332318    def test_checkConversion(self):
    333319        errs, inv_errs, conv_dict = self.processor.checkConversion(
    334             dict(document_id='d126', class_name='CustomerSampleDocument'))
     320            dict(document_id='d1266236341955', class_name='CustomerSampleDocument'))
    335321        self.assertEqual(len(errs),0)
    336322        errs, inv_errs, conv_dict = self.processor.checkConversion(
    337323            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
    338324        self.assertEqual(len(errs),1)
    339         errs, inv_errs, conv_dict = self.processor.checkConversion(
    340             dict(document_id='d127', class_name='WrongDocument'))
     325        timestamp = ("%d" % int(time()*10000))[1:]
     326        document_id = "d%s" % timestamp
     327        errs, inv_errs, conv_dict = self.processor.checkConversion(
     328            dict(document_id=document_id, class_name='CustomerSampleDocument'))
     329        self.assertEqual(len(errs),0)
     330        errs, inv_errs, conv_dict = self.processor.checkConversion(
     331            dict(document_id=document_id, class_name='WrongDocument'))
    341332        self.assertEqual(len(errs),1)
    342333
     
    396387            'INFO - system - K1000001 - Document removed: d5'
    397388            in logcontent)
    398 
    399 
    400 class ContractProcessorTest(CustomerImportExportSetup):
    401 
    402     def setUp(self):
    403         super(ContractProcessorTest, self).setUp()
    404 
    405         # Import customers with subobjects
    406         customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
    407         open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
    408         num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
    409             customer_file, CUSTOMER_HEADER_FIELDS)
    410         shutil.rmtree(os.path.dirname(fin_file))
    411 
    412         self.processor = ContractProcessor()
    413         self.csv_file = os.path.join(
    414             self.workdir, 'sample_contract_data.csv')
    415         open(self.csv_file, 'wb').write(CONTRACT_SAMPLE_DATA)
    416 
    417     def test_interface(self):
    418         # Make sure we fulfill the interface contracts.
    419         assert verifyObject(IBatchProcessor, self.processor) is True
    420         assert verifyClass(
    421             IBatchProcessor, ContractProcessor) is True
    422 
    423     def test_getEntry(self):
    424         assert self.processor.getEntry(
    425             dict(customer_id='ID_NONE', contract_id='nonsense'), self.app) is None
    426         assert self.processor.getEntry(
    427             dict(customer_id=self.customer.customer_id, contract_id='c120'),
    428             self.app) is self.customer['contracts']['c120']
    429 
    430     def test_delEntry(self):
    431         assert self.processor.getEntry(
    432             dict(customer_id=self.customer.customer_id, contract_id='c120'),
    433             self.app) is self.customer['contracts']['c120']
    434         self.assertEqual(len(self.customer['contracts'].keys()),1)
    435         self.processor.delEntry(
    436             dict(customer_id=self.customer.customer_id, contract_id='c120'),
    437             self.app)
    438         assert self.processor.getEntry(
    439             dict(customer_id=self.customer.customer_id, contract_id='c120'),
    440             self.app) is None
    441         self.assertEqual(len(self.customer['contracts'].keys()),0)
    442 
    443     def test_addEntry(self):
    444         self.assertEqual(len(self.customer['contracts'].keys()),1)
    445         contract1 = createObject(u'waeup.SampleContract')
    446         contract1.contract_id = 'c234'
    447         self.processor.addEntry(
    448             contract1, dict(customer_id=self.customer.customer_id, contract_id='c234'),
    449             self.app)
    450         self.assertEqual(len(self.customer['contracts'].keys()),2)
    451         self.assertEqual(self.customer['contracts']['c234'].contract_id, 'c234')
    452         contract2 = createObject(u'waeup.SampleContract')
    453         contract1.contract_id = 'nonsense'
    454 
    455     def test_checkConversion(self):
    456         errs, inv_errs, conv_dict = self.processor.checkConversion(
    457             dict(contract_id='c126', class_name='SampleContract'))
    458         self.assertEqual(len(errs),0)
    459         errs, inv_errs, conv_dict = self.processor.checkConversion(
    460             dict(contract_id='nonsense', class_name='SampleContract'))
    461         self.assertEqual(len(errs),1)
    462         errs, inv_errs, conv_dict = self.processor.checkConversion(
    463             dict(contract_id='c127', class_name='WrongContract'))
    464         self.assertEqual(len(errs),1)
    465 
    466     def xxtest_import(self):
    467         num, num_warns, fin_file, fail_file = self.processor.doImport(
    468             self.csv_file, CONTRACT_HEADER_FIELDS,'create')
    469         self.assertEqual(num_warns,1)
    470         # contract_id must be unique
    471         fail_file = open(fail_file).read()
    472         self.assertEqual(fail_file,
    473             'class_name,reg_number,contract_id,title,--ERRORS--\r\n'
    474             'SampleContract,2,d5,My stolen doc,contract_id: id exists\r\n')
    475         contract = self.processor.getEntry(dict(reg_number='1',
    476             contract_id='d3'), self.app)
    477         self.assertEqual(
    478             self.app['customers']['X666666']['contracts']['d3'],
    479             contract)
    480         self.assertEqual(contract.contract_id, 'd3')
    481         contract = self.processor.getEntry(dict(reg_number='3',
    482             contract_id='d5'), self.app)
    483         shutil.rmtree(os.path.dirname(fin_file))
    484         logcontent = open(self.logfile).read()
    485         # Logging message from updateEntry
    486         self.assertTrue(
    487             'INFO - system - Contract Processor - '
    488             'sample_contract_data - X666666 - updated: '
    489             'contract_id=d3, title=My first doc'
    490             in logcontent)
    491 
    492     def xxtest_import_update(self):
    493         # We perform the same import twice,
    494         # the second time in update mode. The number
    495         # of warnings must be the same.
    496         num, num_warns, fin_file, fail_file = self.processor.doImport(
    497             self.csv_file, CONTRACT_HEADER_FIELDS,'create')
    498         shutil.rmtree(os.path.dirname(fin_file))
    499         num, num_warns, fin_file, fail_file = self.processor.doImport(
    500             self.csv_file, CONTRACT_HEADER_FIELDS,'update')
    501         # There is one record without contract_id and one duplicate
    502         self.assertEqual(num_warns,2)
    503         shutil.rmtree(os.path.dirname(fin_file))
    504 
    505     def xxtest_import_remove(self):
    506         # We perform the same import twice,
    507         # the second time in remove mode. The number
    508         # of warnings must be the same.
    509         num, num_warns, fin_file, fail_file = self.processor.doImport(
    510             self.csv_file, CONTRACT_HEADER_FIELDS,'create')
    511         shutil.rmtree(os.path.dirname(fin_file))
    512         num, num_warns, fin_file, fail_file = self.processor.doImport(
    513             self.csv_file, CONTRACT_HEADER_FIELDS,'remove')
    514         # There is one record without contract_id and one duplicate
    515         self.assertEqual(num_warns,2)
    516         shutil.rmtree(os.path.dirname(fin_file))
    517         logcontent = open(self.logfile).read()
    518         self.assertTrue(
    519             'INFO - system - K1000001 - Contract removed: d5'
    520             in logcontent)
Note: See TracChangeset for help on using the changeset viewer.