source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 7941

Last change on this file since 7941 was 7858, checked in by uli, 13 years ago

Fix testname.

  • Property svn:keywords set to Id
File size: 11.0 KB
Line 
1## $Id: test_batching.py 7858 2012-03-13 01:16:14Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import doctest
19import logging
20import os
21import shutil
22import tempfile
23import unittest
24from zope import schema
25from zope.component import provideUtility
26from zope.component.factory import Factory
27from zope.component.hooks import clearSite
28from zope.component.interfaces import IFactory
29from zope.interface import Interface, implements, verify
30from waeup.kofa.app import University
31from waeup.kofa.interfaces import ICSVExporter
32from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
33from waeup.kofa.utils.batching import ExporterBase
34
# Doctest options shared by the docstring-based checks in this module:
# ndiff-style failure reports, ``...`` wildcards in expected output and
# whitespace-insensitive comparison.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
37
38
class ICave(Interface):
    """Schema of the sample ``cave`` objects used by the tests below."""

    # Internal identifier; declared read-only.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
    )

    # Name of the cave.
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True,
    )

    # Optional integer field, defaults to 1.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        default=1,
        required=False,
    )

    # Name of the owner.
    owner = schema.TextLine(
        title=u'Owner name',
        missing_value='Fred Estates Inc.',
        required=True,
    )

    # Boolean flag, defaults to False.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        default=False,
        required=True,
    )
63
class Cave(object):
    """Simple sample object declared to implement ``ICave``.

    All constructor arguments are stored unchanged as attributes of the
    same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Create a cave.

        :param name: cave name.
        :param dinoports: number of dinoports (default 2).
        :param owner: owner name.
        :param taxpayer: whether the cave pays taxes.
        """
        self.name = name
        # Store the value passed in; it used to be hard-coded to 2, which
        # silently dropped any explicit ``dinoports`` argument.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
72#Cave = attrs_to_fields(Cave)
73
# Module-level store that CaveProcessor reads and writes; maps cave names
# to cave objects.  It must be a dict *instance* (not the ``dict`` type) so
# that lookups and item assignment work even before a test rebinds it.
stoneville = dict()
75
76from waeup.kofa.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Sample batch processor keeping caves in the global ``stoneville``.

    Entries are keyed by the ``name`` column of the processed row.  The
    ``site`` argument is accepted by every hook but never used here.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Always report that the parents of an entry exist."""
        return True

    def getParent(self, row, site):
        """Return the global ``stoneville`` container."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether ``stoneville`` has an entry for ``row['name']``."""
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        """Return the cave stored for ``row['name']`` or ``None``."""
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        """Remove the cave stored for ``row['name']``."""
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        """Store ``obj`` in ``stoneville`` under ``row['name']``."""
        cave_name = row['name']
        stoneville[cave_name] = obj

    def updateEntry(self, obj, row, site):
        """Copy every value of ``row`` onto the same-named attribute."""
        # This is not strictly necessary, as the default
        # updateEntry method does exactly the same
        for field in row:
            setattr(obj, field, row[field])
110
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests importing a sample CSV file with ``CaveProcessor``.

    Each test gets a fresh ``University`` site, a temporary working
    directory holding the CSV file and the log file, and a fresh, empty
    global ``stoneville`` dict that is also available as
    ``self.stoneville``.
    """

    layer = FunctionalLayer

    # Column names of the sample CSV file, in file order.
    HEADER_FIELDS = ('name', 'dinoports', 'owner', 'taxpayer')

    # Content of the sample CSV file written in setUp().
    NEWCOMERS_CSV = (
        "name,dinoports,owner,taxpayer\n"
        "Barneys Home,2,Barney,1\n"
        "Wilmas Asylum,1,Wilma,1\n"
        "Freds Dinoburgers,10,Fred,0\n"
        "Joeys Drive-in,110,Joey,0\n"
    )

    def setupLogger(self):
        """Create a non-propagating logger writing to a file in workdir."""
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Set up site, factory, sample CSV file, logger and storage."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data; close the file right after writing.
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(self.NEWCOMERS_CSV)
        self.setupLogger()

        # Rebind the global *first* and then keep a reference to the very
        # same dict.  Doing it the other way round made ``self.stoneville``
        # point to the storage of the previously run test.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None

    def tearDown(self):
        """Release the log handler and remove all temporary directories."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before its log file is deleted,
        # otherwise the file handle stays open for the rest of the run.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                # ``None`` entries: nothing was written for them.
                continue
            if not os.path.isdir(path):
                # Result files are removed together with their directory.
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)

    def _import_newcomers(self):
        """Import the sample CSV file in ``create`` mode as user ``Bob``.

        Registers the result paths for cleanup in tearDown() and returns
        the ``doImport`` result, which the callers unpack as
        ``(num_succ, num_fail, finished_path, failed_path)``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            list(self.HEADER_FIELDS),
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_import(self):
        # all four rows are imported and only a 'finished' file is reported
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        # every imported row ends up as an entry in stoneville
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # imported values are stored with the type of their schema field
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._import_newcomers()
        with open(self.logfile, 'rb') as log_file:
            log_contents = log_file.read()
        # NOTE(review): run_docstring_examples() reports mismatches on
        # stdout and returns nothing, so a differing log does not make
        # this test fail -- confirm whether that is intended.
        doctest.run_docstring_examples(
            self.test_log, {'log_contents': log_contents},
            False, 'test_log', None, optionflags)
235
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for ``ExporterBase``.

    Each test gets a temporary directory (``self.workdir``) and the path
    of a not yet existing CSV file in it (``self.workfile``).
    """

    def setUp(self):
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Read the result back via a context manager so the handle is
        # closed before tearDown() removes the directory.
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()

        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True

        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        _writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        _writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
Note: See TracBrowser for help on using the repository browser.