source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 9053

Last change on this file since 9053 was 8380, checked in by uli, 13 years ago

Add hash by default when exporting dates.

  • Property svn:keywords set to Id
File size: 11.4 KB
Line 
1## $Id: test_batching.py 8380 2012-05-07 10:15:21Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zope import schema
26from zope.component import provideUtility
27from zope.component.factory import Factory
28from zope.component.hooks import clearSite
29from zope.component.interfaces import IFactory
30from zope.interface import Interface, implements, verify
31from waeup.kofa.app import University
32from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
33from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
34from waeup.kofa.utils.batching import ExporterBase
35
# Doctest option flags passed to doctest.run_docstring_examples() in this
# module: n-diff style reports, "..." wildcards in expected output and
# whitespace-insensitive comparison.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
38
39
class ICave(Interface):
    """A cave.

    Sample schema used as ``iface`` of the ``CaveProcessor`` defined in
    this module.
    """

    # Internal identifier; declared read-only.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
    )

    # Name of the cave; CaveProcessor uses it as location field.
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True,
    )

    # Optional integer field.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
    )

    # Owner of the cave; note that missing_value is a plain (byte) string.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
    )

    # Boolean flag, False unless stated otherwise.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
    )
64
class Cave(object):
    """Simple sample content object declaring to implement `ICave`.

    Instances are created by the 'Lovely Cave' factory that the batch
    processor tests register in their ``setUp``.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Store the given attribute values on the new cave.

        `name` is the cave name, `dinoports` the number of DinoPorts,
        `owner` the owner name and `taxpayer` a boolean flag.
        """
        self.name = name
        # Honour the passed-in value. This attribute used to be
        # hard-coded to 2, silently ignoring the `dinoports` argument
        # (the default of 2 is unchanged).
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
73#Cave = attrs_to_fields(Cave)
74
# Module-level registry mapping cave names to cave objects. CaveProcessor
# reads and writes it as the parent container of all caves. It must be a
# dict *instance*: binding the `dict` type itself (as was done before)
# makes item access and membership tests fail until something rebinds it.
stoneville = dict()
76
77from waeup.kofa.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Sample batch processor that keeps caves in the module-level
    `stoneville` mapping, keyed by the ``name`` column of each row.

    None of the methods make use of the `site` argument.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Caves have no real parents to look up: always report success."""
        return True

    def getParent(self, row, site):
        """Return the `stoneville` registry as container for any row."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether a cave named ``row['name']`` is registered."""
        # Test membership on the dict itself; going through ``.keys()``
        # builds a list first on Python 2 and scans it linearly.
        return row['name'] in stoneville

    def getEntry(self, row, site):
        """Return the cave named ``row['name']`` or ``None`` if missing."""
        # Single lookup instead of an existence check followed by access.
        return stoneville.get(row['name'])

    def delEntry(self, row, site):
        """Remove the cave named ``row['name']``.

        Raises `KeyError` if no such cave is registered.
        """
        del stoneville[row['name']]

    def addEntry(self, obj, row, site):
        """Register `obj` under ``row['name']``, replacing any old entry."""
        stoneville[row['name']] = obj
105
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests importing sample caves with `CaveProcessor`."""

    layer = FunctionalLayer

    def setupLogger(self):
        """Create the 'stoneville' logger writing to a file in `workdir`.

        Requires ``self.workdir`` to be set. Propagation is switched off,
        so records go to the log file handler attached here only.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Set up a sample site, a cave factory, sample data and logging."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        # Use a context manager so the file is flushed and closed before
        # the processor reads it.
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(
                """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
""")
        self.setupLogger()
        # Start every test with a fresh registry and keep a reference to
        # that very same dict. The reference used to be taken *before*
        # the global was rebound, so tests inspected the registry filled
        # by the previously run test (or the initial module-level value).
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        """Release the log file and remove all files created by a test."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before deleting the directory
        # holding its log file; otherwise the open file handle leaks and
        # handlers pile up on the shared 'stoneville' logger.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        paths = self.resultpath
        if not isinstance(paths, list):
            paths = [paths]
        for path in paths:
            if not isinstance(path, basestring):
                # ``None`` entries: nothing was written for this result.
                continue
            if not os.path.isdir(path):
                # Result paths are files; remove their directory.
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        """Import the sample CSV file in 'create' mode as user 'Bob'.

        Stores the resulting paths in ``self.resultpath`` (cleaned up in
        `tearDown`) and returns the tuple delivered by ``doImport``:
        ``(num_succ, num_fail, finished_path, failed_path)``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        # all four sample rows are imported without failures
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        # imported caves end up in the registry under their names
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # values read from CSV are converted to the schema field type
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._import_newcomers()
        # ``log_contents`` is referenced by the docstring example above
        # and handed over to doctest via ``locals()``.
        with open(self.logfile, 'rb') as log_file:
            log_contents = log_file.read()
        # NOTE(review): run_docstring_examples() only prints mismatches,
        # it does not make this test fail -- consider asserting on the
        # outcome of a doctest.DocTestRunner instead.
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
        return
237
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for the `ExporterBase` CSV exporter base class."""

    def setUp(self):
        """Create a temporary directory and a CSV file path inside it."""
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        """Remove the temporary directory with everything in it."""
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # both string flavours must come back as plain strings
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
        self.assertEqual(result7, '2012-04-01 12:01:01')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Read back via a context manager: the handle used to stay open
        # while tearDown() was already removing the directory.
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()

        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True

        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return
Note: See TracBrowser for help on using the repository browser.