source: main/waeup.kofa/branches/uli-async-update/src/waeup/kofa/async.py @ 10009

Last change on this file since 10009 was 9102, checked in by uli, 12 years ago

Support duration setting for async test jobs.

File size: 9.2 KB
Line 
1## $Id$
2##
3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Components for asynchronous job (task) handling, mainly based on
19:mod:`zc.async`.
20"""
21import grok
22import time
23import transaction
24import zc.async.dispatcher
25import zc.async.utils
26import zope.component
27from zc.async.job import Job as AsyncJob
28from zc.async.queue import getDefaultQueue
29from zc.async.interfaces import IJob, ObjectAdded
30from ZODB.utils import u64
31from zope.component.hooks import getSite
32from zope.event import notify
33from waeup.kofa.interfaces import (
34    IJobManager, IProgressable, IJobContainer, WAEUP_KEY,
35    )
36
37def compute(num, duration=60):
38    """A convenience function to test asynchronous jobs.
39
40    `duration` gives the seconds, this job should (artificially) need
41    for completing.
42    """
43    start = time.time()
44    end = start + duration
45    print "MyJob starts computation at ", start
46    myjob = zc.async.local.getJob()
47    print "MyJob's job: ", zc.async.local.getJob()
48    print "MyJob's queue: ", zc.async.local.getQueue(), list(
49        zc.async.local.getQueue())
50    while True:
51        if time.time() > end:
52            break
53        time.sleep(1)
54        percent = (time.time() - start) * 100.0 / duration
55        if percent > 100.0:
56            percent = 100.0
57        print "MyJob percent: ", percent
58        zc.async.local.setLiveAnnotation('percent', percent)
59        print "MyJob does something at %s of %s" % (
60            time.time() - start, duration)
61        print "MyJob's annotations: %r" % dict(myjob.annotations)
62        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
63            'percent')
64    zc.async.local.setLiveAnnotation('percent', 100.0)
65    return num * 2
66
67#
68# Content components
69#
class ProgressableJob(AsyncJob):
    """An asynchronous job able to report how far it has progressed.

    The progress is exposed through the `percent` attribute, which is
    ``None`` unless set otherwise.
    """
    grok.implements(IJob, IProgressable)
    grok.provides(IJob)

    # Class-level default for the progress indicator.
    percent = None
76
77
def get_job_id(persisted_job):
    """Return the object id of an already persisted job as a string.

    The `persisted_job` must provide a `_p_oid` attribute, which is
    converted to a number by `u64` and then rendered as decimal
    string.
    """
    return str(u64(persisted_job._p_oid))
85
class JobContainer(grok.Container):
    """Container holding :class:`IKofa` jobs.

    Declared to implement `IJobContainer`.
    """
    grok.implements(IJobContainer)
90
91
class JobManager(grok.GlobalUtility):
    """A manager for asynchronous running jobs (tasks).

    Registered as a global utility for the
    `waeup.kofa.interfaces.IJobManager` interface.

    This is the central location for managing asynchronous running
    jobs/tasks.

    It works roughly like this: for usual tasks it looks up some
    JobContainer installed in a ZODB database root (the installation
    can happen during startup; see the respective installer classes
    and functions in this module) and then interacts with this
    JobContainer.

    The optional `site` parameter for most methods in here serves for
    finding the databases' roots. It is sufficient to pass any
    persisted object (or more precisely: some object with a valid
    ``_p_jar`` attribute). As long as some site was already set (for
    instance during regular requests), the site is looked up
    automatically and you don't have to pass the `site` parameter
    then. So, in most cases you won't have to give a `site` parameter.
    """
    grok.implements(IJobManager)
    grok.provides(IJobManager)

    def _get_site(self, site):
        """Return `site` or, if it is ``None``, the currently set site.

        Raises a `LookupError` if no `site` was passed in and no site
        is set either.
        """
        if site is None:
            site = getSite()
            if site is None:
                raise LookupError('cannot find a valid site')
        return site

    def _get_jobs_container(self, site):
        """Return the job container of the database `site` is stored in.

        The container is looked up under ``WAEUP_KEY`` in the root of
        the ZODB connection of `site`; a `KeyError` is raised if no
        container was installed there.
        """
        # in fact we need _some_ persisted object, not necessarily a site
        return site._p_jar.root()[WAEUP_KEY]

    def put(self, job, site=None):
        """Start the `job` and store it in local `site`.

        The `job` must be an `IJob` instance.

        It will be put into the default queue and then stored in local
        site. The status of the job can be seen immediately in
        `job.status`.

        Please specify special treatments like `begin_after` or
        `begin_by` by setting the respective attributes of the job
        itself.

        Returns the id under which the job was stored. Please note
        that a new transaction is begun and committed here; if
        anything goes wrong in between, the transaction is aborted
        and the exception is re-raised.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        transaction.begin()
        try:
            queue = getDefaultQueue(site)
            new_job = queue.put(job)
            job_id = get_job_id(new_job)
            container[job_id] = new_job
            transaction.commit()
        except:
            # Do not leave a half-done transaction (job queued but not
            # stored, failed commit, ...) behind; the error itself is
            # still passed on to the caller.
            transaction.abort()
            raise
        return job_id

    def get(self, job_id, site=None):
        """Get the job with `job_id` from local `site`.

        If `job_id` cannot be found, ``None`` is returned. This is
        suitable e.g. when used with a traverser.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        return container.get(job_id, None)

    def jobs(self, site=None):
        """Get all stored jobs as an iterable.

        Result provides tuples (JOB_ID, JOB_OBJECT).

        As this is a generator, site and container are looked up not
        before the first item is requested.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        for job_id, job in container.items():
            yield (job_id, job)

    def remove(self, job_id, site=None):
        """Remove job with `job_id` from local job container.

        If no such job can be found this is silently ignored.

        Please note: removing a job from the job container does not
        mean to stop its execution (if it wasn't started yet or is
        currently running).
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        # Ask the container itself instead of scanning all of its keys.
        if job_id in container:
            del container[job_id]
        return

    def start_test_job(self, duration=60, site=None):
        """Start a test job.

        A method for testing the general asynchronous functionality of
        waeup.kofa. The test job started here executes the local
        :func:`compute` function with ``23`` as argument.

        `duration` is passed on to :func:`compute` and gives the
        seconds the test job should need for completing.

        Returns the id of the job started.
        """
        job = AsyncJob(compute, 23, duration)
        job_id = self.put(job, site=site)
        return job_id
197
class JobManagerTraverser(grok.Traverser):
    """Traverser for the global ``IJobManager`` utility.

    Interprets the traversed name as a job id and delivers the job
    stored under this id, if there is one.
    """
    grok.context(IJobManager)

    def traverse(self, name):
        """Return the job with id `name` or ``None`` if there is none."""
        job = self.context.get(name)
        return job
209
210##
211## configuration (permissions, subscribers, etc.) ...
212##
213
class ViewJobs(grok.Permission):
    """Permission registered under the name ``waeup.viewJobs``."""
    grok.name('waeup.viewJobs')
216
class ManageJobs(grok.Permission):
    """Permission registered under the name ``waeup.manageJobs``."""
    grok.name('waeup.manageJobs')
219
220class JobContainerInstaller(object):
221    """Install a JobContainer in root of given DB.
222
223    Instances of this installer can be called when a Zope instance
224    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
225
226    It looks for some database named as in `db_name` and installs a
227    job container in the root of this database (``None`` by default)
228    if it does not exist already.
229    """
230    def __init__(self, db_name=None):
231        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
232        # package is around
233        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
234        self.db_name = db_name
235        return
236
237    def __call__(self, ev):
238        db = ev.database
239        tm = transaction.TransactionManager()
240        conn = db.open(transaction_manager=tm)
241        tm.begin()
242        try:
243            try:
244                root = conn.root()
245                if WAEUP_KEY in root:
246                    return
247                if self.db_name is not None:
248                    other = conn.get_connection(self.db_name)
249                    container = other.root()[WAEUP_KEY] = JobContainer()
250                    other.add(container)
251                else:
252                    container = JobContainer()
253                root[WAEUP_KEY] = container
254                notify(ObjectAdded(container, root, WAEUP_KEY))
255                tm.commit()
256                zc.async.utils.log.info('job container added')
257            except:
258                tm.abort()
259                raise
260        finally:
261            conn.close()
262        return
263
#: Installer instance usable as event subscriber from ZCML; it puts a
#: job container into the root of the default database (the one named
#: ``''``).
job_container_installer = JobContainerInstaller(db_name=None)

#: Installer instance for setups with multiple databases: it installs
#: a job container in a ZODB called ``async``. This name is used in
#: several components of the `zc.async` package we don't want to
#: reimplement here.
#:
#: When using this installer as an event subscriber from ZCML, make
#: sure the instance also provides a ZODB called `async`.
multidb_job_container_installer = JobContainerInstaller('async')
Note: See TracBrowser for help on using the repository browser.