Changeset 12581 for main/waeup.kofa
- Timestamp:
- 10 Feb 2015, 14:13:17 (10 years ago)
- Location:
- main/waeup.kofa/trunk/src/waeup/kofa
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/trunk/src/waeup/kofa/async.py
r12110 r12581 29 29 from zc.async.interfaces import IJob, ObjectAdded 30 30 from ZODB.utils import u64 31 from zope.component import getUtility, adapter 31 32 from zope.component.hooks import getSite 32 33 from zope.event import notify 33 34 from waeup.kofa.interfaces import ( 34 IJobManager, IProgressable, IJobContainer, WAEUP_KEY, 35 IJobManager, IProgressable, IJobContainer, WAEUP_KEY, IKofaUtils, 35 36 ) 37 36 38 37 39 def compute(num, duration=60): … … 144 146 """ 145 147 site = self._get_site(site) 148 if not getUtility(IKofaUtils).expensive_actions_allowed(): 149 return None 146 150 container = self._get_jobs_container(site) 147 151 transaction.begin() … … 234 238 # This IDatabaseOpenedEvent will be from zope.app.appsetup if that 235 239 # package is around 236 zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)240 adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self) 237 241 self.db_name = db_name 238 242 return -
main/waeup.kofa/trunk/src/waeup/kofa/browser/tests/test_async.py
r12110 r12581 3 3 import tempfile 4 4 import transaction 5 from contextlib import contextmanager 5 6 from zc.async.testing import wait_for_result 6 7 from zope.component import getUtility … … 10 11 from waeup.kofa.app import University 11 12 from waeup.kofa.async import AsyncJob, get_job_id 12 from waeup.kofa.interfaces import IJobManager 13 from waeup.kofa.interfaces import IJobManager, IKofaUtils 13 14 from waeup.kofa.testing import FunctionalTestCase, FunctionalLayer 14 15 from waeup.kofa.tests.test_async import FunctionalAsyncTestCase 15 16 17 16 18 def fake_get_export_jobs_status(user_id=None): 17 19 return [('completed', 'Completed', 'my_exporter'),] 18 20 21 19 22 def dummy_func(): 20 23 return 42 24 25 26 @contextmanager 27 def expensive_actions(): 28 """A contextmanager to work with a modified `expensive_acrtions_allowed()` 29 30 This method is part of local IKofaUtils. 31 """ 32 utils = getUtility(IKofaUtils) 33 old_func = utils.expensive_actions_allowed 34 try: 35 yield utils 36 finally: 37 # restore function after use 38 utils.expensive_actions_allowed = old_func 39 21 40 22 41 class AsyncBrowserTests(FunctionalAsyncTestCase): … … 92 111 return 93 112 113 94 114 class JobOverviewTests(AsyncBrowserTests): 95 115 # Tests for the job overview page of universities … … 106 126 manager.put(job, site=self.getRootFolder()) 107 127 return job 128 129 def test_job_high_load(self): 130 # under high load, jobs won't start (None is returned then) 131 manager = getUtility(IJobManager) 132 job = AsyncJob(dummy_func) 133 site = self.getRootFolder() 134 with expensive_actions() as utils: 135 utils.expensive_actions_allowed = lambda : False 136 result = manager.put(job, site=self.getRootFolder()) 137 self.assertTrue(result is None) 108 138 109 139 def test_job_get(self):
Note: See TracChangeset for help on using the changeset viewer.