source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py

Last change on this file was 12516, checked in by Henrik Bettermann, 10 years ago

Export datasets of selected items specified by a list of identifiers. Will be used in students and applicants modules.

  • Property svn:keywords set to Id
File size: 22.4 KB
Line 
1## $Id: test_batching.py 12516 2015-01-27 08:31:34Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager, getUtility
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite, setSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.kofa.app import University
33from waeup.kofa.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
35    IExportJob, IExportContainerFinder)
36from waeup.kofa.testing import (
37    FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager)
38from waeup.kofa.utils.batching import (
39    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
40    ExportJobContainer, VirtualExportJobContainer, ExportContainerFinder)
41
42optionflags = (
43    doctest.REPORT_NDIFF + doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE)
44
45
46class ICave(Interface):
47    """A cave."""
48    id_num = schema.TextLine(
49        title = u'internal id',
50        default = u'default',
51        required = True,
52        readonly = True,
53        )
54    name = schema.TextLine(
55        title = u'Cave name',
56        default = u'Unnamed',
57        required = True)
58    dinoports = schema.Int(
59        title = u'Number of DinoPorts (tm)',
60        required = False,
61        default = 1)
62    owner = schema.TextLine(
63        title = u'Owner name',
64        required = True,
65        missing_value = 'Fred Estates Inc.')
66    taxpayer = schema.Bool(
67        title = u'Payes taxes',
68        required = True,
69        default = False)
70
71class Cave(object):
72    implements(ICave)
73    def __init__(self, name=u'Unnamed', dinoports=2,
74                 owner='Fred Estates Inc.', taxpayer=False):
75        self.name = name
76        self.dinoports = 2
77        self.owner = owner
78        self.taxpayer = taxpayer
79
80stoneville = dict
81
82SAMPLE_DATA = """name,dinoports,owner,taxpayer
83Barneys Home,2,Barney,1
84Wilmas Asylum,1,Wilma,1
85Freds Dinoburgers,10,Fred,0
86Joeys Drive-in,110,Joey,0
87"""
88
89SAMPLE_FILTERED_DATA = """name,dinoports,owner,taxpayer
90Barneys Home,2,Barney,1
91Wilmas Asylum,1,Wilma,1
92"""
93
94class CaveProcessor(BatchProcessor):
95    util_name = 'caveprocessor'
96    name = 'Cave Processor'
97    iface = ICave
98    location_fields = ['name']
99    factory_name = 'Lovely Cave'
100
101    def parentsExist(self, row, site):
102        return True
103
104    def getParent(self, row, site):
105        return stoneville
106
107    def entryExists(self, row, site):
108        return row['name'] in stoneville.keys()
109
110    def getEntry(self, row, site):
111        if not self.entryExists(row, site):
112            return None
113        return stoneville[row['name']]
114
115    def delEntry(self, row, site):
116        del stoneville[row['name']]
117
118    def addEntry(self, obj, row, site):
119        stoneville[row['name']] = obj
120
121class BatchProcessorTests(FunctionalTestCase):
122
123    layer = FunctionalLayer
124
125    def setupLogger(self):
126
127        self.logger = logging.getLogger('stoneville')
128        self.logger.setLevel(logging.DEBUG)
129        self.logger.propagate = False
130        self.logfile = os.path.join(self.workdir, 'stoneville.log')
131        self.handler = logging.FileHandler(self.logfile, 'w')
132        self.logger.addHandler(self.handler)
133
134    def setUp(self):
135        global stoneville
136        super(BatchProcessorTests, self).setUp()
137
138        # Setup a sample site for each test
139        app = University()
140        self.dc_root = tempfile.mkdtemp()
141        app['datacenter'].setStoragePath(self.dc_root)
142
143        # Prepopulate the ZODB...
144        self.getRootFolder()['app'] = app
145        self.app = self.getRootFolder()['app']
146
147        self.workdir = tempfile.mkdtemp()
148        factory = Factory(Cave)
149        provideUtility(factory, IFactory, 'Lovely Cave')
150
151        # Provide sample data
152        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
153        open(self.newcomers_csv, 'wb').write(SAMPLE_DATA)
154        self.setupLogger()
155        self.stoneville = stoneville
156        stoneville = dict()
157        self.resultpath = None
158        return
159
160    def tearDown(self):
161        super(BatchProcessorTests, self).tearDown()
162        shutil.rmtree(self.workdir)
163        shutil.rmtree(self.dc_root)
164        self.logger.removeHandler(self.handler)
165        clearSite()
166        if not isinstance(self.resultpath, list):
167            self.resultpath = [self.resultpath]
168        for path in self.resultpath:
169            if not isinstance(path, basestring):
170                continue
171            if not os.path.isdir(path):
172                path = os.path.dirname(path)
173            if os.path.exists(path):
174                shutil.rmtree(path)
175        return
176
177    def test_iface(self):
178        # make sure we fullfill interface contracts
179        obj = BatchProcessor()
180        verify.verifyClass(IBatchProcessor, BatchProcessor)
181        verify.verifyObject(IBatchProcessor, obj)
182        return
183
184    def test_import(self):
185        processor = CaveProcessor()
186        result = processor.doImport(
187            self.newcomers_csv,
188            ['name', 'dinoports', 'owner', 'taxpayer'],
189            mode='create', user='Bob', logger=self.logger)
190        num_succ, num_fail, finished_path, failed_path = result
191        self.resultpath = [finished_path, failed_path]
192        assert num_succ == 4
193        assert num_fail == 0
194        assert finished_path.endswith('/newcomers.finished.csv')
195        assert failed_path is None
196
197    def test_import_stoneville(self):
198        processor = CaveProcessor()
199        result = processor.doImport(
200            self.newcomers_csv,
201            ['name', 'dinoports', 'owner', 'taxpayer'],
202            mode='create', user='Bob', logger=self.logger)
203        num_succ, num_fail, finished_path, failed_path = result
204        self.resultpath = [finished_path, failed_path]
205        assert len(self.stoneville) == 4
206        self.assertEqual(
207            sorted(self.stoneville.keys()),
208            [u'Barneys Home', u'Freds Dinoburgers',
209             u'Joeys Drive-in', u'Wilmas Asylum'])
210
211    def test_import_correct_type(self):
212        processor = CaveProcessor()
213        result = processor.doImport(
214            self.newcomers_csv,
215            ['name', 'dinoports', 'owner', 'taxpayer'],
216            mode='create', user='Bob', logger=self.logger)
217        num_succ, num_fail, finished_path, failed_path = result
218        self.resultpath = [finished_path, failed_path]
219        assert isinstance(self.stoneville['Barneys Home'].dinoports, int)
220
221
222    def test_log(self):
223        """
224           >>> print log_contents
225           processed: /.../newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item)
226
227        """
228        processor = CaveProcessor()
229        result = processor.doImport(
230            self.newcomers_csv,
231            ['name', 'dinoports', 'owner', 'taxpayer'],
232            mode='create', user='Bob', logger=self.logger)
233        num_succ, num_fail, finished_path, failed_path = result
234        self.resultpath = [finished_path, failed_path]
235        log_contents = open(self.logfile, 'rb').read()
236        doctest.run_docstring_examples(
237            self.test_log, locals(), False, 'test_log', None, optionflags)
238        return
239
240class ExporterBaseTests(unittest.TestCase):
241
242    def setUp(self):
243        self.workdir = tempfile.mkdtemp()
244        self.workfile = os.path.join(self.workdir, 'testfile.csv')
245        return
246
247    def tearDown(self):
248        shutil.rmtree(self.workdir)
249        return
250
251    def test_iface(self):
252        # ExporterBase really implements the promised interface.
253        obj = ExporterBase()
254        verify.verifyClass(ICSVExporter, ExporterBase)
255        verify.verifyObject(ICSVExporter, obj)
256        return
257
258    def test_unimplemented(self):
259        # make sure the not implemented methods signal that.
260        exporter = ExporterBase()
261        self.assertRaises(NotImplementedError, exporter.export_all, None)
262        self.assertRaises(NotImplementedError, exporter.export, None)
263        return
264
265    def test_mangle_value(self):
266        # some basic types are mangled correctly
267        exporter = ExporterBase()
268        result1 = exporter.mangle_value(True, 'foo')
269        result2 = exporter.mangle_value(False, 'foo')
270        result3 = exporter.mangle_value('string', 'foo')
271        result4 = exporter.mangle_value(u'string', 'foo')
272        result5 = exporter.mangle_value(None, 'foo')
273        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
274        result7 = exporter.mangle_value(
275            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
276        self.assertEqual(
277            (result1, result2, result3, result4, result5),
278            ('1', '0', u'string', u'string', ''))
279        self.assertEqual(type(result3), type('string'))
280        self.assertEqual(type(result4), type('string'))
281        # dates are formatted with trailing hash
282        self.assertEqual(result6, '2012-04-01#')
283        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
284        self.assertEqual(result7, '2012-04-01 12:01:01#')
285        return
286
287    def test_get_csv_writer(self):
288        # we can get a CSV writer to a memory file
289        exporter = ExporterBase()
290        writer, outfile = exporter.get_csv_writer()
291        writer.writerow(dict(code='A', title='B', title_prefix='C'))
292        outfile.seek(0)
293        self.assertEqual(
294            outfile.read(),
295            'code,title,title_prefix\r\nA,B,C\r\n')
296        return
297
298    def test_get_csv_writer_with_file(self):
299        # we can get CSV writer that writes to a real file
300        exporter = ExporterBase()
301        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
302        writer.writerow(dict(code='A', title='B', title_prefix='C'))
303        outfile.close()
304        resultfile = open(self.workfile, 'rb')
305        self.assertEqual(
306            resultfile.read(),
307            'code,title,title_prefix\r\nA,B,C\r\n')
308        return
309
310    def test_write_item(self):
311        # we can write items to opened exporter files.
312        exporter = ExporterBase()
313        writer, outfile = exporter.get_csv_writer()
314        class Sample(object):
315            code = 'A'
316            title = u'B'
317            title_prefix = True
318        exporter.write_item(Sample(), writer)
319        outfile.seek(0)
320        self.assertEqual(
321            outfile.read(),
322            'code,title,title_prefix\r\nA,B,1\r\n')
323        return
324
325    def test_close_outfile(self):
326        # exporters can help to close outfiles.
327        exporter = ExporterBase()
328        writer, outfile = exporter.get_csv_writer()
329        result = exporter.close_outfile(None, outfile)
330        self.assertEqual(result, 'code,title,title_prefix\r\n')
331        return
332
333    def test_close_outfile_real(self):
334        # we can also close outfiles in real files.
335        exporter = ExporterBase()
336        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
337        result = exporter.close_outfile(self.workfile, outfile)
338        self.assertEqual(result, None)
339        return
340
341    def test_export_filtered(self):
342        # we can pass in positional and keyword args
343        exporter = ExporterBase()
344        self.assertRaises(NotImplementedError, exporter.export_filtered,
345                          'foo', bar='bar')
346
347    def test_export_selected(self):
348        # we can pass in a list of identifiers
349        exporter = ExporterBase()
350        self.assertRaises(NotImplementedError, exporter.export_selected,
351                          'foo', selected=[])
352        return
353
354class CaveExporter(ExporterBase):
355    # A minimal fake exporter suitable to be called by export_jobs
356    fields = ('name', 'dinoports', 'owner', 'taxpayer')
357    title = u'Dummy cave exporter'
358
359    def export_all(self, site, filepath=None):
360        if filepath is None:
361            return SAMPLE_DATA
362        open(filepath, 'wb').write(SAMPLE_DATA)
363        return
364
365    def export_filtered(self, site, filepath=None, foo=None, bar=None):
366        if foo or bar:
367            open(filepath, 'wb').write(SAMPLE_FILTERED_DATA)
368            return
369        self.export_all(site, filepath=filepath)
370        return
371
372    def export_selected(self, site, filepath=None, selected=None):
373        if selected:
374            open(filepath, 'wb').write(SAMPLE_FILTERED_DATA)
375            return
376        self.export_all(site, filepath=filepath)
377        return
378
379class ExportJobTests(unittest.TestCase):
380    # Test asynchronous export functionality (simple cases)
381
382    def setUp(self):
383        # register a suitable ICSVExporter as named utility
384        self.exporter = CaveExporter()
385        self.gsm = getGlobalSiteManager()
386        self.gsm.registerUtility(
387            self.exporter, ICSVExporter, name='cave_exporter')
388
389    def tearDown(self):
390        self.gsm.unregisterUtility(self.exporter)
391
392    def test_export_job_func(self):
393        # the export_job func does really export data...
394        result_path = export_job(None, 'cave_exporter')
395        self.assertTrue(os.path.isfile(result_path))
396        contents = open(result_path, 'rb').read()
397        shutil.rmtree(os.path.dirname(result_path))
398        self.assertEqual(contents, SAMPLE_DATA)
399        return
400
401    def test_export_job_interfaces(self):
402        # the AsyncExportJob implements promised interfaces correctly...
403        job = AsyncExportJob(None, None)
404        verify.verifyClass(IJob, AsyncExportJob)
405        verify.verifyObject(IJob, job)
406        verify.verifyClass(IExportJob, AsyncExportJob)
407        verify.verifyObject(IExportJob, job)
408        return
409
410    def test_export_job_with_args(self):
411        # we export filtered sets
412        result_path = export_job(None, 'cave_exporter', foo='foo')
413        contents = open(result_path, 'rb').read()
414        shutil.rmtree(os.path.dirname(result_path))
415        self.assertEqual(contents, SAMPLE_FILTERED_DATA)
416        return
417
418    def test_export_job_selected(self):
419        # we export subsets
420        result_path = export_job(None, 'cave_exporter', selected=['foo'])
421        contents = open(result_path, 'rb').read()
422        shutil.rmtree(os.path.dirname(result_path))
423        self.assertEqual(contents, SAMPLE_FILTERED_DATA)
424        return
425
426
427class FakeJobWithResult(FakeJob):
428
429    def __init__(self):
430        self.dir_path = tempfile.mkdtemp()
431        self.result = os.path.join(self.dir_path, 'fake.csv')
432        open(self.result, 'wb').write('a fake result')
433        return
434
435class ExportJobContainerTests(unittest.TestCase):
436    # Test ExportJobContainer
437
438    TestedClass = ExportJobContainer
439
440    def setUp(self):
441        # register a suitable ICSVExporter as named utility
442        self.exporter = CaveExporter()
443        self.job_manager = FakeJobManager()
444        self.gsm = getGlobalSiteManager()
445        self.gsm.registerUtility(
446            self.exporter, ICSVExporter, name='cave_exporter')
447        self.gsm.registerUtility(
448            self.job_manager, IJobManager)
449
450    def tearDown(self):
451        self.gsm.unregisterUtility(self.exporter)
452        self.gsm.unregisterUtility(self.job_manager, IJobManager)
453
454    def test_export_job_interfaces(self):
455        # the ExportJobContainer implements promised interfaces correctly...
456        container = self.TestedClass()
457        verify.verifyClass(IExportJobContainer, self.TestedClass)
458        verify.verifyObject(IExportJobContainer, container)
459        return
460
461    def test_start_export_job(self):
462        # we can start jobs
463        container = self.TestedClass()
464        container.start_export_job('cave_exporter', 'bob')
465        result = self.job_manager._jobs.values()[0]
466        self.assertTrue(IJob.providedBy(result))
467        self.assertEqual(
468            container.running_exports,
469            [('1', 'cave_exporter', 'bob')]
470            )
471        return
472
473    def test_get_running_export_jobs_all(self):
474        # we can get export jobs of all users
475        container = self.TestedClass()
476        container.start_export_job('cave_exporter', 'bob')
477        container.start_export_job('cave_exporter', 'alice')
478        result = container.get_running_export_jobs()
479        self.assertEqual(
480            result,
481            [('1', 'cave_exporter', 'bob'),
482             ('2', 'cave_exporter', 'alice')]
483            )
484        return
485
486    def test_get_running_export_jobs_user(self):
487        # we can get the export jobs running for a certain user
488        container = self.TestedClass()
489        container.start_export_job('cave_exporter', 'bob')
490        container.start_export_job('cave_exporter', 'alice')
491        result1 = container.get_running_export_jobs(user_id='alice')
492        result2 = container.get_running_export_jobs(user_id='foo')
493        self.assertEqual(
494            result1, [('2', 'cave_exporter', 'alice')])
495        self.assertEqual(
496            result2, [])
497        return
498
499    def test_get_running_export_jobs_only_if_exist(self):
500        # we get only jobs that are accessible through the job manager...
501        container = self.TestedClass()
502        container.start_export_job('cave_exporter', 'bob')
503        container.start_export_job('cave_exporter', 'bob')
504        self.assertTrue(
505            ('2', 'cave_exporter', 'bob') in container.running_exports)
506        # we remove the second entry from job manager
507        del self.job_manager._jobs['2']
508        result = container.get_running_export_jobs(user_id='bob')
509        self.assertEqual(
510            result, [('1', 'cave_exporter', 'bob')])
511        self.assertTrue(
512            ('2', 'cave_exporter', 'bob') not in container.running_exports)
513        return
514
515    def test_get_export_job_status(self):
516        # we can get the stati of jobs...
517        container = self.TestedClass()
518        container.start_export_job('cave_exporter', 'alice')
519        container.start_export_job('cave_exporter', 'bob')
520        container.start_export_job('cave_exporter', 'bob')
521        result = container.get_export_jobs_status(user_id='bob')
522        # we'll get the raw value, a translation and the title of the
523        # exporter
524        self.assertEqual(
525            result,
526            [('new', u'new', u'Dummy cave exporter'),
527             ('completed', u'completed', u'Dummy cave exporter')]
528            )
529        return
530
531    def test_delete_export_entry(self):
532        # we can remove export entries in local lists and the job
533        # manager as well...
534        container = self.TestedClass()
535        container.start_export_job('cave_exporter', 'bob')
536        entry = container.running_exports[0]
537        container.delete_export_entry(entry)
538        # both, running_exports list and job manager are empty now
539        self.assertEqual(
540            container.running_exports, [])
541        self.assertEqual(
542            self.job_manager._jobs, {})
543        return
544
545    def test_delete_export_entry_remove_file(self):
546        # any result files of exports are deleted as well
547        container = self.TestedClass()
548        entry = ('4', 'cave_exporter', 'bob')
549        container.running_exports = [entry]
550        fake_job = FakeJobWithResult()
551        self.job_manager._jobs['4'] = fake_job
552        self.assertTrue(os.path.isfile(fake_job.result))
553        container.delete_export_entry(entry)
554        self.assertTrue(not os.path.exists(fake_job.result))
555        return
556
557    def test_entry_from_job_id(self):
558        # we can get an entry for a job_id if the id exists
559        container = self.TestedClass()
560        entry = ('4', 'cave_exporter', 'bob')
561        container.running_exports = [entry]
562        fake_job = FakeJobWithResult()
563        self.job_manager._jobs['4'] = fake_job
564        result1 = container.entry_from_job_id(None)
565        result2 = container.entry_from_job_id('4')
566        result3 = container.entry_from_job_id('23')
567        self.assertEqual(result1, None)
568        self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
569        self.assertEqual(result3, None)
570        shutil.rmtree(fake_job.dir_path)
571        return
572
573class VirtualExportJobContainerTests(ExportJobContainerTests):
574    # VirtualExportJobContainers should provide the
575    # same functionality as regular ones.
576
577    TestedClass = VirtualExportJobContainer
578
579    def setUp(self):
580        super(VirtualExportJobContainerTests, self).setUp()
581        self.root_job_container = ExportJobContainer()
582        def fake_finder():
583            return self.root_job_container
584        self.gsm = getGlobalSiteManager()
585        self.gsm.registerUtility(fake_finder, IExportContainerFinder)
586        return
587
588class ExportContainerFinderTests(FunctionalTestCase):
589    # Tests for export container finder.
590
591    layer = FunctionalLayer
592
593    def test_get_finder_as_util(self):
594        # we can get a finder by utility lookup
595        finder = getUtility(IExportContainerFinder)
596        self.assertTrue(finder is not None)
597        self.assertEqual(
598            IExportContainerFinder.providedBy(finder),
599            True)
600        return
601
602    def test_iface(self):
603        # the finder complies with the promised interface
604        finder = ExportContainerFinder()
605        verify.verifyClass(IExportContainerFinder,
606                           ExportContainerFinder)
607        verify.verifyObject(IExportContainerFinder, finder)
608        return
609
610    def test_no_site(self):
611        # a finder returns None if no site is available
612        finder = ExportContainerFinder()
613        self.assertEqual(
614            finder(), None)
615        return
616
617    def test_active_site(self):
618        # we get the datafinder if one is installed and site set
619        self.getRootFolder()['app'] = University()
620        finder = getUtility(IExportContainerFinder)
621        setSite(self.getRootFolder()['app'])
622        container = finder()
623        self.assertTrue(container is not None)
624        return
625
626    def test_broken_site(self):
627        # if the current site has no ExportContainer, we get None
628        self.getRootFolder()['app'] = University()
629        app = self.getRootFolder()['app']
630        del app['datacenter'] # datacenter _is_ the export container
631        setSite(app)
632        finder = getUtility(IExportContainerFinder)
633        container = finder()
634        self.assertTrue(container is None)
635        return
Note: See TracBrowser for help on using the repository browser.