source: main/waeup.ikoba/trunk/src/waeup/ikoba/customers/tests/test_customer.py @ 15796

Last change on this file since 15796 was 12741, checked in by uli, 10 years ago

Merge changes from uli-payments back into trunk.

  • Property svn:keywords set to Id
File size: 10.5 KB
Line 
1## $Id: test_customer.py 12741 2015-03-12 05:29:43Z uli $
2##
3## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Tests for customers and related.
19"""
20import os
21import re
22import unittest
23from cStringIO import StringIO
24from zope.component import getUtility, getUtilitiesFor
25from zope.component.hooks import setSite
26from zope.component.interfaces import IFactory
27from zope.interface import verify
28from waeup.ikoba.interfaces import IExtFileStore, IFileStoreNameChooser
29from waeup.ikoba.app import Company
30from waeup.ikoba.customers.customer import (
31    Customer, CustomerFactory, handle_customer_removed, path_from_custid,
32    CustomerPayer, CustomerFinder,
33    )
34from waeup.ikoba.customers.interfaces import (
35    ICustomer, ICustomerNavigation, ICustomersUtils)
36from waeup.ikoba.customers.tests.test_batching import CustomerImportExportSetup
37from waeup.ikoba.payments.interfaces import IPayer, IPayerFinder
38from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase
39
40
class HelperTests(unittest.TestCase):
    # Checks for the helper functions of the customer module.

    def test_path_from_custid(self):
        # Customer ids have to map to predictable storage paths.
        expectations = (
            ('K1000000', u'01000/K1000000'),
            ('K1234567', u'01234/K1234567'),
            ('K12345678', u'12345/K12345678'),
            # Overlong numbers are handled by the algorithm as well,
            # just to be sure.
            ('K123456789', u'123456/K123456789'),
            # Low numbers (< 10**6) are treated special: they get max.
            # of 10,000 entries. That's mainly because of old
            # customers migrated into our portal.
            ('KM123456', u'00120/KM123456'),
        )
        for customer_id, expected_path in expectations:
            self.assertEqual(path_from_custid(customer_id), expected_path)
62
63
class CustomerTest(FunctionalTestCase):
    # Basic checks on a freshly created Customer object.

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerTest, self).setUp()
        # every test gets its own sample customer
        anna = Customer()
        anna.firstname = u'Anna'
        anna.lastname = u'Tester'
        self.customer = anna

    def tearDown(self):
        super(CustomerTest, self).tearDown()

    def test_interfaces(self):
        # first the class, then the instance must fulfil both interfaces
        provided = (ICustomer, ICustomerNavigation)
        for iface in provided:
            verify.verifyClass(iface, Customer)
        for iface in provided:
            verify.verifyObject(iface, self.customer)

    # NOTE(review): the disabled test below refers to names (Department,
    # CustomerStudyCourse, CustomerStudyLevel) that this module does not
    # import -- presumably a leftover; confirm whether it can be dropped.
    #
    #def test_base(self):
    #    department = Department()
    #    studycourse = CustomerStudyCourse()
    #    self.assertRaises(
    #        TypeError, studycourse.addCustomerStudyLevel, department)
    #    studylevel = CustomerStudyLevel()
    #    self.assertRaises(
    #        TypeError, studylevel.addCourseTicket, department, department)
94
95
class TestCustomerHelpers(FunctionalTestCase):
    # Tests for the payer adapter and the payer finder of customers.

    layer = FunctionalLayer

    def test_payer_adapter(self):
        # adapting a customer to IPayer gives us a CustomerPayer
        anna = Customer()
        anna.firstname = u'Anna'
        anna.lastname = u'Tester'
        payer = IPayer(anna)
        self.assertTrue(isinstance(payer, CustomerPayer))
        verify.verifyObject(IPayer, payer)
        self.assertEqual(payer.first_name, u'Anna')
        self.assertEqual(payer.last_name, u'Tester')
        self.assertEqual(payer.payer_id, anna.customer_id)

    def test_customer_finder_iface(self):
        # the CustomerFinder class fulfils the IPayerFinder interface
        verify.verifyClass(IPayerFinder, CustomerFinder)

    def test_customer_finder_registered(self):
        # the finder is available as named utility...
        registered = getUtility(IPayerFinder, name='customer_finder')
        self.assertTrue(isinstance(registered, CustomerFinder))
        # ...and exactly one CustomerFinder is registered for IPayerFinder
        customer_finders = [
            candidate
            for _name, candidate in getUtilitiesFor(IPayerFinder)
            if isinstance(candidate, CustomerFinder)]
        self.assertEqual(len(customer_finders), 1)

    def create_customer_and_site(self):
        # helper: create a customer with id 'CUST1' and a Company
        # stored as 'app' in the root folder, which is set as site.
        # The customer is returned unstored.
        new_customer = Customer()
        new_customer.customer_id = u'CUST1'
        self.getRootFolder()['app'] = Company()
        site = self.getRootFolder()['app']
        setSite(site)
        return new_customer, site

    def test_customer_finder(self):
        # a stored customer is found by its id
        customer, app = self.create_customer_and_site()
        app['mycustomer'] = customer  # trigger cataloging
        found = CustomerFinder().get_payer_by_id('CUST1')
        self.assertTrue(found is customer)

    def test_customer_finder_not_stored(self):
        # asking for an id that is not stored yields None
        customer, app = self.create_customer_and_site()
        app['mycustomer'] = customer  # trigger cataloging
        found = CustomerFinder().get_payer_by_id('Not-a-valid-id')
        self.assertTrue(found is None)

    def test_customer_finder_no_catalog(self):
        # without any site set up (and hence no catalog) the finder
        # returns None instead of complaining
        found = CustomerFinder().get_payer_by_id('CUST1')
        self.assertTrue(found is None)
152
153
class CustomerRemovalTests(CustomerImportExportSetup):
    # Test handle_customer_removed
    #
    # This is a complex action updating several CSV files and moving
    # stored files to a backup location.
    #
    # These tests make no assumptions about the CSV files except that
    # they contain a deletion timestamp at end of each data row

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerRemovalTests, self).setUp()
        self.setup_for_export()

    def _read_binary(self, path):
        # Return the raw contents of the file at `path`. The handle is
        # closed explicitly instead of being left to garbage collection.
        with open(path, 'rb') as fd:
            return fd.read()

    def _read_csv_lines(self, path):
        # Return the contents of the file at `path` split at '\r\n'.
        # A trailing linebreak results in a trailing empty item.
        return self._read_binary(path).split('\r\n')

    def create_passport_img(self, customer):
        # create some passport file for `customer`; the raw image data
        # is kept in `self.image_contents` for later comparisons.
        storage = getUtility(IExtFileStore)
        image_path = os.path.join(os.path.dirname(__file__), 'test_image.jpg')
        self.image_contents = self._read_binary(image_path)
        file_id = IFileStoreNameChooser(customer).chooseName(
            attr='passport.jpg')
        storage.createFile(file_id, StringIO(self.image_contents))

    def test_backup_single_customer_data(self):
        # when a single customer is removed, the data is backed up.
        self.setup_customer(self.customer)
        # Add a fake image
        self.create_passport_img(self.customer)
        handle_customer_removed(self.customer, None)
        del_dir = self.app['datacenter'].deleted_path
        del_img_path = os.path.join(
            del_dir, 'media', 'customers', '00110', 'A111111',
            'passport_A111111.jpg')

        # The image was copied over
        self.assertTrue(os.path.isfile(del_img_path))
        self.assertEqual(
            self._read_binary(del_img_path),
            self.image_contents)

        # The customer data were put into CSV files
        for name in getUtility(ICustomersUtils).EXPORTER_NAMES:
            csv_path = os.path.join(del_dir, '%s.csv' % name)
            self.assertTrue(os.path.isfile(csv_path))
            contents = self._read_csv_lines(csv_path)
            # We expect 3 lines output including a linebreak at end of file.
            self.assertEqual(len(contents), 3)

    def test_backup_append_csv(self):
        # when several customers are removed, existing CSVs are appended
        self.setup_customer(self.customer)
        # Add a fake image
        self.create_passport_img(self.customer)
        del_dir = self.app['datacenter'].deleted_path
        # put fake data into customers.csv with trailing linebreak.
        # The file must be closed (and thus flushed) before the removal
        # handler runs, otherwise the fake rows might not be on disk yet.
        customers_csv = os.path.join(del_dir, 'customers.csv')
        with open(customers_csv, 'wb') as fd:
            fd.write('line1\r\nline2\r\n')
        handle_customer_removed(self.customer, None)
        contents = self._read_csv_lines(customers_csv)
        # there should be 4 lines in result csv (including trailing linebreak)
        self.assertEqual(len(contents), 4)

    def test_old_files_removed(self):
        # make sure old files are not accessible any more
        self.setup_customer(self.customer)
        # Add a fake image
        self.create_passport_img(self.customer)
        # make sure we can access the image before removal
        file_store = getUtility(IExtFileStore)
        image = file_store.getFileByContext(self.customer, attr='passport.jpg')
        self.assertTrue(image is not None)

        # remove image (hopefully)
        handle_customer_removed(self.customer, None)

        # the image is not accessible anymore
        image = file_store.getFileByContext(self.customer, attr='passport.jpg')
        self.assertEqual(image, None)

    def test_csv_file_entries_have_timestamp(self):
        # each row in written csv files has a ``del_date`` column to
        # tell when the associated customer was deleted
        self.setup_customer(self.customer)
        del_dir = self.app['datacenter'].deleted_path
        customers_csv = os.path.join(del_dir, 'customers.csv')
        handle_customer_removed(self.customer, None)
        contents = self._read_csv_lines(customers_csv)
        # the CSV header ends with a ``del_date`` column
        self.assertTrue(contents[0].endswith(',del_date'))
        # the first data row ends with an UTC timestamp. The pattern
        # covers exactly the 23 trailing characters sliced off here
        # (presumably the leading digits of the year are cut away --
        # confirm against the timestamp format of the exporter).
        timestamp = contents[1][-23:]
        self.assertTrue(re.match(
            r'^\d\d-\d\d-\d\d \d\d:\d\d:\d\d\+00:00$', timestamp))
253
254
class CustomerFactoryTest(FunctionalTestCase):
    # Tests for the factory that creates Customer objects.

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerFactoryTest, self).setUp()
        self.factory = CustomerFactory()

    def tearDown(self):
        super(CustomerFactoryTest, self).tearDown()

    def test_interfaces(self):
        # factory class and factory instance fulfil IFactory
        verify.verifyClass(IFactory, CustomerFactory)
        verify.verifyObject(IFactory, self.factory)

    def test_factory(self):
        # calling the factory gives us a Customer.
        obj = self.factory()
        # TestCase assertions are used instead of plain `assert`
        # statements, as the latter are stripped when Python runs
        # with -O and the check would silently vanish.
        self.assertTrue(isinstance(obj, Customer))

    def test_getInterfaces(self):
        # the interfaces announced by the factory include ICustomer
        implemented_by = self.factory.getInterfaces()
        self.assertTrue(implemented_by.isOrExtends(ICustomer))
Note: See TracBrowser for help on using the repository browser.