## $Id: test_browser.py 12502 2015-01-20 18:09:03Z henrik $
##
## Copyright (C) 2014 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""
Document browser and functional tests.
"""
import os
import grok
import datetime
from cStringIO import StringIO
from zope.component import queryUtility, getUtility, createObject
from zope.event import notify
from zope.interface.verify import verifyObject, verifyClass
from zope.testbrowser.testing import Browser
from hurry.workflow.interfaces import IWorkflowState
from waeup.ikoba.customers.tests.test_batching import CustomerImportExportSetup
from waeup.ikoba.customers.tests.test_browser import CustomersFullSetup
from ikobacustom.skeleton.customers.export import (
    SkeletonCustomerExporter,
    SkeletonCustomerDocumentExporter,
    SkeletonContractExporter)
from ikobacustom.skeleton.customers.batching import (
    SkeletonCustomerProcessor,
    SkeletonCustomerDocumentProcessor,
    SkeletonContractProcessor)
from ikobacustom.skeleton.testing import FunctionalLayer, samples_dir

# Paths of the sample upload files, resolved relative to the directory
# that contains this test module.
SAMPLE_IMAGE, SAMPLE_IMAGE_BMP, SAMPLE_PDF = (
    os.path.join(os.path.dirname(__file__), fixture_name)
    for fixture_name in ('test_image.jpg', 'test_image.bmp', 'test_pdf.pdf'))

class CustomerImportExportTest(CustomerImportExportSetup):
    """Export/re-import round trips for skeleton customers, customer
    documents and contracts.

    Each test exports the objects created by `setup_for_export` to a CSV
    file and feeds a CSV file back into the matching batch processor in
    `create` and `update` mode.
    """

    layer = FunctionalLayer

    def setup_customizable_params(self):
        """Set the contract category to ``u'sample'``.

        NOTE(review): presumably read by the inherited setup code -- confirm
        against `CustomerImportExportSetup`.
        """
        self._contract_category = u'sample'
        return

    def setup_for_export(self):
        """Create one customer with one document and one contract.

        The customer is added to the portal in workflow state `started`.
        The created objects and the export file path are stored on the test
        instance as `customer`, `document`, `contract` and `outfile`.
        """
        customer = createObject(u'waeup.Customer')
        customer.firstname = u'Beate'
        customer.lastname = u'Mueller'
        customer.reg_number = u'123'
        customer.sex = u'f'
        IWorkflowState(customer).setState('started')
        self.app['customers'].addCustomer(customer)
        document = createObject(u'waeup.SkeletonCustomerDocument')
        document.title = u'My first document'
        customer['documents'].addDocument(document)
        contract = createObject(u'waeup.SkeletonContract')
        contract.tc_dict = {'en':u'Hello World'}
        customer['contracts'].addContract(contract)
        self.customer = customer
        self.document = document
        self.contract = contract
        self.outfile = os.path.join(self.workdir, 'myoutput.csv')
        return

    def test_export_reimport_customers(self):
        """Customers can be exported and the export file re-imported."""
        # we can export all customers in a portal
        # set values we can expect in export file
        self.setup_for_export()
        exporter = SkeletonCustomerExporter()
        exporter.export_all(self.app, self.outfile)
        # Read through a context manager so the handle is closed right away.
        with open(self.outfile, 'rb') as export_file:
            result = export_file.read()
        self.assertEqual(result,
            'customer_id,email,firstname,lastname,middlename,phone,'
            'reg_number,sex,suspended,suspended_comment,password,state,history\r\n'
            'K1000000,,Beate,Mueller,,,123,f,0,,,started,[]\r\n')
        # Column names handed to the processor. Kept as a tuple and copied
        # into a fresh list per call, so every import gets its own list.
        fields = (
            'customer_id','email','firstname','lastname','middlename','phone',
            'reg_number','sex','suspended','suspended_comment','password','state')
        # We can reimport the file ...
        processor = SkeletonCustomerProcessor()
        result = processor.doImport(
            self.outfile, list(fields), mode='create')
        num, num_fail, finished_path, failed_path = result
        self.assertEqual(num_fail,1)
        # ... if we remove the original customer.
        del self.app['customers']['K1000000']
        result = processor.doImport(
            self.outfile, list(fields), mode='create')
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_fail,0)
        # We can import the same file in update mode if we ignore the reg_number.
        # The reg_number column is renamed to 'xx_reg_number' for this import.
        update_fields = [
            'xx_reg_number' if field == 'reg_number' else field
            for field in fields]
        result = processor.doImport(
            self.outfile, update_fields, mode='update')
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ,1)
        self.assertEqual(num_fail,0)
        return

    def test_export_reimport_documents(self):
        """Documents can be exported; a rewritten CSV can be re-imported."""
        # we can export all documents in a portal
        # set values we can expect in export file
        self.setup_for_export()
        exporter = SkeletonCustomerDocumentExporter()
        exporter.export_all(self.app, self.outfile)
        with open(self.outfile, 'rb') as export_file:
            result = export_file.read()
        # NOTE(review): assertMatches is not defined in this module;
        # presumably it is inherited and tolerates the volatile timestamp
        # in the history column -- confirm.
        self.assertMatches(result,
            'class_name,document_id,history,state,title,user_id\r\n'
            'SkeletonCustomerDocument,%s,'
            '[u\'2014-12-21 17:00:36 WAT - Document created by system\'],'
            'created,My first document,K1000000\r\n'
            % self.document.document_id)
        # We can reimport the file if we change the header (user_id -> customer_id)
        processor = SkeletonCustomerDocumentProcessor()
        with open(self.outfile, 'wb') as import_file:
            import_file.write(
                'customer_id,class_name,document_id,state,title\r\n'
                'K1000000,SkeletonCustomerDocument,%s,started,My first title\r\n'
                % self.document.document_id)
        fields = ('customer_id','class_name','document_id','state','title')
        result = processor.doImport(
            self.outfile, list(fields), mode='create')
        num, num_fail, finished_path, failed_path = result
        # The object exists.
        self.assertEqual(num_fail,1)
        # We remove the original document.
        del self.customer['documents'][self.document.document_id]
        result = processor.doImport(
            self.outfile, list(fields), mode='create')
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_fail,0)
        # We can import the same file in update mode.
        result = processor.doImport(
            self.outfile, list(fields), mode='update')
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ,1)
        self.assertEqual(num_fail,0)
        return

    def test_export_reimport_contracts(self):
        """Contracts can be exported; a rewritten CSV can be re-imported."""
        # we can export all contracts in a portal
        # set values we can expect in export file
        self.setup_for_export()
        exporter = SkeletonContractExporter()
        exporter.export_all(self.app, self.outfile)
        with open(self.outfile, 'rb') as export_file:
            result = export_file.read()
        self.assertMatches(result,
            'class_name,comment,contract_category,contract_id,document_object,'
            'history,last_product_id,product_object,product_options,'
            'state,tc_dict,title,user_id\r\n'
            'SkeletonContract,,sample,%s,,'
            '[u\'2014-12-21 22:26:00 WAT - Contract created by system\']'
            ',,,[],created,{\'en\': u\'Hello World\'},,K1000000\r\n'
            % self.contract.contract_id)
        # We can reimport the file if we change the header (user_id -> customer_id)
        processor = SkeletonContractProcessor()
        with open(self.outfile, 'wb') as import_file:
            import_file.write(
                'class_name,contract_category,contract_id,document_object,'
                'history,last_product_id,product_object,product_options,'
                'state,tc_dict,title,user_id\r\n'
                'SkeletonContract,sample,%s,,'
                '[u\'2014-12-21 22:26:00 WAT - Contract created by system\']'
                ',,,[],created,{\'en\': u\'Hello World\'},,K1000000\r\n'
                % self.contract.contract_id)
        # The last column is passed as 'customer_id' although the file
        # header written above says 'user_id'.
        fields = (
            'class_name','contract_category','contract_id','document_object',
            'history','last_product_id','product_object','product_options',
            'state','tc_dict','title','customer_id')
        result = processor.doImport(
            self.outfile, list(fields), mode='create')
        num, num_fail, finished_path, failed_path = result
        # The object exists.
        self.assertEqual(num_fail,1)
        # We remove the original contract.
        del self.customer['contracts'][self.contract.contract_id]
        result = processor.doImport(
            self.outfile, list(fields), mode='create')
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_fail,0)
        # We can import the same file in update mode.
        result = processor.doImport(
            self.outfile, list(fields), mode='update')
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ,1)
        self.assertEqual(num_fail,0)
        return

class DocumentUITests(CustomersFullSetup):
    """Tests for customer document related views and pages."""

    layer = FunctionalLayer

    def setup_customizable_params(self):
        """Set the contract category and the document/contract factory names.

        NOTE(review): presumably read by the inherited setup code -- confirm
        against `CustomersFullSetup`.
        """
        self._contract_category = u'sample'
        self._document_factory = 'waeup.SkeletonCustomerDocument'
        self._contract_factory = 'waeup.SkeletonContract'
        return

    def test_manage_upload_sample_file(self):
        """Walk through upload, view, delete and rejection of sample scans.

        Every uploaded file is opened in binary mode and closed after the
        form has been submitted.
        """
        # Managers can upload a file via the DocumentManageFormPage
        # The image is stored even if form has errors
        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
        self.browser.open(self.customer_path + '/documents/DOC1/manage')
        # Create a pseudo image file and select it to be uploaded
        with open(SAMPLE_IMAGE, 'rb') as image:
            ctrl = self.browser.getControl(name='samplescanmanageupload')
            file_ctrl = ctrl.mech_control
            file_ctrl.add_file(image, filename='my_sample_scan.jpg')
            # The Save action does not upload files
            self.browser.getControl("Save").click() # submit form
        self.assertFalse(
            'href="http://localhost/app/customers/K1000000/documents/DOC1/sample"'
            in self.browser.contents)
        # ... but the correct upload submit button does
        # (binary mode: the file is a JPEG, text mode may corrupt its bytes)
        with open(SAMPLE_IMAGE, 'rb') as image:
            ctrl = self.browser.getControl(name='samplescanmanageupload')
            file_ctrl = ctrl.mech_control
            file_ctrl.add_file(image, filename='my_sample_scan.jpg')
            self.browser.getControl(
                name='upload_samplescanmanageupload').click()
        self.assertTrue(
            'href="http://localhost/app/customers/K1000000/documents/DOC1/sample"'
            in self.browser.contents)
        # Browsing the link shows a real image
        self.browser.open('sample')
        self.assertEqual(
            self.browser.headers['content-type'], 'image/jpeg')
        self.assertEqual(len(self.browser.contents), 2787)
        # We can't reupload a file. The existing file must be deleted first.
        self.browser.open(self.customer_path + '/documents/DOC1/manage')
        self.assertFalse(
            'upload_samplescanmanageupload' in self.browser.contents)
        # File must be deleted first
        self.browser.getControl(name='delete_samplescanmanageupload').click()
        self.assertTrue(
            'sample deleted' in self.browser.contents)
        # Uploading a file which is bigger than 150k will raise an error
        with open(SAMPLE_IMAGE, 'rb') as image:
            image_data = image.read()
        # Repeat the sample image 75 times to build an oversized upload.
        big_image = StringIO(image_data * 75)
        ctrl = self.browser.getControl(name='samplescanmanageupload')
        file_ctrl = ctrl.mech_control
        file_ctrl.add_file(big_image, filename='my_sample_scan.jpg')
        self.browser.getControl(
            name='upload_samplescanmanageupload').click()
        self.assertTrue(
            'Uploaded file is too big' in self.browser.contents)
        # We do not rely on filename extensions given by uploaders
        with open(SAMPLE_IMAGE, 'rb') as image: # a jpg-file
            ctrl = self.browser.getControl(name='samplescanmanageupload')
            file_ctrl = ctrl.mech_control
            # Tell uploaded file is bmp
            file_ctrl.add_file(image, filename='my_sample_scan.bmp')
            self.browser.getControl(
                name='upload_samplescanmanageupload').click()
        self.assertTrue(
            # jpg file was recognized
            'File sample.jpg uploaded.' in self.browser.contents)
        # Delete file again
        self.browser.getControl(name='delete_samplescanmanageupload').click()
        self.assertTrue(
            'sample deleted' in self.browser.contents)
        # File names must meet several conditions
        with open(SAMPLE_IMAGE_BMP, 'rb') as bmp_image:
            ctrl = self.browser.getControl(name='samplescanmanageupload')
            file_ctrl = ctrl.mech_control
            file_ctrl.add_file(bmp_image, filename='my_sample_scan.bmp')
            self.browser.getControl(
                name='upload_samplescanmanageupload').click()
        self.assertTrue('Only the following extensions are allowed'
            in self.browser.contents)

class ContractUITests(CustomersFullSetup):
    """Tests for contract related views and pages."""

    layer = FunctionalLayer

    def setup_customizable_params(self):
        """Set the contract category and the document/contract factory names.

        NOTE(review): presumably read by the inherited setup code -- confirm
        against `CustomersFullSetup`.
        """
        self._contract_category = u'sample'
        self._document_factory = 'waeup.SkeletonCustomerDocument'
        self._contract_factory = 'waeup.SkeletonContract'
        return

    def test_view_slips(self):
        """Download the contract slip PDF and store it as a sample file.

        The response body is written to ``contract_slip.pdf`` in the
        directory returned by `samples_dir()`.
        """
        self.browser.addHeader('Authorization', 'Basic mgr:mgrpw')
        # Officers can open contract slips.
        # First we add a submitted document and a product.
        IWorkflowState(self.document).setState('submitted')
        self.contract.document_object = self.document
        self.contract.product_object = self.product
        self.contract.tc_dict = {'en': u'<strong>Hello world</strong>'}
        self.browser.open(self.customer_path + '/contracts/CON1')
        self.browser.getLink("Download contract slip").click()
        self.assertEqual(self.browser.headers['Status'], '200 Ok')
        self.assertEqual(self.browser.headers['Content-Type'], 'application/pdf')
        path = os.path.join(samples_dir(), 'contract_slip.pdf')
        # Write via a context manager so the PDF is flushed and closed.
        with open(path, 'wb') as slip_file:
            slip_file.write(self.browser.contents)
        # Parenthesised single-argument form prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("Sample contract_slip.pdf written to %s" % path)
