Ignore:
Timestamp:
10 Feb 2015, 14:13:17 (10 years ago)
Author:
uli
Message:

Block jobs if system load is too high.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/trunk/src/waeup/kofa/browser/tests/test_async.py

    r12110 r12581  
    33import tempfile
    44import transaction
     5from contextlib import contextmanager
    56from zc.async.testing import wait_for_result
    67from zope.component import getUtility
     
    1011from waeup.kofa.app import University
    1112from waeup.kofa.async import AsyncJob, get_job_id
    12 from waeup.kofa.interfaces import IJobManager
     13from waeup.kofa.interfaces import IJobManager, IKofaUtils
    1314from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer
    1415from waeup.kofa.tests.test_async import FunctionalAsyncTestCase
    1516
     17
    1618def fake_get_export_jobs_status(user_id=None):
    1719    return [('completed', 'Completed', 'my_exporter'),]
    1820
     21
    1922def dummy_func():
    2023    return 42
     24
     25
     26@contextmanager
     27def expensive_actions():
     28    """A contextmanager to work with a modified `expensive_acrtions_allowed()`
     29
     30    This method is part of local IKofaUtils.
     31    """
     32    utils = getUtility(IKofaUtils)
     33    old_func = utils.expensive_actions_allowed
     34    try:
     35        yield utils
     36    finally:
     37        # restore function after use
     38        utils.expensive_actions_allowed = old_func
     39
    2140
    2241class AsyncBrowserTests(FunctionalAsyncTestCase):
     
    92111        return
    93112
     113
    94114class JobOverviewTests(AsyncBrowserTests):
    95115    # Tests for the job overview page of universities
     
    106126        manager.put(job, site=self.getRootFolder())
    107127        return job
     128
     129    def test_job_high_load(self):
     130        # under high load, jobs won't start (None is returned then)
     131        manager = getUtility(IJobManager)
     132        job = AsyncJob(dummy_func)
     133        site = self.getRootFolder()
     134        with expensive_actions() as utils:
     135            utils.expensive_actions_allowed = lambda : False
     136            result = manager.put(job, site=self.getRootFolder())
     137        self.assertTrue(result is None)
    108138
    109139    def test_job_get(self):
Note: See TracChangeset for help on using the changeset viewer.