source: main/waeup.ikoba/trunk/src/waeup/ikoba/utils/tests/test_batching.py @ 17397

Last change on this file since 17397 was 12267, checked in by Henrik Bettermann, 10 years ago

Use a baseclass for document batch processors.

  • Property svn:keywords set to Id
File size: 21.5 KB
Line 
1## $Id: test_batching.py 12267 2014-12-20 15:59:04Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager, getUtility
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite, setSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.ikoba.app import Company
33from waeup.ikoba.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
35    IExportJob, IExportContainerFinder)
36from waeup.ikoba.testing import (
37    FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager)
38from waeup.ikoba.utils.batching import (
39    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
40    ExportJobContainer, VirtualExportJobContainer, ExportContainerFinder)
41
42optionflags = (
43    doctest.REPORT_NDIFF + doctest.ELLIPSIS + doctest.NORMALIZE_WHITESPACE)
44
45
46class ICave(Interface):
47    """A cave."""
48    id_num = schema.TextLine(
49        title = u'internal id',
50        default = u'default',
51        required = True,
52        readonly = True,
53        )
54    name = schema.TextLine(
55        title = u'Cave name',
56        default = u'Unnamed',
57        required = True)
58    dinoports = schema.Int(
59        title = u'Number of DinoPorts (tm)',
60        required = False,
61        default = 1)
62    owner = schema.TextLine(
63        title = u'Owner name',
64        required = True,
65        missing_value = 'Fred Estates Inc.')
66    taxpayer = schema.Bool(
67        title = u'Payes taxes',
68        required = True,
69        default = False)
70
71class Cave(object):
72    implements(ICave)
73    def __init__(self, name=u'Unnamed', dinoports=2,
74                 owner='Fred Estates Inc.', taxpayer=False):
75        self.name = name
76        self.dinoports = 2
77        self.owner = owner
78        self.taxpayer = taxpayer
79
80stoneville = dict
81
82SAMPLE_DATA = """name,dinoports,owner,taxpayer
83Barneys Home,2,Barney,1
84Wilmas Asylum,1,Wilma,1
85Freds Dinoburgers,10,Fred,0
86Joeys Drive-in,110,Joey,0
87"""
88
89SAMPLE_FILTERED_DATA = """name,dinoports,owner,taxpayer
90Barneys Home,2,Barney,1
91Wilmas Asylum,1,Wilma,1
92"""
93
94class CaveProcessor(BatchProcessor):
95    util_name = 'caveprocessor'
96    name = 'Cave Processor'
97    iface = ICave
98    location_fields = ['name']
99    factory_name = 'Lovely Cave'
100
101    def parentsExist(self, row, site):
102        return True
103
104    def getParent(self, row, site):
105        return stoneville
106
107    def entryExists(self, row, site):
108        return row['name'] in stoneville.keys()
109
110    def getEntry(self, row, site):
111        if not self.entryExists(row, site):
112            return None
113        return stoneville[row['name']]
114
115    def delEntry(self, row, site):
116        del stoneville[row['name']]
117
118    def addEntry(self, obj, row, site):
119        stoneville[row['name']] = obj
120
121class BatchProcessorTests(FunctionalTestCase):
122
123    layer = FunctionalLayer
124
125    def setupLogger(self):
126
127        self.logger = logging.getLogger('stoneville')
128        self.logger.setLevel(logging.DEBUG)
129        self.logger.propagate = False
130        self.logfile = os.path.join(self.workdir, 'stoneville.log')
131        self.handler = logging.FileHandler(self.logfile, 'w')
132        self.logger.addHandler(self.handler)
133
134    def setUp(self):
135        global stoneville
136        super(BatchProcessorTests, self).setUp()
137
138        # Setup a sample site for each test
139        app = Company()
140        self.dc_root = tempfile.mkdtemp()
141        app['datacenter'].setStoragePath(self.dc_root)
142
143        # Prepopulate the ZODB...
144        self.getRootFolder()['app'] = app
145        self.app = self.getRootFolder()['app']
146
147        self.workdir = tempfile.mkdtemp()
148        factory = Factory(Cave)
149        provideUtility(factory, IFactory, 'Lovely Cave')
150
151        # Provide sample data
152        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
153        open(self.newcomers_csv, 'wb').write(SAMPLE_DATA)
154        self.setupLogger()
155        self.stoneville = stoneville
156        stoneville = dict()
157        self.resultpath = None
158        return
159
160    def tearDown(self):
161        super(BatchProcessorTests, self).tearDown()
162        shutil.rmtree(self.workdir)
163        shutil.rmtree(self.dc_root)
164        self.logger.removeHandler(self.handler)
165        clearSite()
166        if not isinstance(self.resultpath, list):
167            self.resultpath = [self.resultpath]
168        for path in self.resultpath:
169            if not isinstance(path, basestring):
170                continue
171            if not os.path.isdir(path):
172                path = os.path.dirname(path)
173            if os.path.exists(path):
174                shutil.rmtree(path)
175        return
176
177    def test_iface(self):
178        # make sure we fullfill interface contracts
179        obj = BatchProcessor()
180        verify.verifyClass(IBatchProcessor, BatchProcessor)
181        verify.verifyObject(IBatchProcessor, obj)
182        return
183
184    def test_import(self):
185        processor = CaveProcessor()
186        result = processor.doImport(
187            self.newcomers_csv,
188            ['name', 'dinoports', 'owner', 'taxpayer'],
189            mode='create', user='Bob', logger=self.logger)
190        num, num_fail, finished_path, failed_path = result
191        self.resultpath = [finished_path, failed_path]
192        assert num == 4
193        assert num_fail == 0
194        assert finished_path.endswith('/newcomers.finished.csv')
195        assert failed_path is None
196
197    def test_import_stoneville(self):
198        processor = CaveProcessor()
199        result = processor.doImport(
200            self.newcomers_csv,
201            ['name', 'dinoports', 'owner', 'taxpayer'],
202            mode='create', user='Bob', logger=self.logger)
203        num, num_fail, finished_path, failed_path = result
204        self.resultpath = [finished_path, failed_path]
205        assert len(self.stoneville) == 4
206        self.assertEqual(
207            sorted(self.stoneville.keys()),
208            [u'Barneys Home', u'Freds Dinoburgers',
209             u'Joeys Drive-in', u'Wilmas Asylum'])
210
211    def test_import_correct_type(self):
212        processor = CaveProcessor()
213        result = processor.doImport(
214            self.newcomers_csv,
215            ['name', 'dinoports', 'owner', 'taxpayer'],
216            mode='create', user='Bob', logger=self.logger)
217        num, num_fail, finished_path, failed_path = result
218        self.resultpath = [finished_path, failed_path]
219        assert isinstance(self.stoneville['Barneys Home'].dinoports, int)
220
221
222    def test_log(self):
223        """
224           >>> print log_contents
225           processed: /.../newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item)
226
227        """
228        processor = CaveProcessor()
229        result = processor.doImport(
230            self.newcomers_csv,
231            ['name', 'dinoports', 'owner', 'taxpayer'],
232            mode='create', user='Bob', logger=self.logger)
233        num, num_fail, finished_path, failed_path = result
234        self.resultpath = [finished_path, failed_path]
235        log_contents = open(self.logfile, 'rb').read()
236        doctest.run_docstring_examples(
237            self.test_log, locals(), False, 'test_log', None, optionflags)
238        return
239
240class ExporterBaseTests(unittest.TestCase):
241
242    def setUp(self):
243        self.workdir = tempfile.mkdtemp()
244        self.workfile = os.path.join(self.workdir, 'testfile.csv')
245        return
246
247    def tearDown(self):
248        shutil.rmtree(self.workdir)
249        return
250
251    def test_iface(self):
252        # ExporterBase really implements the promised interface.
253        obj = ExporterBase()
254        verify.verifyClass(ICSVExporter, ExporterBase)
255        verify.verifyObject(ICSVExporter, obj)
256        return
257
258    def test_unimplemented(self):
259        # make sure the not implemented methods signal that.
260        exporter = ExporterBase()
261        self.assertRaises(NotImplementedError, exporter.export_all, None)
262        self.assertRaises(NotImplementedError, exporter.export, None)
263        return
264
265    def test_mangle_value(self):
266        # some basic types are mangled correctly
267        exporter = ExporterBase()
268        result1 = exporter.mangle_value(True, 'foo')
269        result2 = exporter.mangle_value(False, 'foo')
270        result3 = exporter.mangle_value('string', 'foo')
271        result4 = exporter.mangle_value(u'string', 'foo')
272        result5 = exporter.mangle_value(None, 'foo')
273        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
274        result7 = exporter.mangle_value(
275            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
276        self.assertEqual(
277            (result1, result2, result3, result4, result5),
278            ('1', '0', u'string', u'string', ''))
279        self.assertEqual(type(result3), type('string'))
280        self.assertEqual(type(result4), type('string'))
281        # dates are formatted with trailing hash
282        self.assertEqual(result6, '2012-04-01#')
283        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
284        self.assertEqual(result7, '2012-04-01 12:01:01#')
285        return
286
287    def test_get_csv_writer(self):
288        # we can get a CSV writer to a memory file
289        exporter = ExporterBase()
290        writer, outfile = exporter.get_csv_writer()
291        writer.writerow(dict(any_id='A', any_title='B'))
292        outfile.seek(0)
293        self.assertEqual(
294            outfile.read(),
295            'any_id,any_title\r\nA,B\r\n')
296        return
297
298    def test_get_csv_writer_with_file(self):
299        # we can get CSV writer that writes to a real file
300        exporter = ExporterBase()
301        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
302        writer.writerow(dict(any_id='A', any_title='B'))
303        outfile.close()
304        resultfile = open(self.workfile, 'rb')
305        self.assertEqual(
306            resultfile.read(),
307            'any_id,any_title\r\nA,B\r\n')
308        return
309
310    def test_write_item(self):
311        # we can write items to opened exporter files.
312        exporter = ExporterBase()
313        writer, outfile = exporter.get_csv_writer()
314        class Sample(object):
315            any_id = 'A'
316            any_title = u'B'
317        exporter.write_item(Sample(), writer)
318        outfile.seek(0)
319        self.assertEqual(
320            outfile.read(),
321            'any_id,any_title\r\nA,B\r\n')
322        return
323
324    def test_close_outfile(self):
325        # exporters can help to close outfiles.
326        exporter = ExporterBase()
327        writer, outfile = exporter.get_csv_writer()
328        result = exporter.close_outfile(None, outfile)
329        self.assertEqual(result, 'any_id,any_title\r\n')
330        return
331
332    def test_close_outfile_real(self):
333        # we can also close outfiles in real files.
334        exporter = ExporterBase()
335        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
336        result = exporter.close_outfile(self.workfile, outfile)
337        self.assertEqual(result, None)
338        return
339
340    def test_export_filtered(self):
341        # we can pass in positional and keyword args
342        exporter = ExporterBase()
343        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
344        self.assertRaises(NotImplementedError, exporter.export_filtered,
345                          'foo', bar='bar')
346        return
347
348class CaveExporter(ExporterBase):
349    # A minimal fake exporter suitable to be called by export_jobs
350    fields = ('name', 'dinoports', 'owner', 'taxpayer')
351    title = u'Dummy cave exporter'
352
353    def export_all(self, site, filepath=None):
354        if filepath is None:
355            return SAMPLE_DATA
356        open(filepath, 'wb').write(SAMPLE_DATA)
357        return
358
359    def export_filtered(self, site, filepath=None, foo=None, bar=None):
360        if foo or bar:
361            open(filepath, 'wb').write(SAMPLE_FILTERED_DATA)
362            return
363        self.export_all(site, filepath=filepath)
364        return
365
366class ExportJobTests(unittest.TestCase):
367    # Test asynchronous export functionality (simple cases)
368
369    def setUp(self):
370        # register a suitable ICSVExporter as named utility
371        self.exporter = CaveExporter()
372        self.gsm = getGlobalSiteManager()
373        self.gsm.registerUtility(
374            self.exporter, ICSVExporter, name='cave_exporter')
375
376    def tearDown(self):
377        self.gsm.unregisterUtility(self.exporter)
378
379    def test_export_job_func(self):
380        # the export_job func does really export data...
381        result_path = export_job(None, 'cave_exporter')
382        self.assertTrue(os.path.isfile(result_path))
383        contents = open(result_path, 'rb').read()
384        shutil.rmtree(os.path.dirname(result_path))
385        self.assertEqual(contents, SAMPLE_DATA)
386        return
387
388    def test_export_job_interfaces(self):
389        # the AsyncExportJob implements promised interfaces correctly...
390        job = AsyncExportJob(None, None)
391        verify.verifyClass(IJob, AsyncExportJob)
392        verify.verifyObject(IJob, job)
393        verify.verifyClass(IExportJob, AsyncExportJob)
394        verify.verifyObject(IExportJob, job)
395        return
396
397    def test_export_job_with_args(self):
398        # we export filtered sets
399        result_path = export_job(None, 'cave_exporter', foo='foo')
400        contents = open(result_path, 'rb').read()
401        shutil.rmtree(os.path.dirname(result_path))
402        self.assertEqual(contents, SAMPLE_FILTERED_DATA)
403        return
404
405
406class FakeJobWithResult(FakeJob):
407
408    def __init__(self):
409        self.dir_path = tempfile.mkdtemp()
410        self.result = os.path.join(self.dir_path, 'fake.csv')
411        open(self.result, 'wb').write('a fake result')
412        return
413
414class ExportJobContainerTests(unittest.TestCase):
415    # Test ExportJobContainer
416
417    TestedClass = ExportJobContainer
418
419    def setUp(self):
420        # register a suitable ICSVExporter as named utility
421        self.exporter = CaveExporter()
422        self.job_manager = FakeJobManager()
423        self.gsm = getGlobalSiteManager()
424        self.gsm.registerUtility(
425            self.exporter, ICSVExporter, name='cave_exporter')
426        self.gsm.registerUtility(
427            self.job_manager, IJobManager)
428
429    def tearDown(self):
430        self.gsm.unregisterUtility(self.exporter)
431        self.gsm.unregisterUtility(self.job_manager, IJobManager)
432
433    def test_export_job_interfaces(self):
434        # the ExportJobContainer implements promised interfaces correctly...
435        container = self.TestedClass()
436        verify.verifyClass(IExportJobContainer, self.TestedClass)
437        verify.verifyObject(IExportJobContainer, container)
438        return
439
440    def test_start_export_job(self):
441        # we can start jobs
442        container = self.TestedClass()
443        container.start_export_job('cave_exporter', 'bob')
444        result = self.job_manager._jobs.values()[0]
445        self.assertTrue(IJob.providedBy(result))
446        self.assertEqual(
447            container.running_exports,
448            [('1', 'cave_exporter', 'bob')]
449            )
450        return
451
452    def test_get_running_export_jobs_all(self):
453        # we can get export jobs of all users
454        container = self.TestedClass()
455        container.start_export_job('cave_exporter', 'bob')
456        container.start_export_job('cave_exporter', 'alice')
457        result = container.get_running_export_jobs()
458        self.assertEqual(
459            result,
460            [('1', 'cave_exporter', 'bob'),
461             ('2', 'cave_exporter', 'alice')]
462            )
463        return
464
465    def test_get_running_export_jobs_user(self):
466        # we can get the export jobs running for a certain user
467        container = self.TestedClass()
468        container.start_export_job('cave_exporter', 'bob')
469        container.start_export_job('cave_exporter', 'alice')
470        result1 = container.get_running_export_jobs(user_id='alice')
471        result2 = container.get_running_export_jobs(user_id='foo')
472        self.assertEqual(
473            result1, [('2', 'cave_exporter', 'alice')])
474        self.assertEqual(
475            result2, [])
476        return
477
478    def test_get_running_export_jobs_only_if_exist(self):
479        # we get only jobs that are accessible through the job manager...
480        container = self.TestedClass()
481        container.start_export_job('cave_exporter', 'bob')
482        container.start_export_job('cave_exporter', 'bob')
483        self.assertTrue(
484            ('2', 'cave_exporter', 'bob') in container.running_exports)
485        # we remove the second entry from job manager
486        del self.job_manager._jobs['2']
487        result = container.get_running_export_jobs(user_id='bob')
488        self.assertEqual(
489            result, [('1', 'cave_exporter', 'bob')])
490        self.assertTrue(
491            ('2', 'cave_exporter', 'bob') not in container.running_exports)
492        return
493
494    def test_get_export_job_status(self):
495        # we can get the stati of jobs...
496        container = self.TestedClass()
497        container.start_export_job('cave_exporter', 'alice')
498        container.start_export_job('cave_exporter', 'bob')
499        container.start_export_job('cave_exporter', 'bob')
500        result = container.get_export_jobs_status(user_id='bob')
501        # we'll get the raw value, a translation and the title of the
502        # exporter
503        self.assertEqual(
504            result,
505            [('new', u'new', u'Dummy cave exporter'),
506             ('completed', u'completed', u'Dummy cave exporter')]
507            )
508        return
509
510    def test_delete_export_entry(self):
511        # we can remove export entries in local lists and the job
512        # manager as well...
513        container = self.TestedClass()
514        container.start_export_job('cave_exporter', 'bob')
515        entry = container.running_exports[0]
516        container.delete_export_entry(entry)
517        # both, running_exports list and job manager are empty now
518        self.assertEqual(
519            container.running_exports, [])
520        self.assertEqual(
521            self.job_manager._jobs, {})
522        return
523
524    def test_delete_export_entry_remove_file(self):
525        # any result files of exports are deleted as well
526        container = self.TestedClass()
527        entry = ('4', 'cave_exporter', 'bob')
528        container.running_exports = [entry]
529        fake_job = FakeJobWithResult()
530        self.job_manager._jobs['4'] = fake_job
531        self.assertTrue(os.path.isfile(fake_job.result))
532        container.delete_export_entry(entry)
533        self.assertTrue(not os.path.exists(fake_job.result))
534        return
535
536    def test_entry_from_job_id(self):
537        # we can get an entry for a job_id if the id exists
538        container = self.TestedClass()
539        entry = ('4', 'cave_exporter', 'bob')
540        container.running_exports = [entry]
541        fake_job = FakeJobWithResult()
542        self.job_manager._jobs['4'] = fake_job
543        result1 = container.entry_from_job_id(None)
544        result2 = container.entry_from_job_id('4')
545        result3 = container.entry_from_job_id('23')
546        self.assertEqual(result1, None)
547        self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
548        self.assertEqual(result3, None)
549        shutil.rmtree(fake_job.dir_path)
550        return
551
552class VirtualExportJobContainerTests(ExportJobContainerTests):
553    # VirtualExportJobContainers should provide the
554    # same functionality as regular ones.
555
556    TestedClass = VirtualExportJobContainer
557
558    def setUp(self):
559        super(VirtualExportJobContainerTests, self).setUp()
560        self.root_job_container = ExportJobContainer()
561        def fake_finder():
562            return self.root_job_container
563        self.gsm = getGlobalSiteManager()
564        self.gsm.registerUtility(fake_finder, IExportContainerFinder)
565        return
566
567class ExportContainerFinderTests(FunctionalTestCase):
568    # Tests for export container finder.
569
570    layer = FunctionalLayer
571
572    def test_get_finder_as_util(self):
573        # we can get a finder by utility lookup
574        finder = getUtility(IExportContainerFinder)
575        self.assertTrue(finder is not None)
576        self.assertEqual(
577            IExportContainerFinder.providedBy(finder),
578            True)
579        return
580
581    def test_iface(self):
582        # the finder complies with the promised interface
583        finder = ExportContainerFinder()
584        verify.verifyClass(IExportContainerFinder,
585                           ExportContainerFinder)
586        verify.verifyObject(IExportContainerFinder, finder)
587        return
588
589    def test_no_site(self):
590        # a finder returns None if no site is available
591        finder = ExportContainerFinder()
592        self.assertEqual(
593            finder(), None)
594        return
595
596    def test_active_site(self):
597        # we get the datafinder if one is installed and site set
598        self.getRootFolder()['app'] = Company()
599        finder = getUtility(IExportContainerFinder)
600        setSite(self.getRootFolder()['app'])
601        container = finder()
602        self.assertTrue(container is not None)
603        return
604
605    def test_broken_site(self):
606        # if the current site has no ExportContainer, we get None
607        self.getRootFolder()['app'] = Company()
608        app = self.getRootFolder()['app']
609        del app['datacenter'] # datacenter _is_ the export container
610        setSite(app)
611        finder = getUtility(IExportContainerFinder)
612        container = finder()
613        self.assertTrue(container is None)
614        return
Note: See TracBrowser for help on using the repository browser.