source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/tests/test_batching.py @ 9091

Last change on this file since 9091 was 9090, checked in by uli, 12 years ago

Add tests for export_job function.

  • Property svn:keywords set to Id
File size: 12.5 KB
Line 
1## $Id: test_batching.py 9090 2012-08-08 13:27:57Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zope import schema
26from zope.component import provideUtility, getGlobalSiteManager
27from zope.component.factory import Factory
28from zope.component.hooks import clearSite
29from zope.component.interfaces import IFactory
30from zope.interface import Interface, implements, verify
31from waeup.kofa.app import University
32from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
33from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
34from waeup.kofa.utils.batching import (
35    ExporterBase, BatchProcessor, export_job)
36
# Option flags handed to doctest when the examples embedded in this
# module are checked.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
    )
39
40
class ICave(Interface):
    """Schema describing a cave, the sample content type of these tests."""

    # Read-only text field holding an internal id.
    id_num = schema.TextLine(
        title=u'internal id',
        required=True,
        readonly=True,
        default=u'default',
        )

    # Required display name of the cave.
    name = schema.TextLine(
        title=u'Cave name',
        required=True,
        default=u'Unnamed',
        )

    # Optional integer field.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        default=1,
        required=False,
        )

    # Required owner name; declares a `missing_value` instead of a default.
    owner = schema.TextLine(
        title=u'Owner name',
        missing_value='Fred Estates Inc.',
        required=True,
        )

    # Required boolean flag, False by default.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        default=False,
        required=True,
        )
65
class Cave(object):
    """Simple `ICave` implementation used as import target in the tests.

    Every constructor argument is stored unchanged in the instance
    attribute of the same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        self.name = name
        # Honour the passed-in value instead of a hard-coded 2.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
74
# Module-level mapping in which CaveProcessor stores caves, keyed by cave
# name. It must be a dict *instance* (not the ``dict`` type itself) so
# that lookups work even before a test's setUp rebinds the name.
stoneville = {}

# CSV input for the import tests: one header line plus four caves.
SAMPLE_DATA = """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
"""
83
class CaveProcessor(BatchProcessor):
    """Batch processor keeping caves in the module-level `stoneville`.

    Entries are addressed by the ``name`` value of a row.
    """
    util_name = 'caveprocessor'
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        # Parents are unconditionally reported as existing.
        return True

    def getParent(self, row, site):
        # The one and only parent is the `stoneville` mapping.
        return stoneville

    def entryExists(self, row, site):
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        # Returns the stored object or None if there is no such entry.
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        cave_name = row['name']
        stoneville[cave_name] = obj
110
class BatchProcessorTests(FunctionalTestCase):
    """Import the sample caves with `CaveProcessor` and check the outcome.

    Each test gets a fresh `University` site, a fresh working directory
    containing ``newcomers.csv``, a file-backed logger and a fresh, empty
    `stoneville` mapping.
    """

    layer = FunctionalLayer

    def setupLogger(self):
        # Log into a file inside the per-test working directory so that
        # tests can inspect what was logged.
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        csv_file = open(self.newcomers_csv, 'wb')
        try:
            csv_file.write(SAMPLE_DATA)
        finally:
            csv_file.close()
        self.setupLogger()

        # Rebind the global FIRST and only then keep a reference to it.
        # Doing it the other way round would make `self.stoneville` point
        # to the mapping of the previous test and the tests would depend
        # on their execution order.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the log handler before the directory holding
        # its file is removed; otherwise the file handle is leaked.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        # `resultpath` may be None, a single path or a list of paths
        # (possibly containing None). Remove the directory of each path.
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                continue
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        # Run a 'create' import of the sample CSV file as user 'Bob'.
        # The paths of the generated result files are remembered in
        # `self.resultpath` so that tearDown can remove them.
        #
        # Returns the tuple delivered by `doImport`:
        # (num_succ, num_fail, finished_path, failed_path)
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        # all four rows are imported and no 'failed' file is written
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        # imported caves are stored under their names
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # CSV strings are converted to the type the schema asks for
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._import_newcomers()
        log_file = open(self.logfile, 'rb')
        try:
            # `log_contents` is looked up by the doctest above via locals()
            log_contents = log_file.read()
        finally:
            log_file.close()
        # NOTE(review): `run_docstring_examples` apparently only prints
        # failing examples and does not raise, so this test may be unable
        # to fail -- confirm against the doctest docs before relying on it.
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
        return
236
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for the helpers provided by `ExporterBase`.

    Each test gets a temporary working directory (`self.workdir`) and
    the path of a not yet existing CSV file in it (`self.workfile`).
    """

    def setUp(self):
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # strings come back as byte strings, whatever was passed in
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
        self.assertEqual(result7, '2012-04-01 12:01:01')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Read the result back and make sure the handle is closed again
        # before tearDown removes the working directory.
        resultfile = open(self.workfile, 'rb')
        try:
            contents = resultfile.read()
        finally:
            resultfile.close()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True
        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        # without a filepath the written contents are returned
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        # with a real file nothing is returned
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return
337
338
class CaveExporter(ExporterBase):
    # A minimal fake exporter suitable to be called by export_jobs
    fields = ('name', 'dinoports', 'owner', 'taxpayer')

    def export_all(self, site, filepath=None):
        # "Export" the canned SAMPLE_DATA, ignoring `site`.
        #
        # Without `filepath` the data is returned as a string. With
        # `filepath` the data is written to that file and None is
        # returned.
        if filepath is None:
            return SAMPLE_DATA
        outfile = open(filepath, 'wb')
        try:
            outfile.write(SAMPLE_DATA)
        finally:
            # close explicitly so the data is flushed to disk before
            # callers read the file
            outfile.close()
        return
348
class ExportJobTests(unittest.TestCase):
    # Test asynchronous export functionality (simple cases)

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def tearDown(self):
        # Unregister with the same interface and name used in setUp.
        # Without the name the lookup is done for the unnamed utility
        # and the registration made above would be left in place.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def test_export_job_func(self):
        # the export_job func does really export data...
        result_path = export_job(None, 'cave_exporter')
        try:
            self.assertTrue(os.path.isfile(result_path))
            result_file = open(result_path, 'rb')
            try:
                contents = result_file.read()
            finally:
                result_file.close()
        finally:
            # remove the directory created for the export, even if one
            # of the checks above failed
            shutil.rmtree(os.path.dirname(result_path), ignore_errors=True)
        self.assertEqual(contents, SAMPLE_DATA)
        return
Note: See TracBrowser for help on using the repository browser.