source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/tests/test_batching.py @ 9092

Last change on this file since 9092 was 9092, checked in by uli, 12 years ago

Add interface tests for AsyncExportJob?.

  • Property svn:keywords set to Id
File size: 12.8 KB
Line 
1## $Id: test_batching.py 9092 2012-08-08 14:07:44Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.kofa.app import University
33from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
34from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
35from waeup.kofa.utils.batching import (
36    ExporterBase, BatchProcessor, export_job, AsyncExportJob)
37
38optionflags = (
39    doctest.REPORT_NDIFF + doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE)
40
41
42class ICave(Interface):
43    """A cave."""
44    id_num = schema.TextLine(
45        title = u'internal id',
46        default = u'default',
47        required = True,
48        readonly = True,
49        )
50    name = schema.TextLine(
51        title = u'Cave name',
52        default = u'Unnamed',
53        required = True)
54    dinoports = schema.Int(
55        title = u'Number of DinoPorts (tm)',
56        required = False,
57        default = 1)
58    owner = schema.TextLine(
59        title = u'Owner name',
60        required = True,
61        missing_value = 'Fred Estates Inc.')
62    taxpayer = schema.Bool(
63        title = u'Payes taxes',
64        required = True,
65        default = False)
66
67class Cave(object):
68    implements(ICave)
69    def __init__(self, name=u'Unnamed', dinoports=2,
70                 owner='Fred Estates Inc.', taxpayer=False):
71        self.name = name
72        self.dinoports = 2
73        self.owner = owner
74        self.taxpayer = taxpayer
75
76stoneville = dict
77
78SAMPLE_DATA = """name,dinoports,owner,taxpayer
79Barneys Home,2,Barney,1
80Wilmas Asylum,1,Wilma,1
81Freds Dinoburgers,10,Fred,0
82Joeys Drive-in,110,Joey,0
83"""
84
85class CaveProcessor(BatchProcessor):
86    util_name = 'caveprocessor'
87    name = 'Cave Processor'
88    iface = ICave
89    location_fields = ['name']
90    factory_name = 'Lovely Cave'
91
92    def parentsExist(self, row, site):
93        return True
94
95    def getParent(self, row, site):
96        return stoneville
97
98    def entryExists(self, row, site):
99        return row['name'] in stoneville.keys()
100
101    def getEntry(self, row, site):
102        if not self.entryExists(row, site):
103            return None
104        return stoneville[row['name']]
105
106    def delEntry(self, row, site):
107        del stoneville[row['name']]
108
109    def addEntry(self, obj, row, site):
110        stoneville[row['name']] = obj
111
112class BatchProcessorTests(FunctionalTestCase):
113
114    layer = FunctionalLayer
115
116    def setupLogger(self):
117
118        self.logger = logging.getLogger('stoneville')
119        self.logger.setLevel(logging.DEBUG)
120        self.logger.propagate = False
121        self.logfile = os.path.join(self.workdir, 'stoneville.log')
122        self.handler = logging.FileHandler(self.logfile, 'w')
123        self.logger.addHandler(self.handler)
124
125    def setUp(self):
126        global stoneville
127        super(BatchProcessorTests, self).setUp()
128
129        # Setup a sample site for each test
130        app = University()
131        self.dc_root = tempfile.mkdtemp()
132        app['datacenter'].setStoragePath(self.dc_root)
133
134        # Prepopulate the ZODB...
135        self.getRootFolder()['app'] = app
136        self.app = self.getRootFolder()['app']
137
138        self.workdir = tempfile.mkdtemp()
139        factory = Factory(Cave)
140        provideUtility(factory, IFactory, 'Lovely Cave')
141
142        # Provide sample data
143        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
144        open(self.newcomers_csv, 'wb').write(SAMPLE_DATA)
145        self.setupLogger()
146        self.stoneville = stoneville
147        stoneville = dict()
148        self.resultpath = None
149        return
150
151    def tearDown(self):
152        super(BatchProcessorTests, self).tearDown()
153        shutil.rmtree(self.workdir)
154        shutil.rmtree(self.dc_root)
155        self.logger.removeHandler(self.handler)
156        clearSite()
157        if not isinstance(self.resultpath, list):
158            self.resultpath = [self.resultpath]
159        for path in self.resultpath:
160            if not isinstance(path, basestring):
161                continue
162            if not os.path.isdir(path):
163                path = os.path.dirname(path)
164            if os.path.exists(path):
165                shutil.rmtree(path)
166        return
167
168    def test_iface(self):
169        # make sure we fullfill interface contracts
170        obj = BatchProcessor()
171        verify.verifyClass(IBatchProcessor, BatchProcessor)
172        verify.verifyObject(IBatchProcessor, obj)
173        return
174
175    def test_import(self):
176        processor = CaveProcessor()
177        result = processor.doImport(
178            self.newcomers_csv,
179            ['name', 'dinoports', 'owner', 'taxpayer'],
180            mode='create', user='Bob', logger=self.logger)
181        num_succ, num_fail, finished_path, failed_path = result
182        self.resultpath = [finished_path, failed_path]
183        assert num_succ == 4
184        assert num_fail == 0
185        assert finished_path.endswith('/newcomers.finished.csv')
186        assert failed_path is None
187
188    def test_import_stoneville(self):
189        processor = CaveProcessor()
190        result = processor.doImport(
191            self.newcomers_csv,
192            ['name', 'dinoports', 'owner', 'taxpayer'],
193            mode='create', user='Bob', logger=self.logger)
194        num_succ, num_fail, finished_path, failed_path = result
195        self.resultpath = [finished_path, failed_path]
196        assert len(self.stoneville) == 4
197        self.assertEqual(
198            sorted(self.stoneville.keys()),
199            [u'Barneys Home', u'Freds Dinoburgers',
200             u'Joeys Drive-in', u'Wilmas Asylum'])
201
202    def test_import_correct_type(self):
203        processor = CaveProcessor()
204        result = processor.doImport(
205            self.newcomers_csv,
206            ['name', 'dinoports', 'owner', 'taxpayer'],
207            mode='create', user='Bob', logger=self.logger)
208        num_succ, num_fail, finished_path, failed_path = result
209        self.resultpath = [finished_path, failed_path]
210        assert isinstance(self.stoneville['Barneys Home'].dinoports, int)
211
212
213    def test_log(self):
214        """
215           >>> print log_contents
216           --------------------
217           Bob: Batch processing finished: OK
218           Bob: Source: /.../newcomers.csv
219           Bob: Mode: create
220           Bob: User: Bob
221           Bob: Processing time: ... s (... s/item)
222           Bob: Processed: 4 lines (4 successful/ 0 failed)
223           --------------------
224
225        """
226        processor = CaveProcessor()
227        result = processor.doImport(
228            self.newcomers_csv,
229            ['name', 'dinoports', 'owner', 'taxpayer'],
230            mode='create', user='Bob', logger=self.logger)
231        num_succ, num_fail, finished_path, failed_path = result
232        self.resultpath = [finished_path, failed_path]
233        log_contents = open(self.logfile, 'rb').read()
234        doctest.run_docstring_examples(
235            self.test_log, locals(), False, 'test_log', None, optionflags)
236        return
237
238class ExporterBaseTests(unittest.TestCase):
239
240    def setUp(self):
241        self.workdir = tempfile.mkdtemp()
242        self.workfile = os.path.join(self.workdir, 'testfile.csv')
243        return
244
245    def tearDown(self):
246        shutil.rmtree(self.workdir)
247        return
248
249    def test_iface(self):
250        # ExporterBase really implements the promised interface.
251        obj = ExporterBase()
252        verify.verifyClass(ICSVExporter, ExporterBase)
253        verify.verifyObject(ICSVExporter, obj)
254        return
255
256    def test_unimplemented(self):
257        # make sure the not implemented methods signal that.
258        exporter = ExporterBase()
259        self.assertRaises(NotImplementedError, exporter.export_all, None)
260        self.assertRaises(NotImplementedError, exporter.export, None)
261        return
262
263    def test_mangle_value(self):
264        # some basic types are mangled correctly
265        exporter = ExporterBase()
266        result1 = exporter.mangle_value(True, 'foo')
267        result2 = exporter.mangle_value(False, 'foo')
268        result3 = exporter.mangle_value('string', 'foo')
269        result4 = exporter.mangle_value(u'string', 'foo')
270        result5 = exporter.mangle_value(None, 'foo')
271        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
272        result7 = exporter.mangle_value(
273            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
274        self.assertEqual(
275            (result1, result2, result3, result4, result5),
276            ('1', '0', u'string', u'string', ''))
277        self.assertEqual(type(result3), type('string'))
278        self.assertEqual(type(result4), type('string'))
279        # dates are formatted with trailing hash
280        self.assertEqual(result6, '2012-04-01#')
281        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
282        self.assertEqual(result7, '2012-04-01 12:01:01')
283        return
284
285    def test_get_csv_writer(self):
286        # we can get a CSV writer to a memory file
287        exporter = ExporterBase()
288        writer, outfile = exporter.get_csv_writer()
289        writer.writerow(dict(code='A', title='B', title_prefix='C'))
290        outfile.seek(0)
291        self.assertEqual(
292            outfile.read(),
293            'code,title,title_prefix\r\nA,B,C\r\n')
294        return
295
296    def test_get_csv_writer_with_file(self):
297        # we can get CSV writer that writes to a real file
298        exporter = ExporterBase()
299        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
300        writer.writerow(dict(code='A', title='B', title_prefix='C'))
301        outfile.close()
302        resultfile = open(self.workfile, 'rb')
303        self.assertEqual(
304            resultfile.read(),
305            'code,title,title_prefix\r\nA,B,C\r\n')
306        return
307
308    def test_write_item(self):
309        # we can write items to opened exporter files.
310        exporter = ExporterBase()
311        writer, outfile = exporter.get_csv_writer()
312        class Sample(object):
313            code = 'A'
314            title = u'B'
315            title_prefix = True
316        exporter.write_item(Sample(), writer)
317        outfile.seek(0)
318        self.assertEqual(
319            outfile.read(),
320            'code,title,title_prefix\r\nA,B,1\r\n')
321        return
322
323    def test_close_outfile(self):
324        # exporters can help to close outfiles.
325        exporter = ExporterBase()
326        writer, outfile = exporter.get_csv_writer()
327        result = exporter.close_outfile(None, outfile)
328        self.assertEqual(result, 'code,title,title_prefix\r\n')
329        return
330
331    def test_close_outfile_real(self):
332        # we can also close outfiles in real files.
333        exporter = ExporterBase()
334        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
335        result = exporter.close_outfile(self.workfile, outfile)
336        self.assertEqual(result, None)
337        return
338
339
340class CaveExporter(ExporterBase):
341    # A minimal fake exporter suitable to be called by export_jobs
342    fields = ('name', 'dinoports', 'owner', 'taxpayer')
343
344    def export_all(self, site, filepath=None):
345        if filepath is None:
346            return SAMPLE_DATA
347        open(filepath, 'wb').write(SAMPLE_DATA)
348        return
349
350class ExportJobTests(unittest.TestCase):
351    # Test asynchronous export functionality (simple cases)
352
353    def setUp(self):
354        # register a suitable ICSVExporter as named utility
355        self.exporter = CaveExporter()
356        self.gsm = getGlobalSiteManager()
357        self.gsm.registerUtility(
358            self.exporter, ICSVExporter, name='cave_exporter')
359
360    def tearDown(self):
361        self.gsm.unregisterUtility(self.exporter)
362
363    def test_export_job_func(self):
364        # the export_job func does really export data...
365        result_path = export_job(None, 'cave_exporter')
366        self.assertTrue(os.path.isfile(result_path))
367        contents = open(result_path, 'rb').read()
368        shutil.rmtree(os.path.dirname(result_path))
369        self.assertEqual(contents, SAMPLE_DATA)
370        return
371
372    def test_export_job_interfaces(self):
373        # the AsyncExportJob implements promised interfaces correctly...
374        job = AsyncExportJob(None, None)
375        verify.verifyClass(IJob, AsyncExportJob)
376        verify.verifyObject(IJob, job)
377        return
Note: See TracBrowser for help on using the repository browser.