source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/async.py @ 8959

Last change on this file since 8959 was 8864, checked in by uli, 12 years ago

Commit all the cruel async stuff. Only for playing around, yet.

File size: 8.7 KB
Line 
1import datetime
2import grok
3import os
4import time
5import zc.async.dispatcher
6from cgi import escape
7from pytz import utc
8from zc.async.interfaces import IQueue
9from zc.async.job import Job as AsyncJob
10from zc.async.queue import getDefaultQueue
11from waeup.kofa.interfaces import IUniversity
12from hurry.jqueryui import jqueryui, base, blitzer, ui_lightness, humanity
13from hurry.jquery import jquery
14from zc.async.interfaces import COMPLETED, NEW, PENDING, ACTIVE, ASSIGNED
15
16
17def compute(num):
18    start = time.time()
19    duration = 60
20    end = start + duration
21    print "MyJob starts computation at ", start
22    #from zc.async.local import getJob, getQueue
23    myjob = zc.async.local.getJob()
24    print "MyJob's job: ", zc.async.local.getJob()
25    print "MyJob's queue: ", zc.async.local.getQueue(), list(
26        zc.async.local.getQueue())
27    while True:
28        if time.time() > end:
29            break
30        time.sleep(1)
31        percent = (time.time() - start) * 100.0 / duration
32        if percent > 100.0:
33            percent = 100.0
34        print "MyJob percent: ", percent
35        zc.async.local.setLiveAnnotation('percent', percent)
36        print "MyJob does something at %s of %s" % (
37            time.time() - start, duration)
38        print "MyJob's annotations: %r" % dict(myjob.annotations)
39        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
40            'percent')
41    return num * 2
42
def get_next_num(context):
    """Return the lowest unused positive job number as a string.

    `context` must provide a ``'jobs'`` mapping whose keys are the
    string ids already taken.  Counting starts at 1, so gaps left by
    deleted jobs are reused.
    """
    # Read the keys once instead of rebuilding the list on every loop
    # iteration; the set gives constant-time membership tests.
    keys = list(context['jobs'].keys())
    # Single-argument parenthesised prints emit the same text as the
    # former two-item print statements and also parse on Python 3.
    print("KEYS:  %s" % (keys,))
    taken = set(keys)
    num = 1
    while str(num) in taken:
        num += 1
    print("NEXT:  %s" % num)
    return str(num)
50
class Job(AsyncJob):
    """A ``zc.async`` job with an additional ``percent`` class attribute."""

    # Class-level default; nothing in this module assigns it per instance.
    percent = None
54
55grok.templatedir('.') #os.path.dirname(__file__))
56class ASyncView(grok.View):
57    grok.context(IUniversity)
58    grok.name('async')
59    grok.template('async')
60
61    def update(self,  start=None, DELETE=None, REMOVE=None, *args, **kw):
62        humanity.need()
63        jqueryui.need()
64        # the image was created via http://ajaxload.info/
65        self.spinner_url = self.url(
66            grok.getSite()) + '/@@/waeup.kofa.browser/square_circles_black.gif'
67        print args, kw
68        self.dispatcher = zc.async.dispatcher.get()
69        site = grok.getSite()
70        newq = site.__parent__._p_jar.root()['zc.async'].items()[0][1]
71        print "NEW: ", newq, list(newq)
72
73
74        print "DISP LEN: ", dir(self.dispatcher), self.dispatcher.getStatusInfo(), self.dispatcher.getStatistics(), self.dispatcher.jobs.items() # self.dispatcher.getJobInfo()
75        self.queue = IQueue(self.context)
76        self.dispatcher2 = self.queue.dispatchers[zc.async.instanceuuid.UUID]
77        print "JOBS: ", self.dispatcher.jobs.keys(), self.dispatcher.jobs.values()
78        print "DISPATCHERS: ", self.dispatcher, self.dispatcher2
79        print "QUEUES: ", self.dispatcher.queues
80        print "QUEUE1: ", list(self.dispatcher.queues[''])
81        print "QUEUE: ", list(self.queue)
82        #print "DISPATCHERS: ", self.queue.dispatchers.items()
83        print "DEFAULT: ", getDefaultQueue(self.context), len(getDefaultQueue(self.context))
84        print "DELETE: ", DELETE
85        if DELETE is not None:
86            if DELETE in self.context['jobs']:
87                job = self.context['jobs'][DELETE]
88                from zc.async.interfaces import COMPLETED, NEW, PENDING
89                if job.status in (COMPLETED, NEW):
90                    del self.context['jobs'][DELETE]
91                    print "DELETED %s" % job
92                else:
93                    if job in self.queue:
94                        print "JOB IN ", self.queue, "!!!!!!!!!!!!!!!!!!"
95                        print "QUEUE BEFORE: ", list(self.queue)
96                        try:
97                            self.queue.remove(job)
98                        except:
99                            "COULDN'T REMOVE ANYWAY!!"
100                        print "QUEUE AFTER: ", list(self.queue)
101                        self.queue.pull()
102                        print "QUEUE AFTER2: ", list(self.queue)
103                    #job_queue = job.queue
104                    #agent = job.agent
105                    #print "JOB AGENT: ", agent, dir(agent)
106                    #print "JOB AGENT QUEUE: ", agent.queue, list(agent.queue)
107                    #print "JOB QUEUE: ", job_queue, list(job_queue)
108                    #if job in list(job_queue) or True:
109                    #    agent.remove(job)
110                    #    #job.queue.remove(job)
111                    #    print "REMOVED %s from queue %s" % (
112                    #        job, job_queue)
113                    else:
114                        print "CANT FIND JOB in QUEUE"
115
116        #import pdb; pdb.set_trace()
117        if start:
118            start_time = datetime.datetime.utcnow()
119            start_time += datetime.timedelta(seconds=10)
120            start_time = utc.localize(start_time)
121            print "START"
122            print "STARTTIME: ", start_time
123            import transaction
124            transaction.begin()
125            job = self.queue.put(Job(compute, 21),
126                                 begin_after=start_time)
127            job.annotations['percent'] = 0.0
128            print " started: ", job.status, type(job)
129            if 'jobs' not in self.context:
130                self.context['jobs'] = grok.Container()
131            job_id = get_next_num(self.context)
132            print "JOB ID: ", job_id, dir(job)
133            self.context['jobs'][job_id] = job
134            transaction.commit()
135            time.sleep(0.5)
136            self.redirect(self.url())
137
138    def job_status(self):
139        if 'jobs' not in self.context:
140            return ''
141        result = ""
142        #for num, job in enumerate(self.queue):
143        for num, job in self.context['jobs'].items():
144            result += '<div><form>'
145            if job.status in ('completed-status', 'new-status'):
146                result += '<a href="?DELETE=%s">delete</a>' %(
147                    num)
148            else: # job.status == 'pendig-status' or (
149                # job.status == 'active-status'):
150                result += '<a href="?DELETE=%s">remove from queue</a>' % (
151                    num)
152            result += '<input type="checkbox" name="remove" />%s %s, %s, %r</form></div>' % (
153                num,
154                escape('%r' % job),
155                job.status,
156                dict(job.annotations))
157            print "job in queue: ", job, type(job)
158        return result
159
160    def NOrender(self):
161        print "RENDER: ", self.namespace().items(), type(self.namespace)
162        return grok.PageTemplate('Hallo!')
163        #print "RENDER: ", self.static['square_circles_black.gif']
164        #return r'''
165        return '''
166<html>
167<head>
168<title>Async testing</title>
169<script type="text/javascript">
170$(document).ready(function() {
171  var myLoadingDiv = $("#progressbar");
172  var mySpinner = $("#loadspinner");
173  myLoadingDiv.progressbar({disabled:true});
174  function update() {
175  $.getJSON("status?",  {jobid: "4",}, function(data) {
176    if (data == null) {
177      myLoadingDiv.progressbar("disable");
178      myLoadingDiv.progressbar("value", 0);
179      $("#msg").html("Syncing...");
180      clearInterval(intervalID);
181      //alert('No Data'):
182      }
183    else {
184      myLoadingDiv.progressbar("enable");
185      myLoadingDiv.progressbar("value", data.percent);
186      $("#msg").html(data.msg + ' (' + data.status + ') ');
187      //alert(data.msg);
188      if (data.percent == 100) {
189        clearInterval(intervalID);
190        }
191      }
192  });}
193  intervalID = setInterval(update, 500);
194  //update();
195});
196</script>
197</head>
198<body>
199<h1>Async stuff</h1>
200<div>Dispatcher: %s</div>
201<div>Queue: %s</div>
202<div>Jobs: %s</div>
203<form method="POST">
204  <input type="submit" name="reload" value="reload" />
205  <input type="submit" name="start" value="start up" />
206</form>
207<div id="progressbar"
208     class="ui-progressbar ui-widget ui-widget-content ui-corner-all">
209       <div id="msg">Hi there</div>
210</div>
211<div id="loadspinner">
212</div>
213</body>
214</htlm>
215''' % (
216   escape('%r' % self.dispatcher),
217   escape('%r' % self.queue),
218   self.job_status(),
219   )
220
221
222class ASyncJSON(grok.JSON):
223    grok.context(IUniversity)
224
225    def status(self, jobid):
226        print "request for ", jobid
227        msg = 'please wait...'
228        job = self.context['jobs'].get(jobid, None)
229        if not job:
230            return
231        percent = job.annotations.get('percent', None)
232        if not percent:
233            return
234        percent = int(percent+0.5)
235        print "percent: ", percent
236        if percent >= 100:
237            percent = 100
238            msg = 'done.'
239        status_map = {
240            NEW: 'new',
241            COMPLETED: 'completed',
242            PENDING: 'pending',
243            ACTIVE: 'active',
244            ASSIGNED: 'assigned',
245            }
246        return {'msg': msg,
247                'percent': percent,
248                'status': status_map.get(job.status, 'unknown'),
249                }
Note: See TracBrowser for help on using the repository browser.