Changeset 9082
- Timestamp:
- 5 Aug 2012, 13:29:12 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/async.py
r8864 r9082 1 import datetime 1 ## $Id$ 2 ## 3 ## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann 4 ## This program is free software; you can redistribute it and/or modify 5 ## it under the terms of the GNU General Public License as published by 6 ## the Free Software Foundation; either version 2 of the License, or 7 ## (at your option) any later version. 8 ## 9 ## This program is distributed in the hope that it will be useful, 10 ## but WITHOUT ANY WARRANTY; without even the implied warranty of 11 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 ## GNU General Public License for more details. 13 ## 14 ## You should have received a copy of the GNU General Public License 15 ## along with this program; if not, write to the Free Software 16 ## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 17 ## 18 """Components for asynchronous job (task) handling, mainly based on 19 :mod:`zc.async`. 20 """ 2 21 import grok 3 import os4 22 import time 23 import transaction 5 24 import zc.async.dispatcher 6 from cgi import escape 7 from pytz import utc 8 from zc.async.interfaces import IQueue 25 import zc.async.utils 26 import zope.component 9 27 from zc.async.job import Job as AsyncJob 10 28 from zc.async.queue import getDefaultQueue 11 from waeup.kofa.interfaces import IUniversity 12 from hurry.jqueryui import jqueryui, base, blitzer, ui_lightness, humanity 13 from hurry.jquery import jquery 14 from zc.async.interfaces import COMPLETED, NEW, PENDING, ACTIVE, ASSIGNED 15 29 from zc.async.interfaces import IJob, ObjectAdded 30 from ZODB.utils import u64 31 from zope.component.hooks import getSite 32 from zope.event import notify 33 from waeup.kofa.interfaces import ( 34 IJobManager, IProgressable, IJobContainer, WAEUP_KEY, 35 ) 16 36 17 37 def compute(num): 38 """A convenience function to test asynchronous jobs. 39 """ 18 40 start = time.time() 19 41 duration = 60 20 42 end = start + duration 21 43 print "MyJob starts computation at ", start 22 #from zc.async.local import getJob, getQueue23 44 myjob = zc.async.local.getJob() 24 45 print "MyJob's job: ", zc.async.local.getJob() … … 41 62 return num * 2 42 63 43 def get_next_num(context): 44 num = 1 45 print "KEYS: ", [x for x in context['jobs'].keys()] 46 while str(num) in [x for x in context['jobs'].keys()]:47 num += 148 print "NEXT: ", str(num)49 return str(num)50 51 class Job(AsyncJob): 64 # 65 # Content components 66 # 67 class ProgressableJob(AsyncJob): 68 """A job that can indicate its progress via a `percent` attribute. 69 """ 70 grok.implements(IJob, IProgressable) 71 grok.provides(IJob) 72 52 73 percent = None 53 pass 54 55 grok.templatedir('.') #os.path.dirname(__file__)) 56 class ASyncView(grok.View): 57 grok.context(IUniversity) 58 grok.name('async') 59 grok.template('async') 60 61 def update(self, start=None, DELETE=None, REMOVE=None, *args, **kw): 62 humanity.need() 63 jqueryui.need() 64 # the image was created via http://ajaxload.info/ 65 self.spinner_url = self.url( 66 grok.getSite()) + '/@@/waeup.kofa.browser/square_circles_black.gif' 67 print args, kw 68 self.dispatcher = zc.async.dispatcher.get() 69 site = grok.getSite() 70 newq = site.__parent__._p_jar.root()['zc.async'].items()[0][1] 71 print "NEW: ", newq, list(newq) 72 73 74 print "DISP LEN: ", dir(self.dispatcher), self.dispatcher.getStatusInfo(), self.dispatcher.getStatistics(), self.dispatcher.jobs.items() # self.dispatcher.getJobInfo() 75 self.queue = IQueue(self.context) 76 self.dispatcher2 = self.queue.dispatchers[zc.async.instanceuuid.UUID] 77 print "JOBS: ", self.dispatcher.jobs.keys(), self.dispatcher.jobs.values() 78 print "DISPATCHERS: ", self.dispatcher, self.dispatcher2 79 print "QUEUES: ", self.dispatcher.queues 80 print "QUEUE1: ", list(self.dispatcher.queues['']) 81 print "QUEUE: ", list(self.queue) 82 #print "DISPATCHERS: ", self.queue.dispatchers.items() 83 print "DEFAULT: ", getDefaultQueue(self.context), len(getDefaultQueue(self.context)) 84 print "DELETE: ", DELETE 85 if DELETE is not None: 86 if DELETE in self.context['jobs']: 87 job = self.context['jobs'][DELETE] 88 from zc.async.interfaces import COMPLETED, NEW, PENDING 89 if job.status in (COMPLETED, NEW): 90 del self.context['jobs'][DELETE] 91 print "DELETED %s" % job 74 75 76 def get_job_id(persisted_job): 77 """Get the object id of an already persisted job. 78 79 The `persisted_job` must provide a `_p_oid` attribute. 80 """ 81 job_id = u64(persisted_job._p_oid) 82 return "%s" % job_id 83 84 class JobContainer(grok.Container): 85 """A container for :class:`IKofa` jobs. 86 """ 87 grok.implements(IJobContainer) 88 89 90 class JobManager(grok.GlobalUtility): 91 """A manager for asynchronous running jobs (tasks). 92 93 Registered as a global utility for the 94 `waeup.kofa.interfaces.IJobManager` interface. 95 96 This is the central location for managing asynchronous running 97 jobs/tasks. 98 99 It works roughly like this: for usual tasks it looks up some 100 JobContainer installed in a ZODB database root (the installation 101 can happen during startup; see the respective installer classes 102 and functions in this module) and then interacts with this 103 JobContainer. 104 105 The optional `site` parameter for most methods in here serves for 106 finding the databases' roots. It is sufficient to pass any 107 persisted object (or more precisely: some object with a valid 108 ``_p_jar__`` attribte). As long as some site was already set (for 109 instance during regular requests), the site is looked up 110 automatically and you don't have to pass the `site` parameter 111 then. So, in most cases you won't have to give a `site` parameter. 112 """ 113 grok.implements(IJobManager) 114 grok.provides(IJobManager) 115 116 def _get_site(self, site): 117 if site is None: 118 site = getSite() 119 if site is None: 120 raise LookupError('cannot find a valid site') 121 return site 122 123 def _get_jobs_container(self, site): 124 # in fact we need _some_ persisted object, not necessarily a site 125 return site._p_jar.root()[WAEUP_KEY] 126 127 def put(self, job, site=None): 128 """Start the `job` and store it in local `site`. 129 130 The `job` must be an `IJob` instance. 131 132 It will be put into the default queue and then stored in local 133 site. The status of the job can be seen immediately in 134 `job.status`. 135 136 Please specify special treatments like `begin_after` or 137 `begin_by` by setting the respectives attributes of the job 138 itself. 139 """ 140 site = self._get_site(site) 141 container = self._get_jobs_container(site) 142 transaction.begin() 143 queue = getDefaultQueue(site) 144 new_job = queue.put(job) 145 job_id = get_job_id(new_job) 146 container[job_id] = new_job 147 transaction.commit() 148 return job_id 149 150 def get(self, job_id, site=None): 151 """Get the job with `job_id` from local `site`. 152 153 If `job_id` cannot be found, ``None`` is returned. This 154 suitable e.g. when used with a traverser. 155 """ 156 site = self._get_site(site) 157 container = self._get_jobs_container(site) 158 return container.get(job_id, None) 159 160 def jobs(self, site=None): 161 """Get all stored jobs as an iterable. 162 163 Result provides tuples (JOB_ID, JOB_OBJECT). 164 """ 165 site = self._get_site(site) 166 container = self._get_jobs_container(site) 167 for job_id, job in container.items(): 168 yield (job_id, job) 169 170 def remove(self, job_id, site=None): 171 """Remove job with `job_id` from local job container. 172 173 If no such job can be found this is silently ignored. 174 175 Please note: removing a job from the job container does not 176 mean to stop its execution (if it wasn't started yet or is 177 currently running). 178 """ 179 site = self._get_site(site) 180 container = self._get_jobs_container(site) 181 if job_id in container.keys(): 182 del container[job_id] 183 return 184 185 def start_test_job(self): 186 """Start a test job. 187 188 A method for testing the general asynchronous functionality of 189 waeup.kofa. The test job started here executes the local 190 :func:`compute` function with ``23`` as argument. 191 """ 192 job = AsyncJob(compute, 23) 193 job_id = self.put(job) 194 return job_id 195 196 class JobManagerTraverser(grok.Traverser): 197 """A traverser for the global ``IJobManager``. 198 199 Looks up jobs by job_id and returns the respective job if it 200 can be found. 201 """ 202 grok.context(IJobManager) 203 204 def traverse(self, name): 205 # ``None`` if name cannot be found. 206 return self.context.get(name) 207 208 ## 209 ## configuration (permissions, subscribers, etc.) ... 210 ## 211 212 class ViewJobs(grok.Permission): 213 grok.name('waeup.viewJobs') 214 215 class ManageJobs(grok.Permission): 216 grok.name('waeup.manageJobs') 217 218 class JobContainerInstaller(object): 219 """Install a JobContainer in root of given DB. 220 221 Instances of this installer can be called when a Zope instance 222 comes up (i.e. when an IDatabaseOpenedEvent was triggered). 223 224 It looks for some database named as in `db_name` and installs a 225 job container in the root of this database (``None`` by default) 226 if it does not exist already. 227 """ 228 def __init__(self, db_name=None): 229 # This IDatabaseOpenedEvent will be from zope.app.appsetup if that 230 # package is around 231 zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self) 232 self.db_name = db_name 233 return 234 235 def __call__(self, ev): 236 db = ev.database 237 tm = transaction.TransactionManager() 238 conn = db.open(transaction_manager=tm) 239 tm.begin() 240 try: 241 try: 242 root = conn.root() 243 if WAEUP_KEY in root: 244 return 245 if self.db_name is not None: 246 other = conn.get_connection(self.db_name) 247 container = other.root()[WAEUP_KEY] = JobContainer() 248 other.add(container) 92 249 else: 93 if job in self.queue: 94 print "JOB IN ", self.queue, "!!!!!!!!!!!!!!!!!!" 95 print "QUEUE BEFORE: ", list(self.queue) 96 try: 97 self.queue.remove(job) 98 except: 99 "COULDN'T REMOVE ANYWAY!!" 100 print "QUEUE AFTER: ", list(self.queue) 101 self.queue.pull() 102 print "QUEUE AFTER2: ", list(self.queue) 103 #job_queue = job.queue 104 #agent = job.agent 105 #print "JOB AGENT: ", agent, dir(agent) 106 #print "JOB AGENT QUEUE: ", agent.queue, list(agent.queue) 107 #print "JOB QUEUE: ", job_queue, list(job_queue) 108 #if job in list(job_queue) or True: 109 # agent.remove(job) 110 # #job.queue.remove(job) 111 # print "REMOVED %s from queue %s" % ( 112 # job, job_queue) 113 else: 114 print "CANT FIND JOB in QUEUE" 115 116 #import pdb; pdb.set_trace() 117 if start: 118 start_time = datetime.datetime.utcnow() 119 start_time += datetime.timedelta(seconds=10) 120 start_time = utc.localize(start_time) 121 print "START" 122 print "STARTTIME: ", start_time 123 import transaction 124 transaction.begin() 125 job = self.queue.put(Job(compute, 21), 126 begin_after=start_time) 127 job.annotations['percent'] = 0.0 128 print " started: ", job.status, type(job) 129 if 'jobs' not in self.context: 130 self.context['jobs'] = grok.Container() 131 job_id = get_next_num(self.context) 132 print "JOB ID: ", job_id, dir(job) 133 self.context['jobs'][job_id] = job 134 transaction.commit() 135 time.sleep(0.5) 136 self.redirect(self.url()) 137 138 def job_status(self): 139 if 'jobs' not in self.context: 140 return '' 141 result = "" 142 #for num, job in enumerate(self.queue): 143 for num, job in self.context['jobs'].items(): 144 result += '<div><form>' 145 if job.status in ('completed-status', 'new-status'): 146 result += '<a href="?DELETE=%s">delete</a>' %( 147 num) 148 else: # job.status == 'pendig-status' or ( 149 # job.status == 'active-status'): 150 result += '<a href="?DELETE=%s">remove from queue</a>' % ( 151 num) 152 result += '<input type="checkbox" name="remove" />%s %s, %s, %r</form></div>' % ( 153 num, 154 escape('%r' % job), 155 job.status, 156 dict(job.annotations)) 157 print "job in queue: ", job, type(job) 158 return result 159 160 def NOrender(self): 161 print "RENDER: ", self.namespace().items(), type(self.namespace) 162 return grok.PageTemplate('Hallo!') 163 #print "RENDER: ", self.static['square_circles_black.gif'] 164 #return r''' 165 return ''' 166 <html> 167 <head> 168 <title>Async testing</title> 169 <script type="text/javascript"> 170 $(document).ready(function() { 171 var myLoadingDiv = $("#progressbar"); 172 var mySpinner = $("#loadspinner"); 173 myLoadingDiv.progressbar({disabled:true}); 174 function update() { 175 $.getJSON("status?", {jobid: "4",}, function(data) { 176 if (data == null) { 177 myLoadingDiv.progressbar("disable"); 178 myLoadingDiv.progressbar("value", 0); 179 $("#msg").html("Syncing..."); 180 clearInterval(intervalID); 181 //alert('No Data'): 182 } 183 else { 184 myLoadingDiv.progressbar("enable"); 185 myLoadingDiv.progressbar("value", data.percent); 186 $("#msg").html(data.msg + ' (' + data.status + ') '); 187 //alert(data.msg); 188 if (data.percent == 100) { 189 clearInterval(intervalID); 190 } 191 } 192 });} 193 intervalID = setInterval(update, 500); 194 //update(); 195 }); 196 </script> 197 </head> 198 <body> 199 <h1>Async stuff</h1> 200 <div>Dispatcher: %s</div> 201 <div>Queue: %s</div> 202 <div>Jobs: %s</div> 203 <form method="POST"> 204 <input type="submit" name="reload" value="reload" /> 205 <input type="submit" name="start" value="start up" /> 206 </form> 207 <div id="progressbar" 208 class="ui-progressbar ui-widget ui-widget-content ui-corner-all"> 209 <div id="msg">Hi there</div> 210 </div> 211 <div id="loadspinner"> 212 </div> 213 </body> 214 </htlm> 215 ''' % ( 216 escape('%r' % self.dispatcher), 217 escape('%r' % self.queue), 218 self.job_status(), 219 ) 220 221 222 class ASyncJSON(grok.JSON): 223 grok.context(IUniversity) 224 225 def status(self, jobid): 226 print "request for ", jobid 227 msg = 'please wait...' 228 job = self.context['jobs'].get(jobid, None) 229 if not job: 230 return 231 percent = job.annotations.get('percent', None) 232 if not percent: 233 return 234 percent = int(percent+0.5) 235 print "percent: ", percent 236 if percent >= 100: 237 percent = 100 238 msg = 'done.' 239 status_map = { 240 NEW: 'new', 241 COMPLETED: 'completed', 242 PENDING: 'pending', 243 ACTIVE: 'active', 244 ASSIGNED: 'assigned', 245 } 246 return {'msg': msg, 247 'percent': percent, 248 'status': status_map.get(job.status, 'unknown'), 249 } 250 container = JobContainer() 251 root[WAEUP_KEY] = container 252 notify(ObjectAdded(container, root, WAEUP_KEY)) 253 tm.commit() 254 zc.async.utils.log.info('job container added') 255 except: 256 tm.abort() 257 raise 258 finally: 259 conn.close() 260 return 261 262 #: Can be used as event subscriber from ZCML; installs a job container 263 #: in default database (named ``''``) root. 264 job_container_installer = JobContainerInstaller() 265 266 #: An installer instance that installs a job container in a ZODB 267 #: called ``async`` - this name is used in several components of the 268 #: `zc.async` package we don't want to reimplement here. 269 #: 270 #: To use this installer as an event subscriber by ZCML, make sure the 271 #: instance provides also a ZODB called `async`. 272 multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracChangeset for help on using the changeset viewer.