Changeset 13802 for main/waeup.ikoba/trunk/src/waeup
- Timestamp:
- 5 Apr 2016, 21:04:39 (9 years ago)
- Location:
- main/waeup.ikoba/trunk/src/waeup/ikoba
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.ikoba/trunk/src/waeup/ikoba/browser/pages.py
r13066 r13802 232 232 return entries 233 233 234 def running_jobs(view): 235 """True if any job is running. 236 """ 237 job_entries = view.context.get_running_export_jobs() 238 job_manager = getUtility(IJobManager) 239 entries = [] 240 for entry in job_entries: 241 job = job_manager.get(entry[0]) 242 status = job.finished and 'ready' or 'running' 243 status = job.failed and 'FAILED' or status 244 if status == 'running': 245 return True 246 return False 247 234 248 class LocalRoleAssignmentUtilityView(object): 235 249 """A view mixin with useful methods for local role assignment. … … 1467 1481 return 1468 1482 if CREATE: 1483 utils = queryUtility(IIkobaUtils) 1484 if utils.export_disabled_message(): 1485 self.flash(utils.export_disabled_message(), type='danger') 1486 self.entries = doll_up(self, user=None) 1487 return 1488 if running_jobs(self): 1489 self.flash(_( 1490 "Sorry, another export job is running. " 1491 "Please try again later."), type='danger') 1492 self.entries = doll_up(self, user=None) 1493 return 1469 1494 job_id = self.context.start_export_job( 1470 1495 exporter, self.request.principal.id) -
main/waeup.ikoba/trunk/src/waeup/ikoba/browser/tests/test_browser.py
r12842 r13802 69 69 70 70 71 class DataCenterUITests(CompanySetup ):71 class DataCenterUITests(CompanySetup, FunctionalAsyncTestCase): 72 72 # Tests for DataCenter class views and pages 73 73 74 74 layer = FunctionalLayer 75 76 def wait_for_export_job_completed(self): 77 # helper function waiting until the current export job is completed 78 manager = getUtility(IJobManager) 79 job_id = self.app['datacenter'].running_exports[0][0] 80 job = manager.get(job_id) 81 wait_for_result(job) 82 return job_id 83 84 def cleanup_run_job(self): 85 # helper to remove any result file of an export 86 job_id = self.wait_for_export_job_completed() 87 manager = getUtility(IJobManager) 88 job = manager.get(job_id) 89 if os.path.exists(job.result): 90 shutil.rmtree(os.path.dirname(job.result)) 75 91 76 92 def test_user_data_import_permission(self): … … 96 112 self.assertTrue('User Processor' in self.browser.contents) 97 113 return 114 115 def test_export_disabled(self): 116 self.app['configuration'].export_disabled_message = u'Export forbidden' 117 self.browser.addHeader('Authorization', 'Basic mgr:mgrpw') 118 self.browser.open(self.datacenter_path) 119 self.assertEqual(self.browser.headers['Status'], '200 Ok') 120 self.assertEqual(self.browser.url, self.datacenter_path) 121 self.browser.getLink("Export data").click() 122 self.browser.getControl(name="exporter").value = ['products'] 123 self.browser.getControl("Create CSV file").click() 124 self.assertEqual(self.browser.headers['Status'], '200 Ok') 125 self.assertTrue('div class="alert alert-danger">Export forbidden</div>' 126 in self.browser.contents) 127 return 128 129 def test_export_start(self): 130 # we can trigger export file creation 131 self.browser.addHeader('Authorization', 'Basic mgr:mgrpw') 132 self.browser.open(self.datacenter_path) 133 self.assertEqual(self.browser.headers['Status'], '200 Ok') 134 self.assertEqual(self.browser.url, self.datacenter_path) 135 self.browser.getLink("Export data").click() 136 self.browser.getControl(name="exporter").value = ['products'] 137 self.browser.getControl("Create CSV file").click() 138 # we can trigger export file creation only once 139 self.browser.getControl(name="exporter").value = ['pdfdocuments'] 140 self.browser.getControl("Create CSV file").click() 141 self.assertTrue('Sorry, another export job is running.' 142 in self.browser.contents) 143 self.wait_for_export_job_completed() 144 self.assertEqual(len(self.app['datacenter'].running_exports), 1) 145 self.browser.getControl(name="exporter").value = ['pdfdocuments'] 146 self.browser.getControl("Create CSV file").click() 147 self.assertEqual(self.browser.headers['Status'], '200 Ok') 148 self.assertEqual(len(self.app['datacenter'].running_exports), 2) 149 # The second job is running 150 self.assertTrue('<img src="/static/img/actionicon_reload.png" />' 151 in self.browser.contents) 152 self.cleanup_run_job() # make sure to remove temp dirs 153 # Also the second job must be cleaned up 154 manager = getUtility(IJobManager) 155 job_id = self.app['datacenter'].running_exports[1][0] 156 job = manager.get(job_id) 157 wait_for_result(job) 158 if os.path.exists(job.result): 159 shutil.rmtree(os.path.dirname(job.result)) 160 return 161 -
main/waeup.ikoba/trunk/src/waeup/ikoba/interfaces.py
r12839 r13802 625 625 ) 626 626 627 export_disabled_message = schema.Text( 628 title = _(u'Export disabled message'), 629 description = _(u'This message will show up if an officer tries ' 630 'to export data. Setting this field automatically ' 631 'disables all exporters.'), 632 required = False, 633 ) 634 627 635 628 636 class IDataCenter(IIkobaObject): -
main/waeup.ikoba/trunk/src/waeup/ikoba/utils/utils.py
r12777 r13802 338 338 return False 339 339 return True 340 341 def export_disabled_message(self): 342 export_disabled_message = grok.getSite()[ 343 'configuration'].export_disabled_message 344 if export_disabled_message: 345 return export_disabled_message 346 return None
Note: See TracChangeset for help on using the changeset viewer.