source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/tests/test_batching.py @ 9094

Last change on this file since 9094 was 9094, checked in by uli, 12 years ago

Add tests for export job containers.

  • Property svn:keywords set to Id
File size: 18.3 KB
Line 
1## $Id: test_batching.py 9094 2012-08-08 22:57:01Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob, COMPLETED
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.kofa.app import University
33from waeup.kofa.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager)
35from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
36from waeup.kofa.utils.batching import (
37    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
38    ExportJobContainer)
39
# Doctest option flags used when comparing expected and real log output.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
42
43
class ICave(Interface):
    """Schema describing a cave."""

    # Read-only identifier field.
    id_num = schema.TextLine(
        title=u'internal id', default=u'default',
        required=True, readonly=True)

    name = schema.TextLine(
        title=u'Cave name', default=u'Unnamed', required=True)

    # The only optional field of the schema.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)', required=False, default=1)

    owner = schema.TextLine(
        title=u'Owner name', required=True,
        missing_value='Fred Estates Inc.')

    taxpayer = schema.Bool(
        title=u'Payes taxes', required=True, default=False)
68
class Cave(object):
    """Plain `ICave` implementation used as sample content in the tests.

    Every constructor argument is stored unchanged as an attribute of
    the same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        self.name = name
        # Store the passed-in value. This used to be hard-coded to 2,
        # which silently discarded the `dinoports` argument.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
77
# Module-level registry of caves, keyed by cave name. It is rebound to a
# fresh dict in BatchProcessorTests.setUp. It must be a dict *instance*
# (not the `dict` type) so that CaveProcessor also works before the
# first setUp ran.
stoneville = dict()

# CSV sample: a header row followed by four caves.
SAMPLE_DATA = """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
"""
86
class CaveProcessor(BatchProcessor):
    """Batch processor that keeps caves in the module-level `stoneville`.

    Entries are stored and looked up under the ``name`` value of a row.
    """
    util_name = 'caveprocessor'
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        # The single parent container is always reported as available.
        return True

    def getParent(self, row, site):
        # Every row shares the same parent container.
        return stoneville

    def entryExists(self, row, site):
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        # Return the stored cave for this row or None if there is none.
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        cave_name = row['name']
        stoneville[cave_name] = obj
113
class BatchProcessorTests(FunctionalTestCase):
    # Functional tests running CaveProcessor imports of SAMPLE_DATA.

    layer = FunctionalLayer

    def setupLogger(self):
        # Log into a file inside the work dir so tests can read it back.
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(SAMPLE_DATA)
        self.setupLogger()

        # Rebind the global registry FIRST and only then keep a reference
        # to it. The other way round `self.stoneville` would point to the
        # registry filled by the previously run test, so assertions would
        # depend on test execution order.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the log handler before its file is deleted
        # together with the work dir.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        # Remove the directories of any result files a test registered.
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                continue
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _import_newcomers(self):
        # Import the sample CSV file in 'create' mode as user 'Bob'.
        #
        # Registers the result paths for removal in tearDown and returns
        # the doImport() result, which the tests unpack as
        # (num_succ, num_fail, finished_path, failed_path).
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        # all four sample rows are imported without failures
        result = self._import_newcomers()
        num_succ, num_fail, finished_path, failed_path = result
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertEqual(failed_path, None)

    def test_import_stoneville(self):
        # imported caves are stored under their names
        self._import_newcomers()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        # CSV strings are converted to the type required by the schema
        self._import_newcomers()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._import_newcomers()
        # `log_contents` must keep its name: the doctest above reads it
        # from the locals() passed below.
        with open(self.logfile, 'rb') as log_file:
            log_contents = log_file.read()
        # NOTE(review): run_docstring_examples() only prints mismatches,
        # it does not make this test fail.
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
        return
239
class ExporterBaseTests(unittest.TestCase):
    # Unit tests for the generic ExporterBase helper methods.

    def setUp(self):
        # Each test gets its own scratch dir; the file is not created yet.
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        # both string flavours come back as the native string type
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
        self.assertEqual(result7, '2012-04-01 12:01:01')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Close the file again before tearDown removes the work dir.
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()

        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True

        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return
340
341
class CaveExporter(ExporterBase):
    # A minimal fake exporter suitable to be called by export_jobs
    fields = ('name', 'dinoports', 'owner', 'taxpayer')
    title = u'Dummy cave exporter'

    def export_all(self, site, filepath=None):
        # Return SAMPLE_DATA if no `filepath` is given. Otherwise write
        # SAMPLE_DATA to that file and return None. `site` is ignored.
        if filepath is None:
            return SAMPLE_DATA
        # The context manager makes sure the data is flushed and the
        # file handle closed before callers read the file.
        with open(filepath, 'wb') as outfile:
            outfile.write(SAMPLE_DATA)
        return
352
class ExportJobTests(unittest.TestCase):
    # Test asynchronous export functionality (simple cases)

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def tearDown(self):
        # Interface and name must match the registration. Without the
        # name the unnamed registration is looked up, nothing is removed
        # and the exporter stays registered after the test.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def test_export_job_func(self):
        # the export_job func does really export data...
        result_path = export_job(None, 'cave_exporter')
        self.assertTrue(os.path.isfile(result_path))
        with open(result_path, 'rb') as result_file:
            contents = result_file.read()
        # clean up before asserting, so a failure leaves no files behind
        shutil.rmtree(os.path.dirname(result_path))
        self.assertEqual(contents, SAMPLE_DATA)
        return

    def test_export_job_interfaces(self):
        # the AsyncExportJob implements promised interfaces correctly...
        job = AsyncExportJob(None, None)
        verify.verifyClass(IJob, AsyncExportJob)
        verify.verifyObject(IJob, job)
        return
381
382
class FakeJob(object):
    """Stand-in for a job: status is COMPLETED, result is None."""

    result = None
    status = COMPLETED
387
class FakeJobWithResult(FakeJob):
    """A FakeJob whose `result` is the path of a really existing file.

    The file ``fake.csv`` is created in a fresh temporary directory on
    instantiation. Nothing in this class removes it again.
    """

    def __init__(self):
        dir_path = tempfile.mkdtemp()
        self.result = os.path.join(dir_path, 'fake.csv')
        # Close the handle right away so the file can be removed later.
        with open(self.result, 'wb') as result_file:
            result_file.write('a fake result')
        return
395
class FakeJobManager(object):
    """A minimal in-memory job manager for tests.

    Jobs are stored under consecutive numeric ids, handed out as strings
    starting with ``'1'``.
    """

    def __init__(self):
        # Keep the job store per instance. As a class attribute the dict
        # was shared by all managers, so jobs put by one test were still
        # visible to the manager created for the next test.
        self._jobs = dict()
        self._curr_num = 1

    def get(self, job_id):
        """Return the job stored under `job_id` or None.

        Id ``'3'`` is special: it always yields a new `FakeJob`, no
        matter what was stored under that id.
        """
        if job_id == '3':
            return FakeJob()
        return self._jobs.get(job_id, None)

    def put(self, job):
        """Store `job` and return the id (a string) assigned to it."""
        num = str(self._curr_num)
        self._jobs[num] = job
        self._curr_num += 1
        return num

    def remove(self, job_id, site):
        """Forget the job stored under `job_id`; unknown ids are ignored.

        `site` is accepted for signature compatibility and not used.
        """
        self._jobs.pop(job_id, None)
        return
416
class ExportJobContainerTests(unittest.TestCase):
    # Test ExportJobContainer

    def setUp(self):
        # register a suitable ICSVExporter as named utility and a fake
        # job manager as unnamed IJobManager utility
        self.exporter = CaveExporter()
        self.job_manager = FakeJobManager()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.registerUtility(
            self.job_manager, IJobManager)

    def tearDown(self):
        # Interface and name must match the registration. Without the
        # name the unnamed registration is looked up, nothing is removed
        # and the exporter stays registered after the test.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.unregisterUtility(self.job_manager, IJobManager)

    def test_export_job_interfaces(self):
        # the ExportJobContainer implements promised interfaces correctly...
        container = ExportJobContainer()
        verify.verifyClass(IExportJobContainer, ExportJobContainer)
        verify.verifyObject(IExportJobContainer, container)
        return

    def test_start_export_job(self):
        # we can start jobs
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        # Look the job up by the id asserted below instead of picking
        # "the first" value of a dict.
        result = self.job_manager._jobs['1']
        self.assertTrue(IJob.providedBy(result))
        self.assertEqual(
            container.running_exports,
            [('1', 'cave_exporter', 'bob')]
            )
        return

    def test_get_running_export_jobs_all(self):
        # we can get export jobs of all users
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result = container.get_running_export_jobs()
        self.assertEqual(
            result,
            [('1', 'cave_exporter', 'bob'),
             ('2', 'cave_exporter', 'alice')]
            )
        return

    def test_get_running_export_jobs_user(self):
        # we can get the export jobs running for a certain user
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result1 = container.get_running_export_jobs(user_id='alice')
        result2 = container.get_running_export_jobs(user_id='foo')
        self.assertEqual(
            result1, [('2', 'cave_exporter', 'alice')])
        self.assertEqual(
            result2, [])
        return

    def test_get_running_export_jobs_only_if_exist(self):
        # we get only jobs that are accessible through the job manager...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') in container.running_exports)
        # we remove the second entry from job manager
        del self.job_manager._jobs['2']
        result = container.get_running_export_jobs(user_id='bob')
        self.assertEqual(
            result, [('1', 'cave_exporter', 'bob')])
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') not in container.running_exports)
        return

    def test_get_export_job_status(self):
        # we can get the statuses of jobs...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'alice')
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        result = container.get_export_jobs_status(user_id='bob')
        # we'll get the raw value, a translation and the title of the
        # exporter. Bob's jobs have the ids '2' and '3'; for id '3' the
        # fake job manager always returns a completed FakeJob.
        self.assertEqual(
            result,
            [('new', u'new', u'Dummy cave exporter'),
             ('completed', u'completed', u'Dummy cave exporter')]
            )
        return

    def test_delete_export_entry(self):
        # we can remove export entries in local lists and the job
        # manager as well...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        entry = container.running_exports[0]
        container.delete_export_entry(entry)
        # both, running_exports list and job manager are empty now
        self.assertEqual(
            container.running_exports, [])
        self.assertEqual(
            self.job_manager._jobs, {})
        return

    def test_delete_export_entry_remove_file(self):
        # any result files of exports are deleted as well
        container = ExportJobContainer()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        self.job_manager._jobs['4'] = fake_job
        self.assertTrue(os.path.isfile(fake_job.result))
        container.delete_export_entry(entry)
        self.assertTrue(not os.path.exists(fake_job.result))
        return
Note: See TracBrowser for help on using the repository browser.