source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 8228

Last change on this file since 8228 was 8228, checked in by Henrik Bettermann, 13 years ago

I think it's better to use the original updateEntry method even if it makes no difference here.

  • Property svn:keywords set to Id
File size: 11.0 KB
Line 
1## $Id: test_batching.py 8228 2012-04-20 06:05:56Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import doctest
19import logging
20import os
21import shutil
22import tempfile
23import unittest
24from zope import schema
25from zope.component import provideUtility
26from zope.component.factory import Factory
27from zope.component.hooks import clearSite
28from zope.component.interfaces import IFactory
29from zope.interface import Interface, implements, verify
30from waeup.kofa.app import University
31from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
32from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
33from waeup.kofa.utils.batching import ExporterBase
34
# Doctest option flags handed to ``doctest.run_docstring_examples`` in
# this module: ndiff-style failure reports, ``...`` wildcards and
# whitespace-insensitive comparison of expected vs. actual output.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
37
38
class ICave(Interface):
    """Schema describing a cave, the sample content type of these tests.

    The fields are declared in the order ``id_num``, ``name``,
    ``dinoports``, ``owner``, ``taxpayer``.
    """

    # Read-only internal identifier.
    id_num = schema.TextLine(
        title=u'internal id',
        required=True,
        readonly=True,
        default=u'default',
    )

    # Human readable cave name.
    name = schema.TextLine(
        title=u'Cave name',
        required=True,
        default=u'Unnamed',
    )

    # Optional integer count of DinoPorts.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        default=1,
        required=False,
    )

    # Required owner name; note that only a ``missing_value`` is given
    # here, no ``default``.
    owner = schema.TextLine(
        title=u'Owner name',
        missing_value='Fred Estates Inc.',
        required=True,
    )

    # Boolean flag, False unless stated otherwise.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        default=False,
        required=True,
    )
63
class Cave(object):
    """Plain sample object declared to implement ``ICave``.

    Only ``name``, ``dinoports``, ``owner`` and ``taxpayer`` are set by
    the constructor; the ``id_num`` field declared in ``ICave`` is not
    assigned here.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Store the given attribute values on the new cave.

        :param name: cave name.
        :param dinoports: number of DinoPorts; defaults to 2.
        :param owner: owner name.
        :param taxpayer: whether the cave pays taxes.
        """
        self.name = name
        # Use the value passed in. Previously a literal ``2`` was stored
        # and the argument silently ignored; calls relying on the default
        # still get 2.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
72#Cave = attrs_to_fields(Cave)
73
# Module-level registry of caves, keyed by cave name.  It has to be a
# dict *instance*: binding the ``dict`` type itself would make item
# access and ``keys()`` calls on it fail.  ``BatchProcessorTests.setUp``
# rebinds this name to a fresh dict for every test.
stoneville = dict()
75
76from waeup.kofa.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Sample batch processor that keeps caves in the ``stoneville`` dict.

    Every hook below looks up the module-level name ``stoneville`` at
    call time, so it always operates on whatever dict that name is
    currently bound to.  Rows are addressed by their ``'name'`` value.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    # Same name under which the test setup registers the ``Cave`` factory.
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Report that the parent container always exists."""
        return True

    def getParent(self, row, site):
        """Return the ``stoneville`` registry as parent of every cave."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether a cave named ``row['name']`` is registered."""
        # Direct dict membership; ``in stoneville.keys()`` would build a
        # throw-away list and scan it on Python 2.
        return row['name'] in stoneville

    def getEntry(self, row, site):
        """Return the cave named ``row['name']`` or None if unknown."""
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        """Remove the cave named ``row['name']``.

        Raises ``KeyError`` if no such cave is registered.
        """
        del stoneville[row['name']]

    def addEntry(self, obj, row, site):
        """Register `obj` under ``row['name']``, replacing any old entry."""
        stoneville[row['name']] = obj
104
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests importing sample caves with ``CaveProcessor``."""

    layer = FunctionalLayer

    def setupLogger(self):
        """Create the 'stoneville' logger writing to a file in the workdir.

        Requires ``self.workdir`` to be set already.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Create a sample site, a CSV source file and a fresh registry."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        # Use a context manager so the file is flushed and closed before
        # the processor reads it.
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(
                """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
""")
        self.setupLogger()
        # Rebind the global registry *first* and only then keep a
        # reference to it.  Doing it the other way round leaves
        # ``self.stoneville`` pointing at the registry of the previous
        # test, which made the assertions depend on test ordering.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None

    def tearDown(self):
        """Remove temporary directories, the log handler and result files."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before its log file is deleted
        # together with the workdir; otherwise the file handle leaks.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                # None (no such result file) or anything not path-like.
                continue
            if not os.path.isdir(path):
                # Result files are removed along with their directory.
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)

    def _importNewcomers(self):
        """Import the sample CSV file in create mode as user 'Bob'.

        Records the result paths in ``self.resultpath`` so that
        ``tearDown`` can remove them and returns the tuple
        ``(num_succ, num_fail, finished_path, failed_path)`` as
        delivered by ``doImport``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)

    def test_import(self):
        # all four sample rows are imported and none of them fails
        result = self._importNewcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertEqual(failed_path, None)

    def test_import_stoneville(self):
        # the imported caves end up in the registry under their names
        self._importNewcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # the dinoports value of an imported cave is an int
        self._importNewcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        # The docstring above is a doctest run against the log contents;
        # do not add prose to it.
        self._importNewcomers()
        with open(self.logfile, 'rb') as logfile:
            log_contents = logfile.read()
        # ``log_contents`` reaches the doctest through ``locals()``.
        # NOTE(review): run_docstring_examples() reports mismatches on
        # stdout rather than raising, so a mismatch presumably does not
        # fail this test -- confirm.
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
236
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for ``ExporterBase``, using a temporary working dir."""

    def setUp(self):
        """Create a fresh working directory and the path of a CSV file."""
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')

    def tearDown(self):
        """Remove the working directory with everything inside."""
        shutil.rmtree(self.workdir)

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # both byte and unicode input come back as plain strings
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Read the result back through a context manager so that no open
        # handle is left behind when tearDown removes the directory.
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()

        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True

        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        # without a file path the written contents are returned
        self.assertEqual(result, 'code,title,title_prefix\r\n')

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
Note: See TracBrowser for help on using the repository browser.