Changeset 9094


Ignore:
Timestamp:
8 Aug 2012, 22:57:01 (12 years ago)
Author:
uli
Message:

Add tests for export job containers.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/utils/tests/test_batching.py

    r9092 r9094  
    2323import tempfile
    2424import unittest
    25 from zc.async.interfaces import IJob
     25from zc.async.interfaces import IJob, COMPLETED
    2626from zope import schema
    2727from zope.component import provideUtility, getGlobalSiteManager
     
    3131from zope.interface import Interface, implements, verify
    3232from waeup.kofa.app import University
    33 from waeup.kofa.interfaces import ICSVExporter, IBatchProcessor
     33from waeup.kofa.interfaces import (
     34    ICSVExporter, IBatchProcessor, IExportJobContainer, IJobManager)
    3435from waeup.kofa.testing import FunctionalLayer, FunctionalTestCase
    3536from waeup.kofa.utils.batching import (
    36     ExporterBase, BatchProcessor, export_job, AsyncExportJob)
     37    ExporterBase, BatchProcessor, export_job, AsyncExportJob,
     38    ExportJobContainer)
    3739
    3840optionflags = (
     
    341343    # A minimal fake exporter suitable to be called by export_jobs
    342344    fields = ('name', 'dinoports', 'owner', 'taxpayer')
     345    title = u'Dummy cave exporter'
    343346
    344347    def export_all(self, site, filepath=None):
     
    376379        verify.verifyObject(IJob, job)
    377380        return
     381
     382
     383class FakeJob(object):
     384
     385    status = COMPLETED
     386    result = None
     387
     388class FakeJobWithResult(FakeJob):
     389
     390    def __init__(self):
     391        dir_path = tempfile.mkdtemp()
     392        self.result = os.path.join(dir_path, 'fake.csv')
     393        open(self.result, 'wb').write('a fake result')
     394        return
     395
     396class FakeJobManager(object):
     397
     398    _jobs = dict()
     399    _curr_num = 1
     400
     401    def get(self, job_id):
     402        if job_id == '3':
     403            return FakeJob()
     404        return self._jobs.get(job_id, None)
     405
     406    def put(self, job):
     407        num = str(self._curr_num)
     408        self._jobs[num] = job
     409        self._curr_num += 1
     410        return num
     411
     412    def remove(self, job_id, site):
     413        if job_id in self._jobs:
     414            del self._jobs[job_id]
     415        return
     416
     417class ExportJobContainerTests(unittest.TestCase):
     418    # Test ExportJobContainer
     419
     420    def setUp(self):
     421        # register a suitable ICSVExporter as named utility
     422        self.exporter = CaveExporter()
     423        self.job_manager = FakeJobManager()
     424        self.gsm = getGlobalSiteManager()
     425        self.gsm.registerUtility(
     426            self.exporter, ICSVExporter, name='cave_exporter')
     427        self.gsm.registerUtility(
     428            self.job_manager, IJobManager)
     429
     430    def tearDown(self):
     431        self.gsm.unregisterUtility(self.exporter)
     432        self.gsm.unregisterUtility(self.job_manager, IJobManager)
     433
     434    def test_export_job_interfaces(self):
     435        # the ExportJobContainer implements promised interfaces correctly...
     436        container = ExportJobContainer()
     437        verify.verifyClass(IExportJobContainer, ExportJobContainer)
     438        verify.verifyObject(IExportJobContainer, container)
     439        return
     440
     441    def test_start_export_job(self):
     442        # we can start jobs
     443        container = ExportJobContainer()
     444        container.start_export_job('cave_exporter', 'bob')
     445        result = self.job_manager._jobs.values()[0]
     446        self.assertTrue(IJob.providedBy(result))
     447        self.assertEqual(
     448            container.running_exports,
     449            [('1', 'cave_exporter', 'bob')]
     450            )
     451        return
     452
     453    def test_get_running_export_jobs_all(self):
     454        # we can get export jobs of all users
     455        container = ExportJobContainer()
     456        container.start_export_job('cave_exporter', 'bob')
     457        container.start_export_job('cave_exporter', 'alice')
     458        result = container.get_running_export_jobs()
     459        self.assertEqual(
     460            result,
     461            [('1', 'cave_exporter', 'bob'),
     462             ('2', 'cave_exporter', 'alice')]
     463            )
     464        return
     465
     466    def test_get_running_export_jobs_user(self):
     467        # we can get the export jobs running for a certain user
     468        container = ExportJobContainer()
     469        container.start_export_job('cave_exporter', 'bob')
     470        container.start_export_job('cave_exporter', 'alice')
     471        result1 = container.get_running_export_jobs(user_id='alice')
     472        result2 = container.get_running_export_jobs(user_id='foo')
     473        self.assertEqual(
     474            result1, [('2', 'cave_exporter', 'alice')])
     475        self.assertEqual(
     476            result2, [])
     477        return
     478
     479    def test_get_running_export_jobs_only_if_exist(self):
     480        # we get only jobs that are accessible through the job manager...
     481        container = ExportJobContainer()
     482        container.start_export_job('cave_exporter', 'bob')
     483        container.start_export_job('cave_exporter', 'bob')
     484        self.assertTrue(
     485            ('2', 'cave_exporter', 'bob') in container.running_exports)
     486        # we remove the second entry from job manager
     487        del self.job_manager._jobs['2']
     488        result = container.get_running_export_jobs(user_id='bob')
     489        self.assertEqual(
     490            result, [('1', 'cave_exporter', 'bob')])
     491        self.assertTrue(
     492            ('2', 'cave_exporter', 'bob') not in container.running_exports)
     493        return
     494
     495    def test_get_export_job_status(self):
     496        # we can get the stati of jobs...
     497        container = ExportJobContainer()
     498        container.start_export_job('cave_exporter', 'alice')
     499        container.start_export_job('cave_exporter', 'bob')
     500        container.start_export_job('cave_exporter', 'bob')
     501        result = container.get_export_jobs_status(user_id='bob')
     502        # we'll get the raw value, a translation and the title of the
     503        # exporter
     504        self.assertEqual(
     505            result,
     506            [('new', u'new', u'Dummy cave exporter'),
     507             ('completed', u'completed', u'Dummy cave exporter')]
     508            )
     509        return
     510
     511    def test_delete_export_entry(self):
     512        # we can remove export entries in local lists and the job
     513        # manager as well...
     514        container = ExportJobContainer()
     515        container.start_export_job('cave_exporter', 'bob')
     516        entry = container.running_exports[0]
     517        container.delete_export_entry(entry)
     518        # both, running_exports list and job manager are empty now
     519        self.assertEqual(
     520            container.running_exports, [])
     521        self.assertEqual(
     522            self.job_manager._jobs, {})
     523        return
     524
     525    def test_delete_export_entry_remove_file(self):
     526        # any result files of exports are deleted as well
     527        container = ExportJobContainer()
     528        entry = ('4', 'cave_exporter', 'bob')
     529        container.running_exports = [entry]
     530        fake_job = FakeJobWithResult()
     531        self.job_manager._jobs['4'] = fake_job
     532        self.assertTrue(os.path.isfile(fake_job.result))
     533        container.delete_export_entry(entry)
     534        self.assertTrue(not os.path.exists(fake_job.result))
     535        return
Note: See TracChangeset for help on using the changeset viewer.