import datetime import grok import os import time import zc.async.dispatcher from cgi import escape from pytz import utc from zc.async.interfaces import IQueue from zc.async.job import Job as AsyncJob from zc.async.queue import getDefaultQueue from waeup.kofa.interfaces import IUniversity from hurry.jqueryui import jqueryui, base, blitzer, ui_lightness, humanity from hurry.jquery import jquery from zc.async.interfaces import COMPLETED, NEW, PENDING, ACTIVE, ASSIGNED def compute(num): start = time.time() duration = 60 end = start + duration print "MyJob starts computation at ", start #from zc.async.local import getJob, getQueue myjob = zc.async.local.getJob() print "MyJob's job: ", zc.async.local.getJob() print "MyJob's queue: ", zc.async.local.getQueue(), list( zc.async.local.getQueue()) while True: if time.time() > end: break time.sleep(1) percent = (time.time() - start) * 100.0 / duration if percent > 100.0: percent = 100.0 print "MyJob percent: ", percent zc.async.local.setLiveAnnotation('percent', percent) print "MyJob does something at %s of %s" % ( time.time() - start, duration) print "MyJob's annotations: %r" % dict(myjob.annotations) print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation( 'percent') return num * 2 def get_next_num(context): num = 1 print "KEYS: ", [x for x in context['jobs'].keys()] while str(num) in [x for x in context['jobs'].keys()]: num += 1 print "NEXT: ", str(num) return str(num) class Job(AsyncJob): percent = None pass grok.templatedir('.') #os.path.dirname(__file__)) class ASyncView(grok.View): grok.context(IUniversity) grok.name('async') grok.template('async') def update(self, start=None, DELETE=None, REMOVE=None, *args, **kw): humanity.need() jqueryui.need() # the image was created via http://ajaxload.info/ self.spinner_url = self.url( grok.getSite()) + '/@@/waeup.kofa.browser/square_circles_black.gif' print args, kw self.dispatcher = zc.async.dispatcher.get() site = grok.getSite() newq = site.__parent__._p_jar.root()['zc.async'].items()[0][1] print "NEW: ", newq, list(newq) print "DISP LEN: ", dir(self.dispatcher), self.dispatcher.getStatusInfo(), self.dispatcher.getStatistics(), self.dispatcher.jobs.items() # self.dispatcher.getJobInfo() self.queue = IQueue(self.context) self.dispatcher2 = self.queue.dispatchers[zc.async.instanceuuid.UUID] print "JOBS: ", self.dispatcher.jobs.keys(), self.dispatcher.jobs.values() print "DISPATCHERS: ", self.dispatcher, self.dispatcher2 print "QUEUES: ", self.dispatcher.queues print "QUEUE1: ", list(self.dispatcher.queues['']) print "QUEUE: ", list(self.queue) #print "DISPATCHERS: ", self.queue.dispatchers.items() print "DEFAULT: ", getDefaultQueue(self.context), len(getDefaultQueue(self.context)) print "DELETE: ", DELETE if DELETE is not None: if DELETE in self.context['jobs']: job = self.context['jobs'][DELETE] from zc.async.interfaces import COMPLETED, NEW, PENDING if job.status in (COMPLETED, NEW): del self.context['jobs'][DELETE] print "DELETED %s" % job else: if job in self.queue: print "JOB IN ", self.queue, "!!!!!!!!!!!!!!!!!!" print "QUEUE BEFORE: ", list(self.queue) try: self.queue.remove(job) except: "COULDN'T REMOVE ANYWAY!!" print "QUEUE AFTER: ", list(self.queue) self.queue.pull() print "QUEUE AFTER2: ", list(self.queue) #job_queue = job.queue #agent = job.agent #print "JOB AGENT: ", agent, dir(agent) #print "JOB AGENT QUEUE: ", agent.queue, list(agent.queue) #print "JOB QUEUE: ", job_queue, list(job_queue) #if job in list(job_queue) or True: # agent.remove(job) # #job.queue.remove(job) # print "REMOVED %s from queue %s" % ( # job, job_queue) else: print "CANT FIND JOB in QUEUE" #import pdb; pdb.set_trace() if start: start_time = datetime.datetime.utcnow() start_time += datetime.timedelta(seconds=10) start_time = utc.localize(start_time) print "START" print "STARTTIME: ", start_time import transaction transaction.begin() job = self.queue.put(Job(compute, 21), begin_after=start_time) job.annotations['percent'] = 0.0 print " started: ", job.status, type(job) if 'jobs' not in self.context: self.context['jobs'] = grok.Container() job_id = get_next_num(self.context) print "JOB ID: ", job_id, dir(job) self.context['jobs'][job_id] = job transaction.commit() time.sleep(0.5) self.redirect(self.url()) def job_status(self): if 'jobs' not in self.context: return '' result = "" #for num, job in enumerate(self.queue): for num, job in self.context['jobs'].items(): result += '
' if job.status in ('completed-status', 'new-status'): result += 'delete' %( num) else: # job.status == 'pendig-status' or ( # job.status == 'active-status'): result += 'remove from queue' % ( num) result += '%s %s, %s, %r
' % ( num, escape('%r' % job), job.status, dict(job.annotations)) print "job in queue: ", job, type(job) return result def NOrender(self): print "RENDER: ", self.namespace().items(), type(self.namespace) return grok.PageTemplate('Hallo!') #print "RENDER: ", self.static['square_circles_black.gif'] #return r''' return ''' Async testing

Async stuff

Dispatcher: %s
Queue: %s
Jobs: %s
Hi there
''' % ( escape('%r' % self.dispatcher), escape('%r' % self.queue), self.job_status(), ) class ASyncJSON(grok.JSON): grok.context(IUniversity) def status(self, jobid): print "request for ", jobid msg = 'please wait...' job = self.context['jobs'].get(jobid, None) if not job: return percent = job.annotations.get('percent', None) if not percent: return percent = int(percent+0.5) print "percent: ", percent if percent >= 100: percent = 100 msg = 'done.' status_map = { NEW: 'new', COMPLETED: 'completed', PENDING: 'pending', ACTIVE: 'active', ASSIGNED: 'assigned', } return {'msg': msg, 'percent': percent, 'status': status_map.get(job.status, 'unknown'), }