source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/async.py @ 10400

Last change on this file since 10400 was 9102, checked in by uli, 12 years ago

Support duration setting for async test jobs.

File size: 9.2 KB
RevLine 
[9082]1## $Id$
2##
3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Components for asynchronous job (task) handling, mainly based on
19:mod:`zc.async`.
20"""
[8864]21import grok
22import time
[9082]23import transaction
[8864]24import zc.async.dispatcher
[9082]25import zc.async.utils
26import zope.component
[8864]27from zc.async.job import Job as AsyncJob
28from zc.async.queue import getDefaultQueue
[9082]29from zc.async.interfaces import IJob, ObjectAdded
30from ZODB.utils import u64
31from zope.component.hooks import getSite
32from zope.event import notify
33from waeup.kofa.interfaces import (
34    IJobManager, IProgressable, IJobContainer, WAEUP_KEY,
35    )
[8864]36
[9102]37def compute(num, duration=60):
[9082]38    """A convenience function to test asynchronous jobs.
[9102]39
40    `duration` gives the seconds, this job should (artificially) need
41    for completing.
[9082]42    """
[8864]43    start = time.time()
44    end = start + duration
45    print "MyJob starts computation at ", start
46    myjob = zc.async.local.getJob()
47    print "MyJob's job: ", zc.async.local.getJob()
48    print "MyJob's queue: ", zc.async.local.getQueue(), list(
49        zc.async.local.getQueue())
50    while True:
51        if time.time() > end:
52            break
53        time.sleep(1)
54        percent = (time.time() - start) * 100.0 / duration
55        if percent > 100.0:
56            percent = 100.0
57        print "MyJob percent: ", percent
58        zc.async.local.setLiveAnnotation('percent', percent)
59        print "MyJob does something at %s of %s" % (
60            time.time() - start, duration)
61        print "MyJob's annotations: %r" % dict(myjob.annotations)
62        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
63            'percent')
[9102]64    zc.async.local.setLiveAnnotation('percent', 100.0)
[8864]65    return num * 2
66
[9082]67#
68# Content components
69#
70class ProgressableJob(AsyncJob):
71    """A job that can indicate its progress via a `percent` attribute.
72    """
73    grok.implements(IJob, IProgressable)
74    grok.provides(IJob)
[8864]75    percent = None
76
77
[9082]78def get_job_id(persisted_job):
79    """Get the object id of an already persisted job.
[8864]80
[9082]81    The `persisted_job` must provide a `_p_oid` attribute.
82    """
83    job_id = u64(persisted_job._p_oid)
84    return "%s" % job_id
[8864]85
[9082]86class JobContainer(grok.Container):
87    """A container for :class:`IKofa` jobs.
88    """
89    grok.implements(IJobContainer)
[8864]90
91
[9082]92class JobManager(grok.GlobalUtility):
93    """A manager for asynchronous running jobs (tasks).
[8864]94
[9082]95    Registered as a global utility for the
96    `waeup.kofa.interfaces.IJobManager` interface.
[8864]97
[9082]98    This is the central location for managing asynchronous running
99    jobs/tasks.
[8864]100
[9082]101    It works roughly like this: for usual tasks it looks up some
102    JobContainer installed in a ZODB database root (the installation
103    can happen during startup; see the respective installer classes
104    and functions in this module) and then interacts with this
105    JobContainer.
[8864]106
[9082]107    The optional `site` parameter for most methods in here serves for
108    finding the databases' roots. It is sufficient to pass any
109    persisted object (or more precisely: some object with a valid
110    ``_p_jar__`` attribte). As long as some site was already set (for
111    instance during regular requests), the site is looked up
112    automatically and you don't have to pass the `site` parameter
113    then. So, in most cases you won't have to give a `site` parameter.
114    """
115    grok.implements(IJobManager)
116    grok.provides(IJobManager)
117
118    def _get_site(self, site):
119        if site is None:
120            site = getSite()
121            if site is None:
122                raise LookupError('cannot find a valid site')
123        return site
124
125    def _get_jobs_container(self, site):
126        # in fact we need _some_ persisted object, not necessarily a site
127        return site._p_jar.root()[WAEUP_KEY]
128
129    def put(self, job, site=None):
130        """Start the `job` and store it in local `site`.
131
132        The `job` must be an `IJob` instance.
133
134        It will be put into the default queue and then stored in local
135        site. The status of the job can be seen immediately in
136        `job.status`.
137
138        Please specify special treatments like `begin_after` or
139        `begin_by` by setting the respectives attributes of the job
140        itself.
141        """
142        site = self._get_site(site)
143        container = self._get_jobs_container(site)
144        transaction.begin()
145        queue = getDefaultQueue(site)
146        new_job = queue.put(job)
147        job_id = get_job_id(new_job)
148        container[job_id] = new_job
149        transaction.commit()
150        return job_id
151
152    def get(self, job_id, site=None):
153        """Get the job with `job_id` from local `site`.
154
155        If `job_id` cannot be found, ``None`` is returned. This
156        suitable e.g. when used with a traverser.
157        """
158        site = self._get_site(site)
159        container = self._get_jobs_container(site)
160        return container.get(job_id, None)
161
162    def jobs(self, site=None):
163        """Get all stored jobs as an iterable.
164
165        Result provides tuples (JOB_ID, JOB_OBJECT).
166        """
167        site = self._get_site(site)
168        container = self._get_jobs_container(site)
169        for job_id, job in container.items():
170            yield (job_id, job)
171
172    def remove(self, job_id, site=None):
173        """Remove job with `job_id` from local job container.
174
175        If no such job can be found this is silently ignored.
176
177        Please note: removing a job from the job container does not
178        mean to stop its execution (if it wasn't started yet or is
179        currently running).
180        """
181        site = self._get_site(site)
182        container = self._get_jobs_container(site)
183        if job_id in container.keys():
184            del container[job_id]
185        return
186
[9102]187    def start_test_job(self, duration=60, site=None):
[9082]188        """Start a test job.
189
190        A method for testing the general asynchronous functionality of
191        waeup.kofa. The test job started here executes the local
192        :func:`compute` function with ``23`` as argument.
193        """
[9102]194        job = AsyncJob(compute, 23, duration)
195        job_id = self.put(job, site=site)
[9082]196        return job_id
197
198class JobManagerTraverser(grok.Traverser):
199    """A traverser for the global ``IJobManager``.
200
201    Looks up jobs by job_id and returns the respective job if it
202    can be found.
203    """
204    grok.context(IJobManager)
205
206    def traverse(self, name):
207        # ``None`` if name cannot be found.
208        return self.context.get(name)
209
210##
211## configuration (permissions, subscribers, etc.) ...
212##
213
214class ViewJobs(grok.Permission):
215    grok.name('waeup.viewJobs')
216
217class ManageJobs(grok.Permission):
218    grok.name('waeup.manageJobs')
219
220class JobContainerInstaller(object):
221    """Install a JobContainer in root of given DB.
222
223    Instances of this installer can be called when a Zope instance
224    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
225
226    It looks for some database named as in `db_name` and installs a
227    job container in the root of this database (``None`` by default)
228    if it does not exist already.
229    """
230    def __init__(self, db_name=None):
231        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
232        # package is around
233        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
234        self.db_name = db_name
235        return
236
237    def __call__(self, ev):
238        db = ev.database
239        tm = transaction.TransactionManager()
240        conn = db.open(transaction_manager=tm)
241        tm.begin()
242        try:
243            try:
244                root = conn.root()
245                if WAEUP_KEY in root:
246                    return
247                if self.db_name is not None:
248                    other = conn.get_connection(self.db_name)
249                    container = other.root()[WAEUP_KEY] = JobContainer()
250                    other.add(container)
251                else:
252                    container = JobContainer()
253                root[WAEUP_KEY] = container
254                notify(ObjectAdded(container, root, WAEUP_KEY))
255                tm.commit()
256                zc.async.utils.log.info('job container added')
257            except:
258                tm.abort()
259                raise
260        finally:
261            conn.close()
262        return
263
264#: Can be used as event subscriber from ZCML; installs a job container
265#: in default database (named ``''``) root.
266job_container_installer = JobContainerInstaller()
267
268#: An installer instance that installs a job container in a ZODB
269#: called ``async`` - this name is used in several components of the
270#: `zc.async` package we don't want to reimplement here.
271#:
272#: To use this installer as an event subscriber by ZCML, make sure the
273#: instance provides also a ZODB called `async`.
274multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracBrowser for help on using the repository browser.