source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py

Last change on this file was 12516, checked in by Henrik Bettermann, 10 years ago

Export datasets of selected items specified by a list of identifiers. Will be used in students and applicants modules.

  • Property svn:keywords set to Id
File size: 22.4 KB
RevLine 
[7196]1## $Id: test_batching.py 12516 2015-01-27 08:31:34Z henrik $
[6273]2##
[7196]3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
[6273]4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
[7196]8##
[6273]9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
[7196]13##
[6273]14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[8380]18import datetime
[6273]19import doctest
20import logging
21import os
22import shutil
23import tempfile
[7857]24import unittest
[9342]25from zc.async.interfaces import IJob
[6273]26from zope import schema
[9726]27from zope.component import provideUtility, getGlobalSiteManager, getUtility
[6273]28from zope.component.factory import Factory
[9726]29from zope.component.hooks import clearSite, setSite
[6273]30from zope.component.interfaces import IFactory
[7857]31from zope.interface import Interface, implements, verify
[7811]32from waeup.kofa.app import University
[9217]33from waeup.kofa.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
[9726]35    IExportJob, IExportContainerFinder)
[9342]36from waeup.kofa.testing import (
37    FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager)
[9217]38from waeup.kofa.utils.batching import (
39    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
[9726]40    ExportJobContainer, VirtualExportJobContainer, ExportContainerFinder)
[6273]41
# Doctest options shared by the docstring-driven checks in this module:
# ndiff-style failure reports, ``...`` wildcards and relaxed whitespace.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
44
45
class ICave(Interface):
    """Schema of a cave, the sample content type imported in these tests."""

    # Read-only internal identifier.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
    )

    # Human readable name; CaveProcessor uses it as location field.
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True,
    )

    # Optional integer field.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
    )

    # Required text field with an explicit missing value.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
    )

    # Required boolean field.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
    )
70
class Cave(object):
    """Plain implementation of ``ICave`` used as import target.

    Instances are created through the 'Lovely Cave' factory registered in
    ``BatchProcessorTests.setUp``.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Store the given values as plain attributes.

        :param name: cave name
        :param dinoports: number of DinoPorts
        :param owner: owner name
        :param taxpayer: whether the owner pays taxes
        """
        self.name = name
        # Honour the argument; previously a hard-coded 2 was stored here
        # regardless of what the caller passed in.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
79
# Container the CaveProcessor adds caves to, keyed by cave name.  It has to
# be a dict *instance*; binding the ``dict`` type itself would break any
# lookup or item assignment performed before a test rebinds the name.
stoneville = dict()

# CSV input imported by the batch processor tests and returned by the
# dummy exporter.
SAMPLE_DATA = """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
"""

# Subset of SAMPLE_DATA the dummy exporter emits for filtered or selected
# exports.
SAMPLE_FILTERED_DATA = """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
"""
93
class CaveProcessor(BatchProcessor):
    """Batch processor storing caves in the module-level ``stoneville``."""

    util_name = 'caveprocessor'
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        # the single parent container is always available
        return True

    def getParent(self, row, site):
        # every cave lives in the same container
        return stoneville

    def entryExists(self, row, site):
        # a cave exists if its name is a key of the container
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        # hand out the stored cave or None for unknown names
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        # drop the cave stored under the row's name
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        # store the cave under the row's name
        cave_name = row['name']
        stoneville[cave_name] = obj
120
class BatchProcessorTests(FunctionalTestCase):
    """Run the sample ``CaveProcessor`` against a small CSV file."""

    layer = FunctionalLayer

    def setupLogger(self):
        # Log to a file in the work dir so tests can inspect the output.
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(SAMPLE_DATA)
        self.setupLogger()
        # Start every test with a fresh container and keep a reference to
        # exactly that container.  Taking the reference before rebinding
        # the global would leave ``self.stoneville`` pointing at the
        # container filled by the previous test.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        super(BatchProcessorTests, self).tearDown()
        # Release the log file before the directory holding it is removed.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        # Remove whatever the import left behind; ``resultpath`` may be a
        # single value or a list and may contain None entries.
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                continue
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        """Import the sample CSV in create mode.

        Returns the 4-tuple ``(num_succ, num_fail, finished_path,
        failed_path)`` delivered by ``doImport`` and remembers both paths
        in ``self.resultpath`` so that ``tearDown`` can remove them.
        """
        result = CaveProcessor().doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fullfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        # all four sample rows are imported without failures
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        assert num_succ == 4
        assert num_fail == 0
        assert finished_path.endswith('/newcomers.finished.csv')
        assert failed_path is None

    def test_import_stoneville(self):
        # the imported caves end up in the container, keyed by name
        self._import_newcomers()
        assert len(self.stoneville) == 4
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # CSV values are converted according to the schema
        self._import_newcomers()
        assert isinstance(self.stoneville['Barneys Home'].dinoports, int)

    # The docstring below is executed as a doctest with ``log_contents``
    # taken from the local namespace; keep its example text unchanged.
    def test_log(self):
        """
           >>> print log_contents
           processed: /.../newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item)

        """
        self._import_newcomers()
        with open(self.logfile, 'rb') as log_file:
            log_contents = log_file.read()
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
        return
239
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for the generic ``ExporterBase`` helper methods."""

    def setUp(self):
        # each test gets a private directory for real output files
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss, also followed
        # by a trailing hash
        self.assertEqual(result7, '2012-04-01 12:01:01#')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # read the result back and release the handle before comparing
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()

        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True

        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return

    def test_export_filtered(self):
        # we can pass in positional and keyword args
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_filtered,
                          'foo', bar='bar')
        return

    def test_export_selected(self):
        # we can pass in a list of identifiers
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_selected,
                          'foo', selected=[])
        return
[9217]353
class CaveExporter(ExporterBase):
    """A minimal fake exporter suitable to be called by export_jobs.

    All three export methods behave alike: with a `filepath` the data is
    written to that file and None is returned, without one the data is
    returned as string.
    """
    fields = ('name', 'dinoports', 'owner', 'taxpayer')
    title = u'Dummy cave exporter'

    def _deliver(self, data, filepath):
        """Write `data` to `filepath` or return it if no path is given."""
        if filepath is None:
            return data
        # close the file deterministically instead of relying on garbage
        # collection
        with open(filepath, 'wb') as outfile:
            outfile.write(data)
        return None

    def export_all(self, site, filepath=None):
        """Export the complete sample data set; `site` is ignored."""
        return self._deliver(SAMPLE_DATA, filepath)

    def export_filtered(self, site, filepath=None, foo=None, bar=None):
        """Export the reduced data set if `foo` or `bar` is true.

        Without any filter the complete data set is exported.
        """
        if foo or bar:
            return self._deliver(SAMPLE_FILTERED_DATA, filepath)
        return self.export_all(site, filepath=filepath)

    def export_selected(self, site, filepath=None, selected=None):
        """Export the reduced data set if `selected` is non-empty.

        Without a selection the complete data set is exported.
        """
        if selected:
            return self._deliver(SAMPLE_FILTERED_DATA, filepath)
        return self.export_all(site, filepath=filepath)
378
class ExportJobTests(unittest.TestCase):
    """Test asynchronous export functionality (simple cases)."""

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def tearDown(self):
        self.gsm.unregisterUtility(self.exporter)

    def _read_and_remove(self, result_path):
        """Return the contents of `result_path` and delete its directory.

        The file is closed before the directory is removed.
        """
        with open(result_path, 'rb') as result_file:
            contents = result_file.read()
        shutil.rmtree(os.path.dirname(result_path))
        return contents

    def test_export_job_func(self):
        # the export_job func does really export data...
        result_path = export_job(None, 'cave_exporter')
        self.assertTrue(os.path.isfile(result_path))
        contents = self._read_and_remove(result_path)
        self.assertEqual(contents, SAMPLE_DATA)
        return

    def test_export_job_interfaces(self):
        # the AsyncExportJob implements promised interfaces correctly...
        job = AsyncExportJob(None, None)
        verify.verifyClass(IJob, AsyncExportJob)
        verify.verifyObject(IJob, job)
        verify.verifyClass(IExportJob, AsyncExportJob)
        verify.verifyObject(IExportJob, job)
        return

    def test_export_job_with_args(self):
        # we export filtered sets
        result_path = export_job(None, 'cave_exporter', foo='foo')
        contents = self._read_and_remove(result_path)
        self.assertEqual(contents, SAMPLE_FILTERED_DATA)
        return

    def test_export_job_selected(self):
        # we export subsets
        result_path = export_job(None, 'cave_exporter', selected=['foo'])
        contents = self._read_and_remove(result_path)
        self.assertEqual(contents, SAMPLE_FILTERED_DATA)
        return
[9797]425
[12516]426
class FakeJobWithResult(FakeJob):
    """A fake job whose ``result`` is the path of an existing file.

    The file lives in a fresh temporary directory available as
    ``dir_path``; removing that directory is up to the creator.
    """

    def __init__(self):
        self.dir_path = tempfile.mkdtemp()
        self.result = os.path.join(self.dir_path, 'fake.csv')
        # write and close the file right away so no handle stays open
        with open(self.result, 'wb') as result_file:
            result_file.write('a fake result')
        return
434
class ExportJobContainerTests(unittest.TestCase):
    """Test ExportJobContainer.

    Subclasses can set ``TestedClass`` to run the same tests against a
    different container implementation.
    """

    TestedClass = ExportJobContainer

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.job_manager = FakeJobManager()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.registerUtility(
            self.job_manager, IJobManager)

    def tearDown(self):
        self.gsm.unregisterUtility(self.exporter)
        self.gsm.unregisterUtility(self.job_manager, IJobManager)

    def _container_with_fake_job(self):
        """Return ``(container, entry, fake_job)`` for job id '4'.

        The container lists one export entry and the job manager knows a
        ``FakeJobWithResult`` under the same id.  The job's temporary
        directory is scheduled for removal, so it disappears even if an
        assertion fails or the tested code already deleted it.
        """
        container = self.TestedClass()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        self.addCleanup(
            shutil.rmtree, fake_job.dir_path, ignore_errors=True)
        self.job_manager._jobs['4'] = fake_job
        return container, entry, fake_job

    def test_export_job_interfaces(self):
        # the ExportJobContainer implements promised interfaces correctly...
        container = self.TestedClass()
        verify.verifyClass(IExportJobContainer, self.TestedClass)
        verify.verifyObject(IExportJobContainer, container)
        return

    def test_start_export_job(self):
        # we can start jobs
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        # list() keeps the lookup working where values() is not a list
        result = list(self.job_manager._jobs.values())[0]
        self.assertTrue(IJob.providedBy(result))
        self.assertEqual(
            container.running_exports,
            [('1', 'cave_exporter', 'bob')]
            )
        return

    def test_get_running_export_jobs_all(self):
        # we can get export jobs of all users
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result = container.get_running_export_jobs()
        self.assertEqual(
            result,
            [('1', 'cave_exporter', 'bob'),
             ('2', 'cave_exporter', 'alice')]
            )
        return

    def test_get_running_export_jobs_user(self):
        # we can get the export jobs running for a certain user
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result1 = container.get_running_export_jobs(user_id='alice')
        result2 = container.get_running_export_jobs(user_id='foo')
        self.assertEqual(
            result1, [('2', 'cave_exporter', 'alice')])
        self.assertEqual(
            result2, [])
        return

    def test_get_running_export_jobs_only_if_exist(self):
        # we get only jobs that are accessible through the job manager...
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') in container.running_exports)
        # we remove the second entry from job manager
        del self.job_manager._jobs['2']
        result = container.get_running_export_jobs(user_id='bob')
        self.assertEqual(
            result, [('1', 'cave_exporter', 'bob')])
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') not in container.running_exports)
        return

    def test_get_export_job_status(self):
        # we can get the stati of jobs...
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'alice')
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        result = container.get_export_jobs_status(user_id='bob')
        # we'll get the raw value, a translation and the title of the
        # exporter
        self.assertEqual(
            result,
            [('new', u'new', u'Dummy cave exporter'),
             ('completed', u'completed', u'Dummy cave exporter')]
            )
        return

    def test_delete_export_entry(self):
        # we can remove export entries in local lists and the job
        # manager as well...
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        entry = container.running_exports[0]
        container.delete_export_entry(entry)
        # both, running_exports list and job manager are empty now
        self.assertEqual(
            container.running_exports, [])
        self.assertEqual(
            self.job_manager._jobs, {})
        return

    def test_delete_export_entry_remove_file(self):
        # any result files of exports are deleted as well
        container, entry, fake_job = self._container_with_fake_job()
        self.assertTrue(os.path.isfile(fake_job.result))
        container.delete_export_entry(entry)
        self.assertTrue(not os.path.exists(fake_job.result))
        return

    def test_entry_from_job_id(self):
        # we can get an entry for a job_id if the id exists
        container, entry, fake_job = self._container_with_fake_job()
        result1 = container.entry_from_job_id(None)
        result2 = container.entry_from_job_id('4')
        result3 = container.entry_from_job_id('23')
        self.assertEqual(result1, None)
        self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
        self.assertEqual(result3, None)
        return
[9726]572
class VirtualExportJobContainerTests(ExportJobContainerTests):
    """Run the container tests against ``VirtualExportJobContainer``.

    VirtualExportJobContainers should provide the same functionality as
    regular ones.
    """

    TestedClass = VirtualExportJobContainer

    def setUp(self):
        super(VirtualExportJobContainerTests, self).setUp()
        self.root_job_container = ExportJobContainer()

        def fake_finder():
            # stand-in finder: always hands out the container made above
            return self.root_job_container

        # keep a reference so that tearDown can unregister the same object
        self.fake_finder = fake_finder
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(fake_finder, IExportContainerFinder)
        return

    def tearDown(self):
        # Remove the fake finder again; otherwise it stays in the global
        # site manager and is visible to tests running afterwards.
        self.gsm.unregisterUtility(self.fake_finder, IExportContainerFinder)
        super(VirtualExportJobContainerTests, self).tearDown()
        return
587
class ExportContainerFinderTests(FunctionalTestCase):
    """Tests for the export container finder utility."""

    layer = FunctionalLayer

    def test_get_finder_as_util(self):
        # a utility lookup delivers an object providing the interface
        finder = getUtility(IExportContainerFinder)
        self.assertTrue(finder is not None)
        provides_iface = IExportContainerFinder.providedBy(finder)
        self.assertEqual(provides_iface, True)
        return

    def test_iface(self):
        # class and instance both fulfill the interface contract
        finder = ExportContainerFinder()
        verify.verifyClass(
            IExportContainerFinder, ExportContainerFinder)
        verify.verifyObject(IExportContainerFinder, finder)
        return

    def test_no_site(self):
        # without a site being set the finder delivers None
        finder = ExportContainerFinder()
        found = finder()
        self.assertEqual(found, None)
        return

    def test_active_site(self):
        # with a University set as site a container is found
        self.getRootFolder()['app'] = University()
        finder = getUtility(IExportContainerFinder)
        setSite(self.getRootFolder()['app'])
        found = finder()
        self.assertTrue(found is not None)
        return

    def test_broken_site(self):
        # a site lacking the datacenter yields None
        self.getRootFolder()['app'] = University()
        app = self.getRootFolder()['app']
        # datacenter _is_ the export container
        del app['datacenter']
        setSite(app)
        finder = getUtility(IExportContainerFinder)
        found = finder()
        self.assertTrue(found is None)
        return
Note: See TracBrowser for help on using the repository browser.