source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_batching.py @ 9271

Last change on this file since 9271 was 9217, checked in by uli, 12 years ago

Merge changes from uli-async-update back into trunk.

  • Property svn:keywords set to Id
File size: 19.1 KB
Line 
1## $Id: test_batching.py 9217 2012-09-21 11:21:05Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18import datetime
19import doctest
20import logging
21import os
22import shutil
23import tempfile
24import unittest
25from zc.async.interfaces import IJob, COMPLETED
26from zope import schema
27from zope.component import provideUtility, getGlobalSiteManager
28from zope.component.factory import Factory
29from zope.component.hooks import clearSite
30from zope.component.interfaces import IFactory
31from zope.interface import Interface, implements, verify
32from waeup.kofa.app import University
33from waeup.kofa.interfaces import (
34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager,
35    IExportJob)
36from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
37from waeup.kofa.utils.batching import (
38    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
39    ExportJobContainer)
40
# Doctest option flags for the log check in BatchProcessorTests.test_log:
# show ndiff-style diffs, allow '...' wildcards, and ignore whitespace
# differences. The flags are distinct bits, so they are OR-ed together.
optionflags = (doctest.REPORT_NDIFF
               | doctest.ELLIPSIS
               | doctest.NORMALIZE_WHITESPACE)
43
44
class ICave(Interface):
    """A cave.

    `CaveProcessor` uses this interface as its `iface`, so the fields
    below describe the columns of imported cave rows.
    """
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
        )
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True)
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1)
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.')
    taxpayer = schema.Bool(
        title=u'Pays taxes',
        required=True,
        default=False)
69
class Cave(object):
    """A plain cave object implementing `ICave`.

    The test setup registers `Factory(Cave)` as the ``'Lovely Cave'``
    factory used by `CaveProcessor`.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        self.name = name
        # Store the value actually passed in (default 2) instead of a
        # hard-wired 2 that silently ignored the `dinoports` argument.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
78
# Module-level store for caves handled by `CaveProcessor` (keyed by cave
# name). It must be a dict *instance*, not the `dict` type itself;
# BatchProcessorTests.setUp rebinds it to a fresh dict for every test.
stoneville = dict()
80
# CSV sample with a header row and four caves; used as import source by
# BatchProcessorTests and as export payload by CaveExporter.
SAMPLE_DATA = (
    "name,dinoports,owner,taxpayer\n"
    "Barneys Home,2,Barney,1\n"
    "Wilmas Asylum,1,Wilma,1\n"
    "Freds Dinoburgers,10,Fred,0\n"
    "Joeys Drive-in,110,Joey,0\n"
    )
87
class CaveProcessor(BatchProcessor):
    """A batch processor storing caves in the module-level `stoneville`.

    Caves are located by their ``name`` column; every cave has
    `stoneville` as its parent.
    """
    util_name = 'caveprocessor'
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Always true: `stoneville` is the parent of every cave."""
        return True

    def getParent(self, row, site):
        """Return the `stoneville` dict as parent container."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether a cave named ``row['name']`` is already stored."""
        # Test membership on the dict itself: in Python 2 `d.keys()`
        # builds a list, turning each lookup into a linear scan.
        return row['name'] in stoneville

    def getEntry(self, row, site):
        """Return the cave named ``row['name']`` or ``None`` if unknown."""
        if not self.entryExists(row, site):
            return None
        return stoneville[row['name']]

    def delEntry(self, row, site):
        """Remove the cave named ``row['name']`` (KeyError if unknown)."""
        del stoneville[row['name']]

    def addEntry(self, obj, row, site):
        """Store `obj` in `stoneville` under ``row['name']``."""
        stoneville[row['name']] = obj
114
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests for `BatchProcessor`, driven via `CaveProcessor`.

    Every test gets a sample `University` site, a work directory with
    ``newcomers.csv`` (built from `SAMPLE_DATA`), a file logger writing
    ``stoneville.log`` into that directory, and a fresh, empty global
    `stoneville` dict that is also available as `self.stoneville`.
    """

    layer = FunctionalLayer

    def setupLogger(self):
        # Log into a file inside the work dir so tests can inspect it;
        # propagate=False keeps records away from ancestor loggers.
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(SAMPLE_DATA)
        self.setupLogger()
        # Reset the global store *first*, then keep a reference to it:
        # CaveProcessor works on the global `stoneville`, so
        # self.stoneville must be that very same fresh dict. Grabbing the
        # previous value made results depend on test execution order.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        super(BatchProcessorTests, self).tearDown()
        # Detach *and close* the log handler before its directory is
        # removed; removeHandler() alone leaves the log file open.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        # Remove the directories of any result files (finished/failed
        # CSVs) a test recorded in self.resultpath.
        paths = self.resultpath
        if not isinstance(paths, list):
            paths = [paths]
        for path in paths:
            if not isinstance(path, basestring):
                continue
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def test_iface(self):
        # make sure we fulfill interface contracts
        obj = BatchProcessor()
        verify.verifyClass(IBatchProcessor, BatchProcessor)
        verify.verifyObject(IBatchProcessor, obj)
        return

    def test_import(self):
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        # The docstring above is run as a doctest against the locals of
        # this method (notably `log_contents`); keep it verbatim.
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        with open(self.logfile, 'rb') as log_file:
            log_contents = log_file.read()
        doctest.run_docstring_examples(
            self.test_log, locals(), False, 'test_log', None, optionflags)
        return
240
class ExporterBaseTests(unittest.TestCase):
    """Unit tests for the generic CSV helpers of `ExporterBase`."""

    def setUp(self):
        self.workdir = tempfile.mkdtemp()
        self.workfile = os.path.join(self.workdir, 'testfile.csv')
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        return

    def test_iface(self):
        # ExporterBase really implements the promised interface.
        obj = ExporterBase()
        verify.verifyClass(ICSVExporter, ExporterBase)
        verify.verifyObject(ICSVExporter, obj)
        return

    def test_unimplemented(self):
        # make sure the not implemented methods signal that.
        exporter = ExporterBase()
        self.assertRaises(NotImplementedError, exporter.export_all, None)
        self.assertRaises(NotImplementedError, exporter.export, None)
        return

    def test_mangle_value(self):
        # some basic types are mangled correctly
        exporter = ExporterBase()
        result1 = exporter.mangle_value(True, 'foo')
        result2 = exporter.mangle_value(False, 'foo')
        result3 = exporter.mangle_value('string', 'foo')
        result4 = exporter.mangle_value(u'string', 'foo')
        result5 = exporter.mangle_value(None, 'foo')
        result6 = exporter.mangle_value(datetime.date(2012, 4, 1), 'foo')
        result7 = exporter.mangle_value(
            datetime.datetime(2012, 4, 1, 12, 1, 1), 'foo')
        self.assertEqual(
            (result1, result2, result3, result4, result5),
            ('1', '0', u'string', u'string', ''))
        self.assertEqual(type(result3), type('string'))
        self.assertEqual(type(result4), type('string'))
        # dates are formatted with trailing hash
        self.assertEqual(result6, '2012-04-01#')
        # datetimes are formatted as yyyy-mm-dd hh:mm:ss
        self.assertEqual(result7, '2012-04-01 12:01:01')
        return

    def test_get_csv_writer(self):
        # we can get a CSV writer to a memory file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_get_csv_writer_with_file(self):
        # we can get CSV writer that writes to a real file
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        writer.writerow(dict(code='A', title='B', title_prefix='C'))
        outfile.close()
        # Close the reading handle too, so nothing stays open when
        # tearDown removes the work dir.
        with open(self.workfile, 'rb') as resultfile:
            contents = resultfile.read()
        self.assertEqual(
            contents,
            'code,title,title_prefix\r\nA,B,C\r\n')
        return

    def test_write_item(self):
        # we can write items to opened exporter files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        class Sample(object):
            code = 'A'
            title = u'B'
            title_prefix = True
        exporter.write_item(Sample(), writer)
        outfile.seek(0)
        self.assertEqual(
            outfile.read(),
            'code,title,title_prefix\r\nA,B,1\r\n')
        return

    def test_close_outfile(self):
        # exporters can help to close outfiles.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer()
        result = exporter.close_outfile(None, outfile)
        self.assertEqual(result, 'code,title,title_prefix\r\n')
        return

    def test_close_outfile_real(self):
        # we can also close outfiles in real files.
        exporter = ExporterBase()
        writer, outfile = exporter.get_csv_writer(filepath=self.workfile)
        result = exporter.close_outfile(self.workfile, outfile)
        self.assertEqual(result, None)
        return
341
342
class CaveExporter(ExporterBase):
    """A minimal fake exporter suitable to be called by export jobs.

    It ignores the site and always exports `SAMPLE_DATA`.
    """
    fields = ('name', 'dinoports', 'owner', 'taxpayer')
    title = u'Dummy cave exporter'

    def export_all(self, site, filepath=None):
        """Export `SAMPLE_DATA`.

        Return the data as a string if `filepath` is ``None``; otherwise
        write it to `filepath` and return ``None``.
        """
        if filepath is None:
            return SAMPLE_DATA
        # Use a context manager so the file is flushed and closed at once.
        with open(filepath, 'wb') as outfile:
            outfile.write(SAMPLE_DATA)
        return
353
class ExportJobTests(unittest.TestCase):
    # Test asynchronous export functionality (simple cases)

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def tearDown(self):
        # Unregister with the same interface and name used in setUp:
        # unregisterUtility() looks registrations up by (interface, name)
        # and defaults to name u'', which would not match, leaving the
        # exporter registered for later tests.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')

    def test_export_job_func(self):
        # the export_job func does really export data...
        result_path = export_job(None, 'cave_exporter')
        self.assertTrue(os.path.isfile(result_path))
        with open(result_path, 'rb') as result_file:
            contents = result_file.read()
        shutil.rmtree(os.path.dirname(result_path))
        self.assertEqual(contents, SAMPLE_DATA)
        return

    def test_export_job_interfaces(self):
        # the AsyncExportJob implements promised interfaces correctly...
        job = AsyncExportJob(None, None)
        verify.verifyClass(IJob, AsyncExportJob)
        verify.verifyObject(IJob, job)
        verify.verifyClass(IExportJob, AsyncExportJob)
        verify.verifyObject(IExportJob, job)
        return
384
385
class FakeJob(object):
    """Stand-in for a job that has already finished.

    Reports the zc.async `COMPLETED` status and carries no result.
    """
    result = None
    status = COMPLETED
390
class FakeJobWithResult(FakeJob):
    """A completed fake job whose `result` is the path of a real file.

    Each instance creates a new temporary directory containing
    ``fake.csv`` with the text ``a fake result``. Removing that directory
    is up to the caller.
    """

    def __init__(self):
        dir_path = tempfile.mkdtemp()
        self.result = os.path.join(dir_path, 'fake.csv')
        # Close the file right away so its content is on disk and no
        # handle stays open when the file is deleted later.
        with open(self.result, 'wb') as result_file:
            result_file.write('a fake result')
        return
398
class FakeJobManager(object):
    """In-memory stand-in for an `IJobManager` utility.

    Jobs are kept in a dict under sequential string ids ``'1'``, ``'2'``,
    ... in the order they are `put`. As a special case, ``get('3')``
    always returns a fresh completed `FakeJob`.
    """

    def __init__(self):
        # Per-instance state: a class-level dict would be shared by all
        # instances and leak jobs from one test into the next.
        self._jobs = dict()
        self._curr_num = 1

    def get(self, job_id):
        """Return the job stored under `job_id` or ``None``."""
        if job_id == '3':
            return FakeJob()
        return self._jobs.get(job_id, None)

    def put(self, job):
        """Store `job` under the next sequential id and return that id."""
        num = str(self._curr_num)
        self._jobs[num] = job
        self._curr_num += 1
        return num

    def remove(self, job_id, site):
        """Forget the job stored under `job_id`; unknown ids are ignored."""
        self._jobs.pop(job_id, None)
        return
419
class ExportJobContainerTests(unittest.TestCase):
    # Test ExportJobContainer

    def setUp(self):
        # register a suitable ICSVExporter as named utility
        self.exporter = CaveExporter()
        self.job_manager = FakeJobManager()
        self.gsm = getGlobalSiteManager()
        self.gsm.registerUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.registerUtility(
            self.job_manager, IJobManager)

    def tearDown(self):
        # The exporter was registered under a name; pass interface and
        # name, otherwise unregisterUtility() looks for name u'' and the
        # exporter stays registered.
        self.gsm.unregisterUtility(
            self.exporter, ICSVExporter, name='cave_exporter')
        self.gsm.unregisterUtility(self.job_manager, IJobManager)

    def test_export_job_interfaces(self):
        # the ExportJobContainer implements promised interfaces correctly...
        container = ExportJobContainer()
        verify.verifyClass(IExportJobContainer, ExportJobContainer)
        verify.verifyObject(IExportJobContainer, container)
        return

    def test_start_export_job(self):
        # we can start jobs
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        result = self.job_manager._jobs.values()[0]
        self.assertTrue(IJob.providedBy(result))
        self.assertEqual(
            container.running_exports,
            [('1', 'cave_exporter', 'bob')]
            )
        return

    def test_get_running_export_jobs_all(self):
        # we can get export jobs of all users
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result = container.get_running_export_jobs()
        self.assertEqual(
            result,
            [('1', 'cave_exporter', 'bob'),
             ('2', 'cave_exporter', 'alice')]
            )
        return

    def test_get_running_export_jobs_user(self):
        # we can get the export jobs running for a certain user
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'alice')
        result1 = container.get_running_export_jobs(user_id='alice')
        result2 = container.get_running_export_jobs(user_id='foo')
        self.assertEqual(
            result1, [('2', 'cave_exporter', 'alice')])
        self.assertEqual(
            result2, [])
        return

    def test_get_running_export_jobs_only_if_exist(self):
        # we get only jobs that are accessible through the job manager...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') in container.running_exports)
        # we remove the second entry from job manager
        del self.job_manager._jobs['2']
        result = container.get_running_export_jobs(user_id='bob')
        self.assertEqual(
            result, [('1', 'cave_exporter', 'bob')])
        self.assertTrue(
            ('2', 'cave_exporter', 'bob') not in container.running_exports)
        return

    def test_get_export_job_status(self):
        # we can get the statuses of jobs...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'alice')
        container.start_export_job('cave_exporter', 'bob')
        container.start_export_job('cave_exporter', 'bob')
        result = container.get_export_jobs_status(user_id='bob')
        # we'll get the raw value, a translation and the title of the
        # exporter
        self.assertEqual(
            result,
            [('new', u'new', u'Dummy cave exporter'),
             ('completed', u'completed', u'Dummy cave exporter')]
            )
        return

    def test_delete_export_entry(self):
        # we can remove export entries in local lists and the job
        # manager as well...
        container = ExportJobContainer()
        container.start_export_job('cave_exporter', 'bob')
        entry = container.running_exports[0]
        container.delete_export_entry(entry)
        # both, running_exports list and job manager are empty now
        self.assertEqual(
            container.running_exports, [])
        self.assertEqual(
            self.job_manager._jobs, {})
        return

    def test_delete_export_entry_remove_file(self):
        # any result files of exports are deleted as well
        container = ExportJobContainer()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        self.job_manager._jobs['4'] = fake_job
        self.assertTrue(os.path.isfile(fake_job.result))
        container.delete_export_entry(entry)
        self.assertTrue(not os.path.exists(fake_job.result))
        # drop the temp dir created by FakeJobWithResult (if still there)
        shutil.rmtree(os.path.dirname(fake_job.result), ignore_errors=True)
        return

    def test_entry_from_job_id(self):
        # we can get an entry for a job_id if the id exists
        container = ExportJobContainer()
        entry = ('4', 'cave_exporter', 'bob')
        container.running_exports = [entry]
        fake_job = FakeJobWithResult()
        self.job_manager._jobs['4'] = fake_job
        result1 = container.entry_from_job_id(None)
        result2 = container.entry_from_job_id('4')
        result3 = container.entry_from_job_id('23')
        # drop the temp dir created by FakeJobWithResult
        shutil.rmtree(os.path.dirname(fake_job.result), ignore_errors=True)
        self.assertEqual(result1, None)
        self.assertEqual(result2, ('4', 'cave_exporter', 'bob'))
        self.assertEqual(result3, None)
        return
Note: See TracBrowser for help on using the repository browser.