source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 8213

Last change on this file since 8213 was 8202, checked in by Henrik Bettermann, 12 years ago

Empty values: if no value is provided in import files, attributes must not be cleared. Clear an attribute only if value == DELETIONMARKER.

  • Property svn:keywords set to Id
File size: 10.9 KB
Line 
1## $Id: test_batching.py 8202 2012-04-18 05:12:32Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import doctest
19import logging
20import os
21import shutil
22import tempfile
23import unittest
24from zope import schema
25from zope.component import provideUtility
26from zope.component.factory import Factory
27from zope.component.hooks import clearSite
28from zope.component.interfaces import IFactory
29from zope.interface import Interface, implements, verify
30from waeup.kofa.app import University
31from waeup.kofa.interfaces import ICSVExporter
32from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
33from waeup.kofa.utils.batching import ExporterBase
34
# Doctest options used for the docstring examples run from this module:
# ndiff-style failure reports, ``...`` as a wildcard in expected output,
# and whitespace-insensitive comparison.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
37
38
class ICave(Interface):
    """A cave.

    Schema describing the sample objects used by the batch processing
    tests in this module.
    """

    # Internal identifier; declared read-only.
    id_num = schema.TextLine(
        title=u'internal id',
        required=True,
        readonly=True,
        default=u'default',
    )

    # Name of the cave.
    name = schema.TextLine(
        title=u'Cave name',
        required=True,
        default=u'Unnamed',
    )

    # Optional integer field.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
    )

    # Owner of the cave; declares a missing_value instead of a default.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
    )

    # Boolean flag, off by default.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
    )
63
class Cave(object):
    """Plain sample object that declares to implement `ICave`.

    All constructor arguments are optional and are stored unchanged as
    attributes of the same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        self.name = name
        # Store the value handed in by the caller. A hard-coded ``2`` here
        # would silently discard any other ``dinoports`` argument; the
        # default of the parameter keeps the no-argument behaviour.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
72#Cave = attrs_to_fields(Cave)
73
# Module-level registry of caves keyed by their name; CaveProcessor reads
# and writes it. This has to be a dict *instance* (not the ``dict`` type
# itself), otherwise item access and ``keys()`` fail before a test's
# setUp has rebound the name.
stoneville = {}
75
76from waeup.kofa.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Batch processor for `ICave` objects.

    Entries are kept in the module-level ``stoneville`` mapping and are
    looked up by the ``name`` column of the processed row.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        # The container is a module-level mapping, so it is always there.
        return True

    def getParent(self, row, site):
        # Every row lives in the same container.
        return stoneville

    def entryExists(self, row, site):
        # An entry exists if its name is a key of the container.
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        # Return the stored object for this row or None if there is none.
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        # Remove the object stored under the row's name.
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        # Store `obj` under the row's name.
        cave_name = row['name']
        stoneville[cave_name] = obj

    def updateEntry(self, obj, row, site):
        # Copy every column with a value onto `obj`; columns whose value
        # is None are left untouched.
        for attr_name in row:
            new_value = row[attr_name]
            if new_value is None:
                continue
            setattr(obj, attr_name, new_value)
109
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests importing sample CSV data with `CaveProcessor`."""

    layer = FunctionalLayer

    def setupLogger(self):
        """Create a logger writing to ``stoneville.log`` in ``self.workdir``.

        Sets ``self.logger``, ``self.logfile`` and ``self.handler``.
        ``self.workdir`` must already exist.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Prepare a site, a cave factory, sample CSV data and a logger."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        # Close the file right after writing so the data is flushed and no
        # file handle is left open.
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(
                """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
""")
        self.setupLogger()
        # Rebind the module-level registry *first* and keep a reference to
        # the fresh dict afterwards. Doing it the other way round makes
        # ``self.stoneville`` point to the registry of the previous test,
        # so assertions would only pass depending on test order.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        """Remove temporary directories, the log handler and result files."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler so the log file handle is released.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        # ``resultpath`` may be None, a single path or a list of paths.
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                continue
            # Result files are removed together with their directory.
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        """Import the sample CSV file in 'create' mode as user 'Bob'.

        Registers the returned result paths in ``self.resultpath`` so that
        `tearDown` removes them, and returns the tuple
        ``(num_succ, num_fail, finished_path, failed_path)``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return num_succ, num_fail, finished_path, failed_path

    def test_import(self):
        # all sample rows are imported and only a 'finished' file is written
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        # imported objects are stored in the registry under their names
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # values arrive on the objects as ints, not as CSV strings
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._import_newcomers()
        # Read the log and close the file again right away.
        with open(self.logfile, 'rb') as log_file:
            log_contents = log_file.read()
        # NOTE(review): run_docstring_examples() only prints mismatches, it
        # does not make this test fail -- confirm whether that is intended.
        doctest.run_docstring_examples(
            self.test_log, dict(log_contents=log_contents), False,
            'test_log', None, optionflags)
        return
234
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for `ExporterBase`, using a temporary working directory."""

    def setUp(self):
        # Each test gets a fresh directory and a file path inside it.
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # byte strings and unicode strings both come back as byte strings
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Read the result back and release the file handle again.
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True
        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return
Note: See TracBrowser for help on using the repository browser.