source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_batching.py @ 12076

Last change on this file since 12076 was 12057, checked in by Henrik Bettermann, 10 years ago

Use a baseclass for customer documents. That eases file viewlet configuration.

  • Property svn:keywords set to Id
File size: 16.2 KB
Line 
1# -*- coding: utf-8 -*-
2## $Id: test_batching.py 12057 2014-11-25 13:15:27Z henrik $
3##
4## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
5## This program is free software; you can redistribute it and/or modify
6## it under the terms of the GNU General Public License as published by
7## the Free Software Foundation; either version 2 of the License, or
8## (at your option) any later version.
9##
10## This program is distributed in the hope that it will be useful,
11## but WITHOUT ANY WARRANTY; without even the implied warranty of
12## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13## GNU General Public License for more details.
14##
15## You should have received a copy of the GNU General Public License
16## along with this program; if not, write to the Free Software
17## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18##
19"""Unit tests for customer-related data processors.
20"""
21import os
22import shutil
23import tempfile
24import unittest
25import datetime
26import grok
27from time import time
28from zope.event import notify
29from zope.component import createObject, queryUtility
30from zope.component.hooks import setSite, clearSite
31from zope.catalog.interfaces import ICatalog
32from zope.interface.verify import verifyClass, verifyObject
33from hurry.workflow.interfaces import IWorkflowState
34
35from waeup.ikoba.app import Company
36from waeup.ikoba.interfaces import IBatchProcessor, FatalCSVError, IUserAccount
37from waeup.ikoba.customers.batching import (
38    CustomerProcessor, CustomerDocumentProcessor)
39from waeup.ikoba.customers.customer import Customer
40from waeup.ikoba.customers.documents import CustomerSampleDocument
41from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
42
43CUSTOMER_SAMPLE_DATA = open(
44    os.path.join(os.path.dirname(__file__), 'sample_customer_data.csv'),
45    'rb').read()
46
47CUSTOMER_HEADER_FIELDS = CUSTOMER_SAMPLE_DATA.split(
48    '\n')[0].split(',')
49
50CUSTOMER_SAMPLE_DATA_UPDATE = open(
51    os.path.join(os.path.dirname(__file__), 'sample_customer_data_update.csv'),
52    'rb').read()
53
54CUSTOMER_HEADER_FIELDS_UPDATE = CUSTOMER_SAMPLE_DATA_UPDATE.split(
55    '\n')[0].split(',')
56
57CUSTOMER_SAMPLE_DATA_DUPLICATES = open(
58    os.path.join(os.path.dirname(__file__),
59                 'sample_customer_data_duplicates.csv'),
60    'rb').read()
61
62CUSTOMER_HEADER_FIELDS_DUPLICATES = CUSTOMER_SAMPLE_DATA_DUPLICATES.split(
63    '\n')[0].split(',')
64
65DOCUMENT_SAMPLE_DATA = open(
66    os.path.join(os.path.dirname(__file__), 'sample_document_data.csv'),
67    'rb').read()
68
69DOCUMENT_HEADER_FIELDS = DOCUMENT_SAMPLE_DATA.split(
70    '\n')[0].split(',')
71
72class CustomerImportExportSetup(FunctionalTestCase):
73
74    layer = FunctionalLayer
75
76    def setUp(self):
77        super(CustomerImportExportSetup, self).setUp()
78        self.dc_root = tempfile.mkdtemp()
79        self.workdir = tempfile.mkdtemp()
80        app = Company()
81        app['datacenter'].setStoragePath(self.dc_root)
82        self.getRootFolder()['app'] = app
83        self.app = self.getRootFolder()['app']
84        setSite(app)
85
86        self.logfile = os.path.join(
87            self.app['datacenter'].storage, 'logs', 'customers.log')
88        return
89
90    def tearDown(self):
91        super(CustomerImportExportSetup, self).tearDown()
92        shutil.rmtree(self.workdir)
93        shutil.rmtree(self.dc_root)
94        clearSite()
95        return
96
97    def setup_for_export(self):
98        customer = Customer()
99        customer.customer_id = u'A111111'
100        self.app['customers'][customer.customer_id] = self.customer = customer
101        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
102        return
103
104    def setup_customer(self, customer):
105        # set predictable values for `customer`
106        customer.matric_number = u'234'
107        customer.perm_address = u'Customerroad 21\nLagos 123456\n'
108        customer.reg_number = u'123'
109        customer.firstname = u'Anna'
110        customer.lastname = u'Tester'
111        customer.middlename = u'M.'
112        customer.date_of_birth = datetime.date(1981, 2, 4)
113        #customer.sex = 'f'
114        customer.email = 'anna@sample.com'
115        customer.phone = u'+234-123-12345'
116        customer.notice = u'Some notice\nin lines.'
117        customer.nationality = u'NG'
118
119        document = CustomerSampleDocument()
120        document.title = u'My Document'
121        customer['documents'].addDocument(document)
122        return customer
123
124class CustomerProcessorTest(CustomerImportExportSetup):
125
126    layer = FunctionalLayer
127
128    def setUp(self):
129        super(CustomerProcessorTest, self).setUp()
130
131        # Add customer with subobjects
132        customer = Customer()
133        self.app['customers'].addCustomer(customer)
134        customer = self.setup_customer(customer)
135        notify(grok.ObjectModifiedEvent(customer))
136        self.customer = self.app['customers'][customer.customer_id]
137
138        self.processor = CustomerProcessor()
139        self.csv_file = os.path.join(self.workdir, 'sample_customer_data.csv')
140        self.csv_file_update = os.path.join(
141            self.workdir, 'sample_customer_data_update.csv')
142        self.csv_file_duplicates = os.path.join(
143            self.workdir, 'sample_customer_data_duplicates.csv')
144        open(self.csv_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
145        open(self.csv_file_update, 'wb').write(CUSTOMER_SAMPLE_DATA_UPDATE)
146        open(self.csv_file_duplicates, 'wb').write(CUSTOMER_SAMPLE_DATA_DUPLICATES)
147
148    def test_interface(self):
149        # Make sure we fulfill the interface contracts.
150        assert verifyObject(IBatchProcessor, self.processor) is True
151        assert verifyClass(
152            IBatchProcessor, CustomerProcessor) is True
153
154    def test_parentsExist(self):
155        self.assertFalse(self.processor.parentsExist(None, dict()))
156        self.assertTrue(self.processor.parentsExist(None, self.app))
157
158    def test_entryExists(self):
159        assert self.processor.entryExists(
160            dict(customer_id='ID_NONE'), self.app) is False
161        assert self.processor.entryExists(
162            dict(reg_number='123'), self.app) is True
163
164    def test_getParent(self):
165        parent = self.processor.getParent(None, self.app)
166        assert parent is self.app['customers']
167
168    def test_getEntry(self):
169        assert self.processor.getEntry(
170            dict(customer_id='ID_NONE'), self.app) is None
171        assert self.processor.getEntry(
172            dict(customer_id=self.customer.customer_id), self.app) is self.customer
173
174    def test_addEntry(self):
175        new_customer = Customer()
176        self.processor.addEntry(
177            new_customer, dict(), self.app)
178        assert len(self.app['customers'].keys()) == 2
179
180    def test_checkConversion(self):
181        # Make sure we can check conversions and that the cust_id
182        # counter is not raised during such checks.
183        initial_cust_id = self.app['customers']._curr_cust_id
184        errs, inv_errs, conv_dict = self.processor.checkConversion(
185            dict(reg_number='1', state='approved'))
186        self.assertEqual(len(errs),0)
187        # Empty state is allowed
188        errs, inv_errs, conv_dict = self.processor.checkConversion(
189            dict(reg_number='1', state=''))
190        self.assertEqual(len(errs),0)
191        #self.assertTrue(('state', 'no value provided') in errs)
192        errs, inv_errs, conv_dict = self.processor.checkConversion(
193            dict(reg_number='1', state='nonsense'))
194        self.assertEqual(len(errs),1)
195        self.assertTrue(('state', 'not allowed') in errs)
196        new_cust_id = self.app['customers']._curr_cust_id
197        self.assertEqual(initial_cust_id, new_cust_id)
198        return
199
200    def test_checkUpdateRequirements(self):
201        # Make sure that pg customers can't be updated with wrong transition.
202        err = self.processor.checkUpdateRequirements(self.customer,
203            dict(reg_number='1', state='returning'), self.app)
204        self.assertTrue(err is None)
205
206    def test_delEntry(self):
207        assert self.customer.customer_id in self.app['customers'].keys()
208        self.processor.delEntry(
209            dict(reg_number=self.customer.reg_number), self.app)
210        assert self.customer.customer_id not in self.app['customers'].keys()
211
212    def test_import(self):
213        self.assertEqual(self.app['customers']._curr_cust_id, 1000001)
214        num, num_warns, fin_file, fail_file = self.processor.doImport(
215            self.csv_file, CUSTOMER_HEADER_FIELDS)
216        self.assertEqual(num_warns,0)
217        self.assertEqual(len(self.app['customers']), 10)
218        self.assertEqual(self.app['customers']['X666666'].reg_number,'1')
219        self.assertEqual(
220            self.app['customers']['X666666'].state, 'requested')
221        # Two new customer_ids have been created.
222        self.assertEqual(self.app['customers']._curr_cust_id, 1000003)
223        shutil.rmtree(os.path.dirname(fin_file))
224
225    def test_import_update(self):
226        num, num_warns, fin_file, fail_file = self.processor.doImport(
227            self.csv_file, CUSTOMER_HEADER_FIELDS)
228        shutil.rmtree(os.path.dirname(fin_file))
229        num, num_warns, fin_file, fail_file = self.processor.doImport(
230            self.csv_file_update, CUSTOMER_HEADER_FIELDS_UPDATE, 'update')
231        self.assertEqual(num_warns,0)
232        # state has changed
233        self.assertEqual(self.app['customers']['X666666'].state,'approved')
234        # state has not changed
235        self.assertEqual(self.app['customers']['Y777777'].state, 'requested')
236        shutil.rmtree(os.path.dirname(fin_file))
237
238    def test_import_duplicate_data(self):
239        num, num_warns, fin_file, fail_file = self.processor.doImport(
240            self.csv_file_duplicates, CUSTOMER_HEADER_FIELDS_DUPLICATES)
241        content = open(fail_file).read()
242        self.assertEqual(num_warns,2)
243        self.assertEqual(
244            content,
245            'reg_number,password,firstname,lastname,sex,phone,state,date_of_birth,customer_id,email,--ERRORS--\r\n'
246            '1,,Frank,Meyer,m,1234,,1990-01-06,F123456,aa@aa.ng,reg_number: reg_number\r\n'
247            '3,,Uli,Schulz,m,1234,,1990-01-07,A123456,aa@aa.ng,This object already exists. Skipping.\r\n'
248
249            )
250        shutil.rmtree(os.path.dirname(fin_file))
251
252class CustomerDocumentProcessorTest(CustomerImportExportSetup):
253
254    def setUp(self):
255        super(CustomerDocumentProcessorTest, self).setUp()
256
257        # Add customer with document
258        customer = Customer()
259        customer.firstname = u'Anna'
260        customer.lastname = u'Tester'
261        customer.reg_number = u'123'
262        self.app['customers'].addCustomer(customer)
263        self.customer = self.app['customers'][customer.customer_id]
264        document = createObject(u'waeup.CustomerSampleDocument')
265        document.document_id = 'd120'
266        self.customer['documents'][document.document_id] = document
267
268        # Import customers with subobjects
269        customer_file = os.path.join(self.workdir, 'sample_customer_data.csv')
270        open(customer_file, 'wb').write(CUSTOMER_SAMPLE_DATA)
271        num, num_warns, fin_file, fail_file = CustomerProcessor().doImport(
272            customer_file, CUSTOMER_HEADER_FIELDS)
273        shutil.rmtree(os.path.dirname(fin_file))
274
275        self.processor = CustomerDocumentProcessor()
276        self.csv_file = os.path.join(
277            self.workdir, 'sample_document_data.csv')
278        open(self.csv_file, 'wb').write(DOCUMENT_SAMPLE_DATA)
279
280    def test_interface(self):
281        # Make sure we fulfill the interface contracts.
282        assert verifyObject(IBatchProcessor, self.processor) is True
283        assert verifyClass(
284            IBatchProcessor, CustomerDocumentProcessor) is True
285
286    def test_getEntry(self):
287        assert self.processor.getEntry(
288            dict(customer_id='ID_NONE', document_id='nonsense'), self.app) is None
289        assert self.processor.getEntry(
290            dict(customer_id=self.customer.customer_id, document_id='d120'),
291            self.app) is self.customer['documents']['d120']
292
293    def test_delEntry(self):
294        assert self.processor.getEntry(
295            dict(customer_id=self.customer.customer_id, document_id='d120'),
296            self.app) is self.customer['documents']['d120']
297        self.assertEqual(len(self.customer['documents'].keys()),1)
298        self.processor.delEntry(
299            dict(customer_id=self.customer.customer_id, document_id='d120'),
300            self.app)
301        assert self.processor.getEntry(
302            dict(customer_id=self.customer.customer_id, document_id='d120'),
303            self.app) is None
304        self.assertEqual(len(self.customer['documents'].keys()),0)
305
306    def test_addEntry(self):
307        self.assertEqual(len(self.customer['documents'].keys()),1)
308        document1 = createObject(u'waeup.CustomerSampleDocument')
309        document1.document_id = 'p234'
310        self.processor.addEntry(
311            document1, dict(customer_id=self.customer.customer_id, document_id='p234'),
312            self.app)
313        self.assertEqual(len(self.customer['documents'].keys()),2)
314        self.assertEqual(self.customer['documents']['p234'].document_id, 'p234')
315        document2 = createObject(u'waeup.CustomerSampleDocument')
316        document1.document_id = 'nonsense'
317
318    def test_checkConversion(self):
319        errs, inv_errs, conv_dict = self.processor.checkConversion(
320            dict(document_id='d1266236341955', class_name='CustomerSampleDocument'))
321        self.assertEqual(len(errs),0)
322        errs, inv_errs, conv_dict = self.processor.checkConversion(
323            dict(document_id='nonsense', class_name='CustomerSampleDocument'))
324        self.assertEqual(len(errs),1)
325        timestamp = ("%d" % int(time()*10000))[1:]
326        document_id = "d%s" % timestamp
327        errs, inv_errs, conv_dict = self.processor.checkConversion(
328            dict(document_id=document_id, class_name='CustomerSampleDocument'))
329        self.assertEqual(len(errs),0)
330        errs, inv_errs, conv_dict = self.processor.checkConversion(
331            dict(document_id=document_id, class_name='WrongDocument'))
332        self.assertEqual(len(errs),1)
333
334    def test_import(self):
335        num, num_warns, fin_file, fail_file = self.processor.doImport(
336            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
337        self.assertEqual(num_warns,1)
338        # document_id must be unique
339        fail_file = open(fail_file).read()
340        self.assertEqual(fail_file,
341            'class_name,reg_number,document_id,title,--ERRORS--\r\n'
342            'CustomerSampleDocument,2,d5,My stolen doc,document_id: id exists\r\n')
343        document = self.processor.getEntry(dict(reg_number='1',
344            document_id='d3'), self.app)
345        self.assertEqual(
346            self.app['customers']['X666666']['documents']['d3'],
347            document)
348        self.assertEqual(document.document_id, 'd3')
349        document = self.processor.getEntry(dict(reg_number='3',
350            document_id='d5'), self.app)
351        shutil.rmtree(os.path.dirname(fin_file))
352        logcontent = open(self.logfile).read()
353        # Logging message from updateEntry
354        self.assertTrue(
355            'INFO - system - CustomerDocument Processor - '
356            'sample_document_data - X666666 - updated: '
357            'document_id=d3, title=My first doc'
358            in logcontent)
359
360    def test_import_update(self):
361        # We perform the same import twice,
362        # the second time in update mode. The number
363        # of warnings must be the same.
364        num, num_warns, fin_file, fail_file = self.processor.doImport(
365            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
366        shutil.rmtree(os.path.dirname(fin_file))
367        num, num_warns, fin_file, fail_file = self.processor.doImport(
368            self.csv_file, DOCUMENT_HEADER_FIELDS,'update')
369        # There is one record without document_id and one duplicate
370        self.assertEqual(num_warns,2)
371        shutil.rmtree(os.path.dirname(fin_file))
372
373    def test_import_remove(self):
374        # We perform the same import twice,
375        # the second time in remove mode. The number
376        # of warnings must be the same.
377        num, num_warns, fin_file, fail_file = self.processor.doImport(
378            self.csv_file, DOCUMENT_HEADER_FIELDS,'create')
379        shutil.rmtree(os.path.dirname(fin_file))
380        num, num_warns, fin_file, fail_file = self.processor.doImport(
381            self.csv_file, DOCUMENT_HEADER_FIELDS,'remove')
382        # There is one record without document_id and one duplicate
383        self.assertEqual(num_warns,2)
384        shutil.rmtree(os.path.dirname(fin_file))
385        logcontent = open(self.logfile).read()
386        self.assertTrue(
387            'INFO - system - K1000001 - Document removed: d5'
388            in logcontent)
Note: See TracBrowser for help on using the repository browser.