Ignore:
Timestamp:
5 Aug 2012, 13:29:12 (12 years ago)
Author:
uli
Message:

Clean up, move UI components away and provide a single job manager as
utility.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/async.py

    r8864 r9082  
    1 import datetime
     1## $Id$
     2##
     3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
     4## This program is free software; you can redistribute it and/or modify
     5## it under the terms of the GNU General Public License as published by
     6## the Free Software Foundation; either version 2 of the License, or
     7## (at your option) any later version.
     8##
     9## This program is distributed in the hope that it will be useful,
     10## but WITHOUT ANY WARRANTY; without even the implied warranty of
     11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     12## GNU General Public License for more details.
     13##
     14## You should have received a copy of the GNU General Public License
     15## along with this program; if not, write to the Free Software
     16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
     17##
     18"""Components for asynchronous job (task) handling, mainly based on
     19:mod:`zc.async`.
     20"""
    221import grok
    3 import os
    422import time
     23import transaction
    524import zc.async.dispatcher
    6 from cgi import escape
    7 from pytz import utc
    8 from zc.async.interfaces import IQueue
     25import zc.async.utils
     26import zope.component
    927from zc.async.job import Job as AsyncJob
    1028from zc.async.queue import getDefaultQueue
    11 from waeup.kofa.interfaces import IUniversity
    12 from hurry.jqueryui import jqueryui, base, blitzer, ui_lightness, humanity
    13 from hurry.jquery import jquery
    14 from zc.async.interfaces import COMPLETED, NEW, PENDING, ACTIVE, ASSIGNED
    15 
     29from zc.async.interfaces import IJob, ObjectAdded
     30from ZODB.utils import u64
     31from zope.component.hooks import getSite
     32from zope.event import notify
     33from waeup.kofa.interfaces import (
     34    IJobManager, IProgressable, IJobContainer, WAEUP_KEY,
     35    )
    1636
    1737def compute(num):
     38    """A convenience function to test asynchronous jobs.
     39    """
    1840    start = time.time()
    1941    duration = 60
    2042    end = start + duration
    2143    print "MyJob starts computation at ", start
    22     #from zc.async.local import getJob, getQueue
    2344    myjob = zc.async.local.getJob()
    2445    print "MyJob's job: ", zc.async.local.getJob()
     
    4162    return num * 2
    4263
    43 def get_next_num(context):
    44     num = 1
    45     print "KEYS: ", [x for x in context['jobs'].keys()]
    46     while str(num) in [x for x in context['jobs'].keys()]:
    47         num += 1
    48     print "NEXT: ", str(num)
    49     return str(num)
    50 
    51 class Job(AsyncJob):
     64#
     65# Content components
     66#
     67class ProgressableJob(AsyncJob):
     68    """A job that can indicate its progress via a `percent` attribute.
     69    """
     70    grok.implements(IJob, IProgressable)
     71    grok.provides(IJob)
     72
    5273    percent = None
    53     pass
    54 
    55 grok.templatedir('.') #os.path.dirname(__file__))
    56 class ASyncView(grok.View):
    57     grok.context(IUniversity)
    58     grok.name('async')
    59     grok.template('async')
    60 
    61     def update(self,  start=None, DELETE=None, REMOVE=None, *args, **kw):
    62         humanity.need()
    63         jqueryui.need()
    64         # the image was created via http://ajaxload.info/
    65         self.spinner_url = self.url(
    66             grok.getSite()) + '/@@/waeup.kofa.browser/square_circles_black.gif'
    67         print args, kw
    68         self.dispatcher = zc.async.dispatcher.get()
    69         site = grok.getSite()
    70         newq = site.__parent__._p_jar.root()['zc.async'].items()[0][1]
    71         print "NEW: ", newq, list(newq)
    72 
    73 
    74         print "DISP LEN: ", dir(self.dispatcher), self.dispatcher.getStatusInfo(), self.dispatcher.getStatistics(), self.dispatcher.jobs.items() # self.dispatcher.getJobInfo()
    75         self.queue = IQueue(self.context)
    76         self.dispatcher2 = self.queue.dispatchers[zc.async.instanceuuid.UUID]
    77         print "JOBS: ", self.dispatcher.jobs.keys(), self.dispatcher.jobs.values()
    78         print "DISPATCHERS: ", self.dispatcher, self.dispatcher2
    79         print "QUEUES: ", self.dispatcher.queues
    80         print "QUEUE1: ", list(self.dispatcher.queues[''])
    81         print "QUEUE: ", list(self.queue)
    82         #print "DISPATCHERS: ", self.queue.dispatchers.items()
    83         print "DEFAULT: ", getDefaultQueue(self.context), len(getDefaultQueue(self.context))
    84         print "DELETE: ", DELETE
    85         if DELETE is not None:
    86             if DELETE in self.context['jobs']:
    87                 job = self.context['jobs'][DELETE]
    88                 from zc.async.interfaces import COMPLETED, NEW, PENDING
    89                 if job.status in (COMPLETED, NEW):
    90                     del self.context['jobs'][DELETE]
    91                     print "DELETED %s" % job
     74
     75
     76def get_job_id(persisted_job):
     77    """Get the object id of an already persisted job.
     78
     79    The `persisted_job` must provide a `_p_oid` attribute.
     80    """
     81    job_id = u64(persisted_job._p_oid)
     82    return "%s" % job_id
     83
     84class JobContainer(grok.Container):
     85    """A container for :class:`IKofa` jobs.
     86    """
     87    grok.implements(IJobContainer)
     88
     89
     90class JobManager(grok.GlobalUtility):
     91    """A manager for asynchronous running jobs (tasks).
     92
     93    Registered as a global utility for the
     94    `waeup.kofa.interfaces.IJobManager` interface.
     95
     96    This is the central location for managing asynchronous running
     97    jobs/tasks.
     98
     99    It works roughly like this: for usual tasks it looks up some
     100    JobContainer installed in a ZODB database root (the installation
     101    can happen during startup; see the respective installer classes
     102    and functions in this module) and then interacts with this
     103    JobContainer.
     104
     105    The optional `site` parameter for most methods in here serves for
     106    finding the databases' roots. It is sufficient to pass any
     107    persisted object (or more precisely: some object with a valid
     108    ``_p_jar__`` attribte). As long as some site was already set (for
     109    instance during regular requests), the site is looked up
     110    automatically and you don't have to pass the `site` parameter
     111    then. So, in most cases you won't have to give a `site` parameter.
     112    """
     113    grok.implements(IJobManager)
     114    grok.provides(IJobManager)
     115
     116    def _get_site(self, site):
     117        if site is None:
     118            site = getSite()
     119            if site is None:
     120                raise LookupError('cannot find a valid site')
     121        return site
     122
     123    def _get_jobs_container(self, site):
     124        # in fact we need _some_ persisted object, not necessarily a site
     125        return site._p_jar.root()[WAEUP_KEY]
     126
     127    def put(self, job, site=None):
     128        """Start the `job` and store it in local `site`.
     129
     130        The `job` must be an `IJob` instance.
     131
     132        It will be put into the default queue and then stored in local
     133        site. The status of the job can be seen immediately in
     134        `job.status`.
     135
     136        Please specify special treatments like `begin_after` or
     137        `begin_by` by setting the respectives attributes of the job
     138        itself.
     139        """
     140        site = self._get_site(site)
     141        container = self._get_jobs_container(site)
     142        transaction.begin()
     143        queue = getDefaultQueue(site)
     144        new_job = queue.put(job)
     145        job_id = get_job_id(new_job)
     146        container[job_id] = new_job
     147        transaction.commit()
     148        return job_id
     149
     150    def get(self, job_id, site=None):
     151        """Get the job with `job_id` from local `site`.
     152
     153        If `job_id` cannot be found, ``None`` is returned. This
     154        suitable e.g. when used with a traverser.
     155        """
     156        site = self._get_site(site)
     157        container = self._get_jobs_container(site)
     158        return container.get(job_id, None)
     159
     160    def jobs(self, site=None):
     161        """Get all stored jobs as an iterable.
     162
     163        Result provides tuples (JOB_ID, JOB_OBJECT).
     164        """
     165        site = self._get_site(site)
     166        container = self._get_jobs_container(site)
     167        for job_id, job in container.items():
     168            yield (job_id, job)
     169
     170    def remove(self, job_id, site=None):
     171        """Remove job with `job_id` from local job container.
     172
     173        If no such job can be found this is silently ignored.
     174
     175        Please note: removing a job from the job container does not
     176        mean to stop its execution (if it wasn't started yet or is
     177        currently running).
     178        """
     179        site = self._get_site(site)
     180        container = self._get_jobs_container(site)
     181        if job_id in container.keys():
     182            del container[job_id]
     183        return
     184
     185    def start_test_job(self):
     186        """Start a test job.
     187
     188        A method for testing the general asynchronous functionality of
     189        waeup.kofa. The test job started here executes the local
     190        :func:`compute` function with ``23`` as argument.
     191        """
     192        job = AsyncJob(compute, 23)
     193        job_id = self.put(job)
     194        return job_id
     195
     196class JobManagerTraverser(grok.Traverser):
     197    """A traverser for the global ``IJobManager``.
     198
     199    Looks up jobs by job_id and returns the respective job if it
     200    can be found.
     201    """
     202    grok.context(IJobManager)
     203
     204    def traverse(self, name):
     205        # ``None`` if name cannot be found.
     206        return self.context.get(name)
     207
     208##
     209## configuration (permissions, subscribers, etc.) ...
     210##
     211
     212class ViewJobs(grok.Permission):
     213    grok.name('waeup.viewJobs')
     214
     215class ManageJobs(grok.Permission):
     216    grok.name('waeup.manageJobs')
     217
     218class JobContainerInstaller(object):
     219    """Install a JobContainer in root of given DB.
     220
     221    Instances of this installer can be called when a Zope instance
     222    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
     223
     224    It looks for some database named as in `db_name` and installs a
     225    job container in the root of this database (``None`` by default)
     226    if it does not exist already.
     227    """
     228    def __init__(self, db_name=None):
     229        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
     230        # package is around
     231        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
     232        self.db_name = db_name
     233        return
     234
     235    def __call__(self, ev):
     236        db = ev.database
     237        tm = transaction.TransactionManager()
     238        conn = db.open(transaction_manager=tm)
     239        tm.begin()
     240        try:
     241            try:
     242                root = conn.root()
     243                if WAEUP_KEY in root:
     244                    return
     245                if self.db_name is not None:
     246                    other = conn.get_connection(self.db_name)
     247                    container = other.root()[WAEUP_KEY] = JobContainer()
     248                    other.add(container)
    92249                else:
    93                     if job in self.queue:
    94                         print "JOB IN ", self.queue, "!!!!!!!!!!!!!!!!!!"
    95                         print "QUEUE BEFORE: ", list(self.queue)
    96                         try:
    97                             self.queue.remove(job)
    98                         except:
    99                             "COULDN'T REMOVE ANYWAY!!"
    100                         print "QUEUE AFTER: ", list(self.queue)
    101                         self.queue.pull()
    102                         print "QUEUE AFTER2: ", list(self.queue)
    103                     #job_queue = job.queue
    104                     #agent = job.agent
    105                     #print "JOB AGENT: ", agent, dir(agent)
    106                     #print "JOB AGENT QUEUE: ", agent.queue, list(agent.queue)
    107                     #print "JOB QUEUE: ", job_queue, list(job_queue)
    108                     #if job in list(job_queue) or True:
    109                     #    agent.remove(job)
    110                     #    #job.queue.remove(job)
    111                     #    print "REMOVED %s from queue %s" % (
    112                     #        job, job_queue)
    113                     else:
    114                         print "CANT FIND JOB in QUEUE"
    115 
    116         #import pdb; pdb.set_trace()
    117         if start:
    118             start_time = datetime.datetime.utcnow()
    119             start_time += datetime.timedelta(seconds=10)
    120             start_time = utc.localize(start_time)
    121             print "START"
    122             print "STARTTIME: ", start_time
    123             import transaction
    124             transaction.begin()
    125             job = self.queue.put(Job(compute, 21),
    126                                  begin_after=start_time)
    127             job.annotations['percent'] = 0.0
    128             print " started: ", job.status, type(job)
    129             if 'jobs' not in self.context:
    130                 self.context['jobs'] = grok.Container()
    131             job_id = get_next_num(self.context)
    132             print "JOB ID: ", job_id, dir(job)
    133             self.context['jobs'][job_id] = job
    134             transaction.commit()
    135             time.sleep(0.5)
    136             self.redirect(self.url())
    137 
    138     def job_status(self):
    139         if 'jobs' not in self.context:
    140             return ''
    141         result = ""
    142         #for num, job in enumerate(self.queue):
    143         for num, job in self.context['jobs'].items():
    144             result += '<div><form>'
    145             if job.status in ('completed-status', 'new-status'):
    146                 result += '<a href="?DELETE=%s">delete</a>' %(
    147                     num)
    148             else: # job.status == 'pendig-status' or (
    149                 # job.status == 'active-status'):
    150                 result += '<a href="?DELETE=%s">remove from queue</a>' % (
    151                     num)
    152             result += '<input type="checkbox" name="remove" />%s %s, %s, %r</form></div>' % (
    153                 num,
    154                 escape('%r' % job),
    155                 job.status,
    156                 dict(job.annotations))
    157             print "job in queue: ", job, type(job)
    158         return result
    159 
    160     def NOrender(self):
    161         print "RENDER: ", self.namespace().items(), type(self.namespace)
    162         return grok.PageTemplate('Hallo!')
    163         #print "RENDER: ", self.static['square_circles_black.gif']
    164         #return r'''
    165         return '''
    166 <html>
    167 <head>
    168 <title>Async testing</title>
    169 <script type="text/javascript">
    170 $(document).ready(function() {
    171   var myLoadingDiv = $("#progressbar");
    172   var mySpinner = $("#loadspinner");
    173   myLoadingDiv.progressbar({disabled:true});
    174   function update() {
    175   $.getJSON("status?",  {jobid: "4",}, function(data) {
    176     if (data == null) {
    177       myLoadingDiv.progressbar("disable");
    178       myLoadingDiv.progressbar("value", 0);
    179       $("#msg").html("Syncing...");
    180       clearInterval(intervalID);
    181       //alert('No Data'):
    182       }
    183     else {
    184       myLoadingDiv.progressbar("enable");
    185       myLoadingDiv.progressbar("value", data.percent);
    186       $("#msg").html(data.msg + ' (' + data.status + ') ');
    187       //alert(data.msg);
    188       if (data.percent == 100) {
    189         clearInterval(intervalID);
    190         }
    191       }
    192   });}
    193   intervalID = setInterval(update, 500);
    194   //update();
    195 });
    196 </script>
    197 </head>
    198 <body>
    199 <h1>Async stuff</h1>
    200 <div>Dispatcher: %s</div>
    201 <div>Queue: %s</div>
    202 <div>Jobs: %s</div>
    203 <form method="POST">
    204   <input type="submit" name="reload" value="reload" />
    205   <input type="submit" name="start" value="start up" />
    206 </form>
    207 <div id="progressbar"
    208      class="ui-progressbar ui-widget ui-widget-content ui-corner-all">
    209        <div id="msg">Hi there</div>
    210 </div>
    211 <div id="loadspinner">
    212 </div>
    213 </body>
    214 </htlm>
    215 ''' % (
    216    escape('%r' % self.dispatcher),
    217    escape('%r' % self.queue),
    218    self.job_status(),
    219    )
    220 
    221 
    222 class ASyncJSON(grok.JSON):
    223     grok.context(IUniversity)
    224 
    225     def status(self, jobid):
    226         print "request for ", jobid
    227         msg = 'please wait...'
    228         job = self.context['jobs'].get(jobid, None)
    229         if not job:
    230             return
    231         percent = job.annotations.get('percent', None)
    232         if not percent:
    233             return
    234         percent = int(percent+0.5)
    235         print "percent: ", percent
    236         if percent >= 100:
    237             percent = 100
    238             msg = 'done.'
    239         status_map = {
    240             NEW: 'new',
    241             COMPLETED: 'completed',
    242             PENDING: 'pending',
    243             ACTIVE: 'active',
    244             ASSIGNED: 'assigned',
    245             }
    246         return {'msg': msg,
    247                 'percent': percent,
    248                 'status': status_map.get(job.status, 'unknown'),
    249                 }
     250                    container = JobContainer()
     251                root[WAEUP_KEY] = container
     252                notify(ObjectAdded(container, root, WAEUP_KEY))
     253                tm.commit()
     254                zc.async.utils.log.info('job container added')
     255            except:
     256                tm.abort()
     257                raise
     258        finally:
     259            conn.close()
     260        return
     261
     262#: Can be used as event subscriber from ZCML; installs a job container
     263#: in default database (named ``''``) root.
     264job_container_installer = JobContainerInstaller()
     265
     266#: An installer instance that installs a job container in a ZODB
     267#: called ``async`` - this name is used in several components of the
     268#: `zc.async` package we don't want to reimplement here.
     269#:
     270#: To use this installer as an event subscriber by ZCML, make sure the
     271#: instance provides also a ZODB called `async`.
     272multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracChangeset for help on using the changeset viewer.