source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 7809

Last change on this file since 7809 was 7196, checked in by Henrik Bettermann, 13 years ago

More copyright adjustments (job finished).

  • Property svn:keywords set to Id
File size: 7.6 KB
Line 
1## $Id: test_batching.py 7196 2011-11-25 07:44:52Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import doctest
19import logging
20import os
21import shutil
22import tempfile
23from zope import schema
24from zope.component import provideUtility
25from zope.component.factory import Factory
26from zope.component.hooks import clearSite
27from zope.component.interfaces import IFactory
28from zope.interface import Interface, implements
29from waeup.sirp.app import University
30from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase
31
32optionflags = (
33    doctest.REPORT_NDIFF + doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE)
34
35
36class ICave(Interface):
37    """A cave."""
38    id_num = schema.TextLine(
39        title = u'internal id',
40        default = u'default',
41        required = True,
42        readonly = True,
43        )
44    name = schema.TextLine(
45        title = u'Cave name',
46        default = u'Unnamed',
47        required = True)
48    dinoports = schema.Int(
49        title = u'Number of DinoPorts (tm)',
50        required = False,
51        default = 1)
52    owner = schema.TextLine(
53        title = u'Owner name',
54        required = True,
55        missing_value = 'Fred Estates Inc.')
56    taxpayer = schema.Bool(
57        title = u'Payes taxes',
58        required = True,
59        default = False)
60
61class Cave(object):
62    implements(ICave)
63    def __init__(self, name=u'Unnamed', dinoports=2,
64                 owner='Fred Estates Inc.', taxpayer=False):
65        self.name = name
66        self.dinoports = 2
67        self.owner = owner
68        self.taxpayer = taxpayer
69#Cave = attrs_to_fields(Cave)
70
71stoneville = dict
72
73from waeup.sirp.utils.batching import BatchProcessor
74class CaveProcessor(BatchProcessor):
75    util_name = 'caveprocessor'
76    #grok.name(util_name)
77    name = 'Cave Processor'
78    iface = ICave
79    location_fields = ['name']
80    factory_name = 'Lovely Cave'
81
82    def parentsExist(self, row, site):
83        return True
84
85    def getParent(self, row, site):
86        return stoneville
87
88    def entryExists(self, row, site):
89        return row['name'] in stoneville.keys()
90
91    def getEntry(self, row, site):
92        if not self.entryExists(row, site):
93            return None
94        return stoneville[row['name']]
95
96    def delEntry(self, row, site):
97        del stoneville[row['name']]
98
99    def addEntry(self, obj, row, site):
100        stoneville[row['name']] = obj
101
102    def updateEntry(self, obj, row, site):
103        # This is not strictly necessary, as the default
104        # updateEntry method does exactly the same
105        for key, value in row.items():
106            setattr(obj, key, value)
107
108class BatchProcessorTests(FunctionalTestCase):
109
110    layer = FunctionalLayer
111
112    def setupLogger(self):
113
114        self.logger = logging.getLogger('stoneville')
115        self.logger.setLevel(logging.DEBUG)
116        self.logger.propagate = False
117        self.logfile = os.path.join(self.workdir, 'stoneville.log')
118        self.handler = logging.FileHandler(self.logfile, 'w')
119        self.logger.addHandler(self.handler)
120
121    def setUp(self):
122        global stoneville
123        super(BatchProcessorTests, self).setUp()
124
125        # Setup a sample site for each test
126        app = University()
127        self.dc_root = tempfile.mkdtemp()
128        app['datacenter'].setStoragePath(self.dc_root)
129
130        # Prepopulate the ZODB...
131        self.getRootFolder()['app'] = app
132        self.app = self.getRootFolder()['app']
133
134        self.workdir = tempfile.mkdtemp()
135        factory = Factory(Cave)
136        provideUtility(factory, IFactory, 'Lovely Cave')
137
138        # Provide sample data
139        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
140        open(self.newcomers_csv, 'wb').write(
141            """name,dinoports,owner,taxpayer
142Barneys Home,2,Barney,1
143Wilmas Asylum,1,Wilma,1
144Freds Dinoburgers,10,Fred,0
145Joeys Drive-in,110,Joey,0
146""")
147        self.setupLogger()
148        self.stoneville = stoneville
149        stoneville = dict()
150        self.resultpath = None
151        return
152
153    def tearDown(self):
154        super(BatchProcessorTests, self).tearDown()
155        shutil.rmtree(self.workdir)
156        shutil.rmtree(self.dc_root)
157        self.logger.removeHandler(self.handler)
158        clearSite()
159        if not isinstance(self.resultpath, list):
160            self.resultpath = [self.resultpath]
161        for path in self.resultpath:
162            if not isinstance(path, basestring):
163                continue
164            if not os.path.isdir(path):
165                path = os.path.dirname(path)
166            if os.path.exists(path):
167                shutil.rmtree(path)
168        return
169
170    def test_import(self):
171        processor = CaveProcessor()
172        result = processor.doImport(
173            self.newcomers_csv,
174            ['name', 'dinoports', 'owner', 'taxpayer'],
175            mode='create', user='Bob', logger=self.logger)
176        num_succ, num_fail, finished_path, failed_path = result
177        self.resultpath = [finished_path, failed_path]
178        assert num_succ == 4
179        assert num_fail == 0
180        assert finished_path.endswith('/newcomers.finished.csv')
181        assert failed_path is None
182
183    def test_import_stoneville(self):
184        processor = CaveProcessor()
185        result = processor.doImport(
186            self.newcomers_csv,
187            ['name', 'dinoports', 'owner', 'taxpayer'],
188            mode='create', user='Bob', logger=self.logger)
189        num_succ, num_fail, finished_path, failed_path = result
190        self.resultpath = [finished_path, failed_path]
191        assert len(self.stoneville) == 4
192        self.assertEqual(
193            sorted(self.stoneville.keys()),
194            [u'Barneys Home', u'Freds Dinoburgers',
195             u'Joeys Drive-in', u'Wilmas Asylum'])
196
197    def test_import_correct_type(self):
198        processor = CaveProcessor()
199        result = processor.doImport(
200            self.newcomers_csv,
201            ['name', 'dinoports', 'owner', 'taxpayer'],
202            mode='create', user='Bob', logger=self.logger)
203        num_succ, num_fail, finished_path, failed_path = result
204        self.resultpath = [finished_path, failed_path]
205        assert isinstance(self.stoneville['Barneys Home'].dinoports, int)
206
207
208    def test_log(self):
209        """
210           >>> print log_contents
211           --------------------
212           Bob: Batch processing finished: OK
213           Bob: Source: /.../newcomers.csv
214           Bob: Mode: create
215           Bob: User: Bob
216           Bob: Processing time: ... s (... s/item)
217           Bob: Processed: 4 lines (4 successful/ 0 failed)
218           --------------------
219
220        """
221        processor = CaveProcessor()
222        result = processor.doImport(
223            self.newcomers_csv,
224            ['name', 'dinoports', 'owner', 'taxpayer'],
225            mode='create', user='Bob', logger=self.logger)
226        num_succ, num_fail, finished_path, failed_path = result
227        self.resultpath = [finished_path, failed_path]
228        log_contents = open(self.logfile, 'rb').read()
229        doctest.run_docstring_examples(
230            self.test_log, locals(), False, 'test_log', None, optionflags)
231        return
232
233#def test_suite():
234#    return unittest.TestSuite()
Note: See TracBrowser for help on using the repository browser.