## $Id: test_batching.py 8228 2012-04-20 06:05:56Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
import doctest
import logging
import os
import shutil
import tempfile
import unittest
from zope import schema
from zope.component import provideUtility
from zope.component.factory import Factory
from zope.component.hooks import clearSite
from zope.component.interfaces import IFactory
from zope.interface import Interface, implements, verify
from waeup.kofa.app import University
from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
from waeup.kofa.utils.batching import ExporterBase

# Doctest option flags used when checking the import log in ``test_log``:
# ndiff-style failure reports, ``...`` wildcards and whitespace-insensitive
# comparison.  The flags are distinct bits, so they are OR-ed together.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
    )


class ICave(Interface):
    """Schema of a cave, the sample object type handled in these tests.

    ``CaveProcessor`` uses this interface as its ``iface``.
    """

    # Internal identifier; declared read-only.
    id_num = schema.TextLine(
        title=u'internal id',
        required=True,
        readonly=True,
        default=u'default',
        )

    # Human readable name of the cave.
    name = schema.TextLine(
        title=u'Cave name',
        required=True,
        default=u'Unnamed',
        )

    # Optional integer attribute.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
        )

    # Name of the owner.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
        )

    # Boolean flag.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
        )

class Cave(object):
    """Simple cave object declaring that it implements `ICave`.

    All constructor arguments are optional, so instances can be created
    without arguments and have their attributes set afterwards.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Store the given values as instance attributes.

        `name`
            name of the cave.
        `dinoports`
            number of DinoPorts.
        `owner`
            name of the owner.
        `taxpayer`
            boolean flag.
        """
        self.name = name
        # Honour the argument; the value used to be hard-coded to 2, which
        # silently discarded whatever the caller passed in.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
#Cave = attrs_to_fields(Cave)

# Module-level container mapping cave names to cave objects.  It has to be
# a dict *instance*: the bare ``dict`` type supports neither item
# assignment nor an argument-less ``keys()`` call.
stoneville = dict()

from waeup.kofa.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Sample batch processor storing caves in ``stoneville``.

    Each row is identified by its ``name`` value, which is also the key
    under which the respective object is kept in the module-level
    ``stoneville`` mapping.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Tell that parents exist, regardless of `row` and `site`."""
        return True

    def getParent(self, row, site):
        """Return the ``stoneville`` container for any `row`."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether an entry named ``row['name']`` is stored."""
        key = row['name']
        return key in stoneville

    def getEntry(self, row, site):
        """Return the entry stored for ``row['name']`` or ``None``."""
        if self.entryExists(row, site):
            key = row['name']
            return stoneville[key]
        return None

    def delEntry(self, row, site):
        """Remove the entry stored for ``row['name']``."""
        key = row['name']
        del stoneville[key]

    def addEntry(self, obj, row, site):
        """Store `obj` under ``row['name']``."""
        key = row['name']
        stoneville[key] = obj

class BatchProcessorTests(FunctionalTestCase):
    """Functional tests running a sample CSV import with `CaveProcessor`."""

    layer = FunctionalLayer

    def setupLogger(self):
        """Set up the 'stoneville' logger writing to a file in the workdir.

        Requires ``self.workdir`` to exist.  Sets ``self.logger``,
        ``self.logfile`` and ``self.handler``.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Create a sample site, sample CSV data, a logger and a fresh,
        empty ``stoneville`` container for each test.
        """
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(
                """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
""")
        self.setupLogger()
        # Rebind the global container *before* keeping a reference to it:
        # `CaveProcessor` looks up the global name, so ``self.stoneville``
        # must be the very same (fresh) dict the processor fills.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None

    def tearDown(self):
        """Remove all files, directories and registrations of a test."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the log handler before its file is removed,
        # otherwise the open file handle is leaked.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                # e.g. ``None`` if no such result file was reported
                continue
            if not os.path.isdir(path):
                # result paths denote files; remove their directory
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)

    def _import_newcomers(self):
        """Import the sample CSV file in 'create' mode as user 'Bob'.

        The reported result paths are stored in ``self.resultpath`` so
        that `tearDown` can remove them.

        Returns the tuple ``(num_succ, num_fail, finished_path,
        failed_path)`` delivered by ``doImport``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return num_succ, num_fail, finished_path, failed_path

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)

    def test_import(self):
        # all four sample rows are imported without failures
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        # imported rows end up in the stoneville container
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # imported values are converted to the type required by the schema
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        # The docstring above is executed as a doctest against the log
        # file written during the import; it needs the local name
        # ``log_contents``.
        self._import_newcomers()
        with open(self.logfile, 'rb') as logfile:
            log_contents = logfile.read()
        # NOTE(review): run_docstring_examples() only prints mismatches,
        # it does not raise; a differing log does not fail this test.
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)

class ExporterBaseTests(unittest.TestCase):
    """Unit tests for `ExporterBase`."""

    def setUp(self):
        """Create a temporary directory and a CSV file path inside it."""
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')

    def tearDown(self):
        """Remove the temporary directory with all its contents."""
        shutil.rmtree(self.workdir)

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # byte strings and unicode strings both come out as plain strings
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # read the result back and make sure the file gets closed again
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()

        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True

        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        # without a file path the written contents are returned
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        # with a file path nothing is returned
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
