source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 9649

Last change on this file since 9649 was 9342, checked in by uli, 12 years ago

Move fake job components to a more general location.

  • Property svn:keywords set to Id
File size: 18.6 KB
Line 
1## $Id: test_batching.py 9342 2012-10-16 09:42:18Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.kofa.app import University
33from waeup.kofa.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
35    IExportJob)
36from waeup.kofa.testing import (
37    FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager)
38from waeup.kofa.utils.batching import (
39    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
40    ExportJobContainer)
41
# Doctest option flags used by the docstring-based check in this module
# (see BatchProcessorTests.test_log).
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE)
44
45
class ICave(Interface):
    """Schema of a cave, the sample content type used by these tests."""

    # Internal identifier; declared read-only.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
    )

    # Human readable name; CaveProcessor uses it as its location field.
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True,
    )

    # Optional integer attribute.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
    )

    # Required owner name, with an explicit missing value.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
    )

    # Required boolean flag.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
    )
70
class Cave(object):
    """Plain sample object declared to implement `ICave`.

    All constructor arguments are stored unchanged as attributes of the
    same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        self.name = name
        # Honour the caller's value instead of a hard-coded constant; the
        # default (2) keeps argument-less construction unchanged.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
79
# Module-level registry of caves, keyed by cave name.  It has to be a dict
# *instance* (not the `dict` type): CaveProcessor indexes, assigns to and
# deletes from it.
stoneville = dict()
81
82SAMPLE_DATA = """name,dinoports,owner,taxpayer
83Barneys Home,2,Barney,1
84Wilmas Asylum,1,Wilma,1
85Freds Dinoburgers,10,Fred,0
86Joeys Drive-in,110,Joey,0
87"""
88
class CaveProcessor(BatchProcessor):
    """Batch processor that keeps its entries in the module-level
    `stoneville` mapping, keyed by the row's ``name`` value.

    `stoneville` is looked up at call time, so rebinding the global (as
    the test setup does) takes effect for every subsequent call.
    """
    util_name = 'caveprocessor'
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    # Name under which the test setup registers the `Cave` factory.
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Always report the parent as existing."""
        return True

    def getParent(self, row, site):
        """Return the `stoneville` mapping as the parent of every row."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether an entry named ``row['name']`` is stored."""
        # Direct membership test; no need to build a key list first.
        return row['name'] in stoneville

    def getEntry(self, row, site):
        """Return the stored entry for ``row['name']`` or `None`."""
        if not self.entryExists(row, site):
            return None
        return stoneville[row['name']]

    def delEntry(self, row, site):
        """Remove the entry named ``row['name']``.

        Raises `KeyError` if there is no such entry.
        """
        del stoneville[row['name']]

    def addEntry(self, obj, row, site):
        """Store `obj` under ``row['name']``, replacing any old entry."""
        stoneville[row['name']] = obj
115
class BatchProcessorTests(FunctionalTestCase):
    """Run `CaveProcessor` imports of the sample CSV data.

    Each test gets a fresh site, a temporary working directory holding
    the CSV source and a log file, and a fresh `stoneville` mapping.
    """

    layer = FunctionalLayer

    def setupLogger(self):
        """Create a non-propagating 'stoneville' logger writing to
        ``stoneville.log`` inside the working directory.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Build site, working directory, sample CSV file and logger."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as fd:
            fd.write(SAMPLE_DATA)
        self.setupLogger()

        # Rebind the module-level mapping *first* and then keep a
        # reference to that very object.  CaveProcessor reads the global
        # at call time, so the tests must inspect the same dict the
        # processor writes to (not the one left over from an earlier
        # test).
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        """Remove the logger handler and all files created by a test."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before deleting the directory
        # that contains its log file, so no open file is left behind.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                # `None` entries: nothing was written for this slot.
                continue
            if not os.path.isdir(path):
                # Result files are removed together with their directory.
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        """Import the sample CSV file in 'create' mode as user 'Bob'.

        Remembers the result paths in `self.resultpath` (for cleanup in
        `tearDown`) and returns the tuple delivered by `doImport`:
        ``(num_succ, num_fail, finished_path, failed_path)``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        # all four sample rows are imported without failures
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertEqual(failed_path, None)

    def test_import_stoneville(self):
        # imported caves end up in the stoneville mapping
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # numeric CSV values are stored as ints
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._import_newcomers()
        with open(self.logfile, 'rb') as fd:
            log_contents = fd.read()
        # NOTE(review): doctest.run_docstring_examples() only *prints*
        # mismatches and returns None, so a wrong log does not make this
        # test fail.  Kept as is to preserve current behaviour.
        doctest.run_docstring_examples(
            self.test_log, {'log_contents': log_contents},
            False, 'test_log', None, optionflags)
        return
241
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for `ExporterBase`.

    Each test gets a temporary working directory (`self.workdir`) and
    the path of a not yet existing CSV file inside it (`self.workfile`).
    """

    def setUp(self):
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # byte strings and unicode strings both come back as plain str
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
        self.assertEqual(result7, '2012-04-01 12:01:01')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # read the result back and make sure the handle is closed again
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True
        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return
342
343
class CaveExporter(ExporterBase):
    """A minimal fake exporter suitable to be called by export jobs.

    It ignores the site passed in and always delivers `SAMPLE_DATA`.
    """
    fields = ('name', 'dinoports', 'owner', 'taxpayer')
    title = u'Dummy cave exporter'

    def export_all(self, site, filepath=None):
        """Deliver `SAMPLE_DATA`.

        Without `filepath` the data is returned as a string.  With a
        `filepath` the data is written to that file and `None` is
        returned.
        """
        if filepath is None:
            return SAMPLE_DATA
        # close the file deterministically instead of leaking the handle
        with open(filepath, 'wb') as fd:
            fd.write(SAMPLE_DATA)
        return
354
class ExportJobTests(unittest.TestCase):
    """Test asynchronous export functionality (simple cases).

    A `CaveExporter` is registered as ICSVExporter utility named
    'cave_exporter' for the duration of each test.
    """

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def tearDown(self):
        # Unregister with the same interface and name used for
        # registration; with the default (empty) name the lookup would
        # not find the registration and the utility would stay
        # registered.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def test_export_job_func(self):
        # the export_job func does really export data...
        result_path = export_job(None, 'cave_exporter')
        try:
            self.assertTrue(os.path.isfile(result_path))
            with open(result_path, 'rb') as fd:
                contents = fd.read()
        finally:
            # remove the result directory even if a check above failed
            shutil.rmtree(os.path.dirname(result_path), ignore_errors=True)
        self.assertEqual(contents, SAMPLE_DATA)
        return

    def test_export_job_interfaces(self):
        # the AsyncExportJob implements promised interfaces correctly...
        job = AsyncExportJob(None, None)
        verify.verifyClass(IJob, AsyncExportJob)
        verify.verifyObject(IJob, job)
        verify.verifyClass(IExportJob, AsyncExportJob)
        verify.verifyObject(IExportJob, job)
        return
385
386
class FakeJobWithResult(FakeJob):
    """A fake job whose `result` is the path of a really existing file.

    The file ``fake.csv`` is created in a fresh temporary directory
    (`dir_path`); callers are responsible for removing that directory.
    """

    def __init__(self):
        self.dir_path = tempfile.mkdtemp()
        self.result = os.path.join(self.dir_path, 'fake.csv')
        # write and close the file right away, no dangling handle
        with open(self.result, 'wb') as fd:
            fd.write('a fake result')
        return
394
class ExportJobContainerTests(unittest.TestCase):
    """Test ExportJobContainer.

    For each test a `CaveExporter` is registered as ICSVExporter utility
    named 'cave_exporter' and a `FakeJobManager` as unnamed IJobManager
    utility.
    """

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.job_manager = FakeJobManager()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.registerUtility(
            self.job_manager, IJobManager)

    def tearDown(self):
        # The exporter was registered under a name, so it has to be
        # unregistered with the same interface and name; otherwise the
        # registration is not found and stays in the global registry.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.unregisterUtility(self.job_manager, IJobManager)

    def test_export_job_interfaces(self):
        # the ExportJobContainer implements promised interfaces correctly...
        container = ExportJobContainer()
        verify.verifyClass(IExportJobContainer, ExportJobContainer)
        verify.verifyObject(IExportJobContainer, container)
        return

    def test_start_export_job(self):
        # we can start jobs
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        result = self.job_manager._jobs.values()[0]
        self.assertTrue(IJob.providedBy(result))
        self.assertEqual(
            container.running_exports,
            [('1', 'cave_exporter', 'bob')]
            )
        return

    def test_get_running_export_jobs_all(self):
        # we can get export jobs of all users
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result = container.get_running_export_jobs()
        self.assertEqual(
            result,
            [('1', 'cave_exporter', 'bob'),
             ('2', 'cave_exporter', 'alice')]
            )
        return

    def test_get_running_export_jobs_user(self):
        # we can get the export jobs running for a certain user
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result1 = container.get_running_export_jobs(user_id='alice')
        result2 = container.get_running_export_jobs(user_id='foo')
        self.assertEqual(
            result1, [('2', 'cave_exporter', 'alice')])
        self.assertEqual(
            result2, [])
        return

    def test_get_running_export_jobs_only_if_exist(self):
        # we get only jobs that are accessible through the job manager...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') in container.running_exports)
        # we remove the second entry from job manager
        del self.job_manager._jobs['2']
        result = container.get_running_export_jobs(user_id='bob')
        self.assertEqual(
            result, [('1', 'cave_exporter', 'bob')])
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') not in container.running_exports)
        return

    def test_get_export_job_status(self):
        # we can get the stati of jobs...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'alice')
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        result = container.get_export_jobs_status(user_id='bob')
        # we'll get the raw value, a translation and the title of the
        # exporter
        self.assertEqual(
            result,
            [('new', u'new', u'Dummy cave exporter'),
             ('completed', u'completed', u'Dummy cave exporter')]
            )
        return

    def test_delete_export_entry(self):
        # we can remove export entries in local lists and the job
        # manager as well...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        entry = container.running_exports[0]
        container.delete_export_entry(entry)
        # both, running_exports list and job manager are empty now
        self.assertEqual(
            container.running_exports, [])
        self.assertEqual(
            self.job_manager._jobs, {})
        return

    def test_delete_export_entry_remove_file(self):
        # any result files of exports are deleted as well
        container = ExportJobContainer()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        try:
            self.job_manager._jobs['4'] = fake_job
            self.assertTrue(os.path.isfile(fake_job.result))
            container.delete_export_entry(entry)
            self.assertTrue(not os.path.exists(fake_job.result))
        finally:
            # Whether delete_export_entry also removes the enclosing
            # temporary directory is not visible from here; make sure
            # nothing is left behind in either case.
            shutil.rmtree(fake_job.dir_path, ignore_errors=True)
        return

    def test_entry_from_job_id(self):
        # we can get an entry for a job_id if the id exists
        container = ExportJobContainer()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        try:
            self.job_manager._jobs['4'] = fake_job
            result1 = container.entry_from_job_id(None)
            result2 = container.entry_from_job_id('4')
            result3 = container.entry_from_job_id('23')
            self.assertEqual(result1, None)
            self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
            self.assertEqual(result3, None)
        finally:
            # remove the fake job's temporary directory even on failure
            shutil.rmtree(fake_job.dir_path)
        return
Note: See TracBrowser for help on using the repository browser.