## $Id: test_customer.py 12741 2015-03-12 05:29:43Z uli $
##
## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""Tests for customers and related.
"""
import os
import re
import unittest
from cStringIO import StringIO
from zope.component import getUtility, getUtilitiesFor
from zope.component.hooks import setSite
from zope.component.interfaces import IFactory
from zope.interface import verify
from waeup.ikoba.interfaces import IExtFileStore, IFileStoreNameChooser
from waeup.ikoba.app import Company
from waeup.ikoba.customers.customer import (
    Customer, CustomerFactory, handle_customer_removed, path_from_custid,
    CustomerPayer, CustomerFinder,
    )
from waeup.ikoba.customers.interfaces import (
    ICustomer, ICustomerNavigation, ICustomersUtils)
from waeup.ikoba.customers.tests.test_batching import CustomerImportExportSetup
from waeup.ikoba.payments.interfaces import IPayer, IPayerFinder
from waeup.ikoba.testing import FunctionalLayer, FunctionalTestCase


class HelperTests(unittest.TestCase):
    # Tests for helper functions in customer module.

    def test_path_from_custid(self):
        # make sure we get predictable paths from customer ids.
        self.assertEqual(
            path_from_custid('K1000000'), u'01000/K1000000')
        self.assertEqual(
            path_from_custid('K1234567'), u'01234/K1234567')
        self.assertEqual(
            path_from_custid('K12345678'), u'12345/K12345678')
        # The algorithm works also for overlong numbers, just to be
        # sure.
        self.assertEqual(
            path_from_custid('K123456789'), u'123456/K123456789')
        # low numbers (< 10**6) are treated special: they get max. of
        # 10,000 entries. That's mainly because of old customers
        # migrated into our portal.
        self.assertEqual(
            path_from_custid('KM123456'), u'00120/KM123456')
        return


class CustomerTest(FunctionalTestCase):

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerTest, self).setUp()
        self.customer = Customer()
        self.customer.firstname = u'Anna'
        self.customer.lastname = u'Tester'
        return

    def tearDown(self):
        super(CustomerTest, self).tearDown()
        return

    def test_interfaces(self):
        verify.verifyClass(ICustomer, Customer)
        verify.verifyClass(ICustomerNavigation, Customer)
        verify.verifyObject(ICustomer, self.customer)
        verify.verifyObject(ICustomerNavigation, self.customer)
        return

    #def test_base(self):
    #    department = Department()
    #    studycourse = CustomerStudyCourse()
    #    self.assertRaises(
    #        TypeError, studycourse.addCustomerStudyLevel, department)
    #    studylevel = CustomerStudyLevel()
    #    self.assertRaises(
    #        TypeError, studylevel.addCourseTicket, department, department)


class TestCustomerHelpers(FunctionalTestCase):

    layer = FunctionalLayer

    def test_payer_adapter(self):
        # we can adapt ICustomer to IPayer (i.e. create a payer)
        customer = Customer()
        customer.firstname, customer.lastname = u'Anna', u'Tester'
        result = IPayer(customer)
        self.assertTrue(isinstance(result, CustomerPayer))
        verify.verifyObject(IPayer, result)
        self.assertEqual(result.first_name, u'Anna')
        self.assertEqual(result.last_name, u'Tester')
        self.assertEqual(result.payer_id, customer.customer_id)

    def test_customer_finder_iface(self):
        # we have a customer finder that returns IPayableFinder data.
        verify.verifyClass(IPayerFinder, CustomerFinder)

    def test_customer_finder_registered(self):
        # the customer finder is a utility registered on startup
        util = getUtility(IPayerFinder, name='customer_finder')
        self.assertTrue(isinstance(util, CustomerFinder))
        utils = [util for name, util in getUtilitiesFor(IPayerFinder)
                 if isinstance(util, CustomerFinder)]
        self.assertEqual(len(utils), 1)

    def create_customer_and_site(self):
        customer = Customer()
        customer.customer_id = u'CUST1'
        self.getRootFolder()['app'] = Company()
        app = self.getRootFolder()['app']
        setSite(app)
        return customer, app

    def test_customer_finder(self):
        # the customer finder can really find customers
        customer, app = self.create_customer_and_site()
        app['mycustomer'] = customer  # trigger cataloging
        finder = CustomerFinder()
        result = finder.get_payer_by_id('CUST1')
        self.assertTrue(result is customer)

    def test_customer_finder_not_stored(self):
        # we get none if an id is not stored
        customer, app = self.create_customer_and_site()
        app['mycustomer'] = customer  # trigger cataloging
        finder = CustomerFinder()
        result = finder.get_payer_by_id('Not-a-valid-id')
        self.assertTrue(result is None)

    def test_customer_finder_no_catalog(self):
        # customer finder does not complain about missing catalog
        finder = CustomerFinder()
        result = finder.get_payer_by_id('CUST1')
        self.assertTrue(result is None)


class CustomerRemovalTests(CustomerImportExportSetup):
    # Test handle_customer_removed
    #
    # This is a complex action updating several CSV files and moving
    # stored files to a backup location.
    #
    # These tests make no assumptions about the CSV files except that
    # they contain a deletion timestamp at end of each data row

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerRemovalTests, self).setUp()
        self.setup_for_export()
        return

    def create_passport_img(self, customer):
        # create some passport file for `customer`
        storage = getUtility(IExtFileStore)
        image_path = os.path.join(os.path.dirname(__file__), 'test_image.jpg')
        self.image_contents = open(image_path, 'rb').read()
        file_id = IFileStoreNameChooser(customer).chooseName(
            attr='passport.jpg')
        storage.createFile(file_id, StringIO(self.image_contents))

    def test_backup_single_customer_data(self):
        # when a single customer is removed, the data is backed up.
        self.setup_customer(self.customer)
        # Add a fake image
        self.create_passport_img(self.customer)
        handle_customer_removed(self.customer, None)
        del_dir = self.app['datacenter'].deleted_path
        del_img_path = os.path.join(
            del_dir, 'media', 'customers', '00110', 'A111111',
            'passport_A111111.jpg')

        # The image was copied over
        self.assertTrue(os.path.isfile(del_img_path))
        self.assertEqual(
            open(del_img_path, 'rb').read(),
            self.image_contents)

        # The customer data were put into CSV files
        for name in getUtility(ICustomersUtils).EXPORTER_NAMES:
            csv_path = os.path.join(del_dir, '%s.csv' % name)
            self.assertTrue(os.path.isfile(csv_path))
            contents = open(csv_path, 'rb').read().split('\r\n')
            # We expect 3 lines output including a linebreak at end of file.
            self.assertEqual(len(contents), 3)
        return

    def test_backup_append_csv(self):
        # when several customers are removed, existing CSVs are appended
        self.setup_customer(self.customer)
        # Add a fake image
        self.create_passport_img(self.customer)
        del_dir = self.app['datacenter'].deleted_path
        # put fake data into customers.csv with trailing linebreak
        customers_csv = os.path.join(del_dir, 'customers.csv')
        open(customers_csv, 'wb').write('line1\r\nline2\r\n')
        handle_customer_removed(self.customer, None)
        contents = open(customers_csv, 'rb').read().split('\r\n')
        # there should be 4 lines in result csv (including trailing linebreak)
        self.assertEqual(len(contents), 4)
        return

    def test_old_files_removed(self):
        # make sure old files are not accessible any more
        self.setup_customer(self.customer)
        # Add a fake image
        self.create_passport_img(self.customer)
        # make sure we can access the image before removal
        file_store = getUtility(IExtFileStore)
        image = file_store.getFileByContext(self.customer, attr='passport.jpg')
        self.assertTrue(image is not None)

        # remove image (hopefully)
        handle_customer_removed(self.customer, None)

        # the is not accessible anymore
        image = file_store.getFileByContext(self.customer, attr='passport.jpg')
        self.assertEqual(image, None)
        return

    def test_csv_file_entries_have_timestamp(self):
        # each row in written csv files has a ``del_date`` column to
        # tell when the associated customer was deleted
        self.setup_customer(self.customer)
        del_dir = self.app['datacenter'].deleted_path
        customers_csv = os.path.join(del_dir, 'customers.csv')
        handle_customer_removed(self.customer, None)
        contents = open(customers_csv, 'rb').read().split('\r\n')
        # the CSV header ends with a ``del_date`` column
        self.assertTrue(contents[0].endswith(',del_date'))
        # each line ends with an UTC timestamp
        timestamp = contents[1][-23:]
        self.assertTrue(re.match(
            '^\d\d-\d\d-\d\d \d\d:\d\d:\d\d\+00:00$', timestamp))
        return


class CustomerFactoryTest(FunctionalTestCase):

    layer = FunctionalLayer

    def setUp(self):
        super(CustomerFactoryTest, self).setUp()
        self.factory = CustomerFactory()

    def tearDown(self):
        super(CustomerFactoryTest, self).tearDown()

    def test_interfaces(self):
        verify.verifyClass(IFactory, CustomerFactory)
        verify.verifyObject(IFactory, self.factory)

    def test_factory(self):
        obj = self.factory()
        assert isinstance(obj, Customer)

    def test_getInterfaces(self):
        implemented_by = self.factory.getInterfaces()
        assert implemented_by.isOrExtends(ICustomer)
