source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 9988

Last change on this file since 9988 was 9797, checked in by uli, 12 years ago

Local exports for departments.

  • Property svn:keywords set to Id
File size: 21.7 KB
Line 
1## $Id: test_batching.py 9797 2012-12-13 15:39:31Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager, getUtility
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite, setSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.kofa.app import University
33from waeup.kofa.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
35    IExportJob, IExportContainerFinder)
36from waeup.kofa.testing import (
37    FunctionalLayer, FunctionalTestCase, FakeJob, FakeJobManager)
38from waeup.kofa.utils.batching import (
39    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
40    ExportJobContainer, VirtualExportJobContainer, ExportContainerFinder)
41
# Flags applied to the doctest embedded in ``test_log``: show ndiff-style
# reports, allow ``...`` placeholders and ignore whitespace differences.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
    )
44
45
class ICave(Interface):
    """Schema of a cave, the sample item type used by the tests below."""

    # Internal identifier; declared read-only.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True)

    # Display name of the cave.
    name = schema.TextLine(
        title=u'Cave name', default=u'Unnamed', required=True)

    # Optional integer field; the schema default is 1.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)', required=False, default=1)

    # Owner of the cave.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.')

    # Boolean flag, False by default.
    taxpayer = schema.Bool(
        title=u'Payes taxes', required=True, default=False)
70
class Cave(object):
    """Plain ``ICave`` implementation used as import target in the tests.

    All constructor arguments are stored unchanged as attributes of the
    same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        self.name = name
        # Store the value passed in; it was hard-coded to 2 before, which
        # silently dropped any `dinoports` argument given by the caller.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
79
# Module-level container the CaveProcessor stores caves in. It must be a
# dict *instance*: binding the `dict` type itself would make lookups such as
# ``stoneville[...]`` fail for any code running before a test's setUp
# rebinds the name.
stoneville = dict()

# CSV input used for imports and returned by the fake exporter.
SAMPLE_DATA = """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
"""

# Subset of SAMPLE_DATA written by the fake exporter when a filter is given.
SAMPLE_FILTERED_DATA = """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
"""
93
class CaveProcessor(BatchProcessor):
    """Batch processor keeping caves in the module-level ``stoneville``.

    Rows are located by their ``name`` column, which also serves as the
    key under which an object is stored.
    """
    util_name = 'caveprocessor'
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        # The container is a module global, so it is always available.
        return True

    def getParent(self, row, site):
        # Every row shares the same container.
        return stoneville

    def entryExists(self, row, site):
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        # Return the stored object for the row or None if there is none.
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        cave_name = row['name']
        stoneville[cave_name] = obj
120
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests for ``BatchProcessor``, driven via ``CaveProcessor``.

    Each test imports ``SAMPLE_DATA`` from a temporary CSV file into the
    module-level ``stoneville`` dict and checks results, stored objects
    or log output.
    """

    layer = FunctionalLayer

    def setupLogger(self):
        """Create a non-propagating logger writing to a file in workdir."""
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as fd:
            fd.write(SAMPLE_DATA)
        self.setupLogger()
        # Rebind the global first and only then keep a reference to it.
        # The other way round `self.stoneville` would point to the
        # container of the *previous* test (or to the initial module
        # value), so assertions would not look at the data imported by
        # the current test.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before its logfile is removed;
        # otherwise the file descriptor stays open for the whole test run.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        # `resultpath` may be None, a single path or a list of paths.
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                continue
            # result files live in a directory of their own; remove it
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        """Import the sample CSV in create mode and register result paths.

        Returns the tuple delivered by ``doImport``, unpacked by the
        tests as ``(num_succ, num_fail, finished_path, failed_path)``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        # remembered for removal in tearDown
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fullfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        # all four sample rows are imported without failures
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        # unittest assertions are used as bare asserts vanish under -O
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        # imported caves are stored under their names
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # values read from CSV end up as ints in the stored object
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           processed: /.../newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item)

        """
        self._import_newcomers()
        with open(self.logfile, 'rb') as fd:
            log_contents = fd.read()
        # NOTE(review): doctest.run_docstring_examples only reports
        # mismatches on stdout, it does not raise -- so a differing log
        # line does not make this test fail.
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
        return
239
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for the generic ``ExporterBase`` CSV exporter."""

    def setUp(self):
        # each test gets its own scratch directory and target file path
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # both, byte and unicode strings, come back as plain strings
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
        self.assertEqual(result7, '2012-04-01 12:01:01')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # read back via context manager so the handle is closed again
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True
        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return

    def test_export_filtered(self):
        # we can pass in positional and keyword args
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        try:
            self.assertRaises(NotImplementedError, exporter.export_filtered,
                              'foo', bar='bar')
        finally:
            # the writer's file was left open before
            outfile.close()
        return
348
class CaveExporter(ExporterBase):
    """A minimal fake exporter suitable to be called by export_jobs.

    Instead of exporting real objects it delivers the canned
    ``SAMPLE_DATA`` or, if a filter is given, ``SAMPLE_FILTERED_DATA``.
    """
    fields = ('name', 'dinoports', 'owner', 'taxpayer')
    title = u'Dummy cave exporter'

    def _deliver(self, data, filepath):
        # Return `data` if no filepath is given, else write it to
        # `filepath` (closing the file afterwards) and return None.
        if filepath is None:
            return data
        with open(filepath, 'wb') as fd:
            fd.write(data)
        return None

    def export_all(self, site, filepath=None):
        """Deliver the complete sample data.

        The data is returned as string if `filepath` is None. Otherwise
        it is written to `filepath` and None is returned.
        """
        return self._deliver(SAMPLE_DATA, filepath)

    def export_filtered(self, site, filepath=None, foo=None, bar=None):
        """Deliver filtered sample data if `foo` or `bar` is set.

        Without any filter value this behaves like `export_all`. As
        with `export_all`, the data is returned if `filepath` is None
        (a filter without filepath raised a TypeError before) and
        written to `filepath` otherwise.
        """
        if foo or bar:
            return self._deliver(SAMPLE_FILTERED_DATA, filepath)
        return self.export_all(site, filepath=filepath)
366
class ExportJobTests(unittest.TestCase):
    """Test asynchronous export functionality (simple cases)."""

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def tearDown(self):
        # The exporter was registered under a name. Registrations are
        # looked up by interface *and* name, so both must be passed in;
        # without them the call found nothing and the utility stayed
        # registered after the test.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def _read_and_remove(self, result_path):
        # Return the contents of the export file at `result_path` and
        # remove the directory containing it, also if reading fails.
        try:
            self.assertTrue(os.path.isfile(result_path))
            with open(result_path, 'rb') as fd:
                return fd.read()
        finally:
            shutil.rmtree(os.path.dirname(result_path), ignore_errors=True)

    def test_export_job_func(self):
        # the export_job func does really export data...
        result_path = export_job(None, 'cave_exporter')
        contents = self._read_and_remove(result_path)
        self.assertEqual(contents, SAMPLE_DATA)
        return

    def test_export_job_interfaces(self):
        # the AsyncExportJob implements promised interfaces correctly...
        job = AsyncExportJob(None, None)
        verify.verifyClass(IJob, AsyncExportJob)
        verify.verifyObject(IJob, job)
        verify.verifyClass(IExportJob, AsyncExportJob)
        verify.verifyObject(IExportJob, job)
        return

    def test_export_job_with_args(self):
        # we export filtered sets
        result_path = export_job(None, 'cave_exporter', foo='foo')
        contents = self._read_and_remove(result_path)
        self.assertEqual(contents, SAMPLE_FILTERED_DATA)
        return
405
406
class FakeJobWithResult(FakeJob):
    """A fake job whose `result` is the path of a really existing file.

    The file ``fake.csv`` is created in a fresh temporary directory
    available as `dir_path`. Removing that directory is up to the code
    creating the job.
    """

    def __init__(self):
        self.dir_path = tempfile.mkdtemp()
        self.result = os.path.join(self.dir_path, 'fake.csv')
        # context manager, so the file handle is closed right away
        with open(self.result, 'wb') as fd:
            fd.write('a fake result')
        return
414
class ExportJobContainerTests(unittest.TestCase):
    """Test ExportJobContainer.

    Subclasses can run the same tests against another container class
    by overriding `TestedClass`.
    """

    TestedClass = ExportJobContainer

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.job_manager = FakeJobManager()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.registerUtility(
            self.job_manager, IJobManager)

    def tearDown(self):
        # The exporter was registered under a name. Registrations are
        # looked up by interface *and* name, so both must be passed in;
        # without them the call found nothing and the utility stayed
        # registered after the test.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.unregisterUtility(self.job_manager, IJobManager)

    def test_export_job_interfaces(self):
        # the ExportJobContainer implements promised interfaces correctly...
        container = self.TestedClass()
        verify.verifyClass(IExportJobContainer, self.TestedClass)
        verify.verifyObject(IExportJobContainer, container)
        return

    def test_start_export_job(self):
        # we can start jobs
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        result = list(self.job_manager._jobs.values())[0]
        self.assertTrue(IJob.providedBy(result))
        self.assertEqual(
            container.running_exports,
            [('1', 'cave_exporter', 'bob')]
            )
        return

    def test_get_running_export_jobs_all(self):
        # we can get export jobs of all users
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result = container.get_running_export_jobs()
        self.assertEqual(
            result,
            [('1', 'cave_exporter', 'bob'),
             ('2', 'cave_exporter', 'alice')]
            )
        return

    def test_get_running_export_jobs_user(self):
        # we can get the export jobs running for a certain user
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result1 = container.get_running_export_jobs(user_id='alice')
        result2 = container.get_running_export_jobs(user_id='foo')
        self.assertEqual(
            result1, [('2', 'cave_exporter', 'alice')])
        self.assertEqual(
            result2, [])
        return

    def test_get_running_export_jobs_only_if_exist(self):
        # we get only jobs that are accessible through the job manager...
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') in container.running_exports)
        # we remove the second entry from job manager
        del self.job_manager._jobs['2']
        result = container.get_running_export_jobs(user_id='bob')
        self.assertEqual(
            result, [('1', 'cave_exporter', 'bob')])
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') not in container.running_exports)
        return

    def test_get_export_job_status(self):
        # we can get the stati of jobs...
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'alice')
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        result = container.get_export_jobs_status(user_id='bob')
        # we'll get the raw value, a translation and the title of the
        # exporter
        self.assertEqual(
            result,
            [('new', u'new', u'Dummy cave exporter'),
             ('completed', u'completed', u'Dummy cave exporter')]
            )
        return

    def test_delete_export_entry(self):
        # we can remove export entries in local lists and the job
        # manager as well...
        container = self.TestedClass()
        container.start_export_job('cave_exporter', 'bob')
        entry = container.running_exports[0]
        container.delete_export_entry(entry)
        # both, running_exports list and job manager are empty now
        self.assertEqual(
            container.running_exports, [])
        self.assertEqual(
            self.job_manager._jobs, {})
        return

    def test_delete_export_entry_remove_file(self):
        # any result files of exports are deleted as well
        container = self.TestedClass()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        try:
            self.job_manager._jobs['4'] = fake_job
            self.assertTrue(os.path.isfile(fake_job.result))
            container.delete_export_entry(entry)
            self.assertFalse(os.path.exists(fake_job.result))
        finally:
            # make sure the temporary dir of the fake job does not
            # survive, whatever the container removed (or not) before
            shutil.rmtree(fake_job.dir_path, ignore_errors=True)
        return

    def test_entry_from_job_id(self):
        # we can get an entry for a job_id if the id exists
        container = self.TestedClass()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        try:
            self.job_manager._jobs['4'] = fake_job
            result1 = container.entry_from_job_id(None)
            result2 = container.entry_from_job_id('4')
            result3 = container.entry_from_job_id('23')
            self.assertEqual(result1, None)
            self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
            self.assertEqual(result3, None)
        finally:
            # remove the temporary dir also if an assertion failed
            shutil.rmtree(fake_job.dir_path, ignore_errors=True)
        return
552
class VirtualExportJobContainerTests(ExportJobContainerTests):
    """Run the regular container tests against VirtualExportJobContainer.

    VirtualExportJobContainers should provide the same functionality as
    regular ones. For these tests a fake IExportContainerFinder utility
    is registered that returns a regular ExportJobContainer.
    """

    TestedClass = VirtualExportJobContainer

    def setUp(self):
        super(VirtualExportJobContainerTests, self).setUp()
        self.root_job_container = ExportJobContainer()
        def fake_finder():
            return self.root_job_container
        # keep a reference, so tearDown can unregister the finder again
        self.fake_finder = fake_finder
        self.gsm = getGlobalSiteManager()
        # remember a finder registered before (if any) to restore it later
        self._orig_finder = self.gsm.queryUtility(IExportContainerFinder)
        self.gsm.registerUtility(fake_finder, IExportContainerFinder)
        return

    def tearDown(self):
        # The fake finder was never unregistered before and thus leaked
        # into all tests run afterwards in the same process.
        self.gsm.unregisterUtility(self.fake_finder, IExportContainerFinder)
        if self._orig_finder is not None:
            self.gsm.registerUtility(
                self._orig_finder, IExportContainerFinder)
        super(VirtualExportJobContainerTests, self).tearDown()
        return
567
class ExportContainerFinderTests(FunctionalTestCase):
    """Tests for export container finder."""

    layer = FunctionalLayer

    def tearDown(self):
        super(ExportContainerFinderTests, self).tearDown()
        # Some tests below set a site. Reset it explicitly, as
        # BatchProcessorTests does, so no test depends on the site a
        # previous one left behind.
        # NOTE(review): the base class might reset the site already --
        # the extra call is harmless in that case.
        clearSite()
        return

    def test_get_finder_as_util(self):
        # we can get a finder by utility lookup
        finder = getUtility(IExportContainerFinder)
        self.assertTrue(finder is not None)
        self.assertEqual(
            IExportContainerFinder.providedBy(finder),
            True)
        return

    def test_iface(self):
        # the finder complies with the promised interface
        finder = ExportContainerFinder()
        verify.verifyClass(IExportContainerFinder,
                           ExportContainerFinder)
        verify.verifyObject(IExportContainerFinder, finder)
        return

    def test_no_site(self):
        # a finder returns None if no site is available
        finder = ExportContainerFinder()
        self.assertEqual(
            finder(), None)
        return

    def test_active_site(self):
        # we get the datafinder if one is installed and site set
        self.getRootFolder()['app'] = University()
        finder = getUtility(IExportContainerFinder)
        setSite(self.getRootFolder()['app'])
        container = finder()
        self.assertTrue(container is not None)
        return

    def test_broken_site(self):
        # if the current site has no ExportContainer, we get None
        self.getRootFolder()['app'] = University()
        app = self.getRootFolder()['app']
        del app['datacenter'] # datacenter _is_ the export container
        setSite(app)
        finder = getUtility(IExportContainerFinder)
        container = finder()
        self.assertTrue(container is None)
        return
Note: See TracBrowser for help on using the repository browser.