source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/async.py @ 9083

Last change on this file since 9083 was 9082, checked in by uli, 12 years ago

Clean up, move UI components away and provide a single job manager as
utility.

File size: 9.0 KB
Line 
1## $Id$
2##
3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Components for asynchronous job (task) handling, mainly based on
19:mod:`zc.async`.
20"""
21import grok
22import time
23import transaction
24import zc.async.dispatcher
25import zc.async.utils
26import zope.component
27from zc.async.job import Job as AsyncJob
28from zc.async.queue import getDefaultQueue
29from zc.async.interfaces import IJob, ObjectAdded
30from ZODB.utils import u64
31from zope.component.hooks import getSite
32from zope.event import notify
33from waeup.kofa.interfaces import (
34    IJobManager, IProgressable, IJobContainer, WAEUP_KEY,
35    )
36
37def compute(num):
38    """A convenience function to test asynchronous jobs.
39    """
40    start = time.time()
41    duration = 60
42    end = start + duration
43    print "MyJob starts computation at ", start
44    myjob = zc.async.local.getJob()
45    print "MyJob's job: ", zc.async.local.getJob()
46    print "MyJob's queue: ", zc.async.local.getQueue(), list(
47        zc.async.local.getQueue())
48    while True:
49        if time.time() > end:
50            break
51        time.sleep(1)
52        percent = (time.time() - start) * 100.0 / duration
53        if percent > 100.0:
54            percent = 100.0
55        print "MyJob percent: ", percent
56        zc.async.local.setLiveAnnotation('percent', percent)
57        print "MyJob does something at %s of %s" % (
58            time.time() - start, duration)
59        print "MyJob's annotations: %r" % dict(myjob.annotations)
60        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
61            'percent')
62    return num * 2
63
64#
65# Content components
66#
67class ProgressableJob(AsyncJob):
68    """A job that can indicate its progress via a `percent` attribute.
69    """
70    grok.implements(IJob, IProgressable)
71    grok.provides(IJob)
72
73    percent = None
74
75
76def get_job_id(persisted_job):
77    """Get the object id of an already persisted job.
78
79    The `persisted_job` must provide a `_p_oid` attribute.
80    """
81    job_id = u64(persisted_job._p_oid)
82    return "%s" % job_id
83
84class JobContainer(grok.Container):
85    """A container for :class:`IKofa` jobs.
86    """
87    grok.implements(IJobContainer)
88
89
90class JobManager(grok.GlobalUtility):
91    """A manager for asynchronous running jobs (tasks).
92
93    Registered as a global utility for the
94    `waeup.kofa.interfaces.IJobManager` interface.
95
96    This is the central location for managing asynchronous running
97    jobs/tasks.
98
99    It works roughly like this: for usual tasks it looks up some
100    JobContainer installed in a ZODB database root (the installation
101    can happen during startup; see the respective installer classes
102    and functions in this module) and then interacts with this
103    JobContainer.
104
105    The optional `site` parameter for most methods in here serves for
106    finding the databases' roots. It is sufficient to pass any
107    persisted object (or more precisely: some object with a valid
108    ``_p_jar__`` attribte). As long as some site was already set (for
109    instance during regular requests), the site is looked up
110    automatically and you don't have to pass the `site` parameter
111    then. So, in most cases you won't have to give a `site` parameter.
112    """
113    grok.implements(IJobManager)
114    grok.provides(IJobManager)
115
116    def _get_site(self, site):
117        if site is None:
118            site = getSite()
119            if site is None:
120                raise LookupError('cannot find a valid site')
121        return site
122
123    def _get_jobs_container(self, site):
124        # in fact we need _some_ persisted object, not necessarily a site
125        return site._p_jar.root()[WAEUP_KEY]
126
127    def put(self, job, site=None):
128        """Start the `job` and store it in local `site`.
129
130        The `job` must be an `IJob` instance.
131
132        It will be put into the default queue and then stored in local
133        site. The status of the job can be seen immediately in
134        `job.status`.
135
136        Please specify special treatments like `begin_after` or
137        `begin_by` by setting the respectives attributes of the job
138        itself.
139        """
140        site = self._get_site(site)
141        container = self._get_jobs_container(site)
142        transaction.begin()
143        queue = getDefaultQueue(site)
144        new_job = queue.put(job)
145        job_id = get_job_id(new_job)
146        container[job_id] = new_job
147        transaction.commit()
148        return job_id
149
150    def get(self, job_id, site=None):
151        """Get the job with `job_id` from local `site`.
152
153        If `job_id` cannot be found, ``None`` is returned. This
154        suitable e.g. when used with a traverser.
155        """
156        site = self._get_site(site)
157        container = self._get_jobs_container(site)
158        return container.get(job_id, None)
159
160    def jobs(self, site=None):
161        """Get all stored jobs as an iterable.
162
163        Result provides tuples (JOB_ID, JOB_OBJECT).
164        """
165        site = self._get_site(site)
166        container = self._get_jobs_container(site)
167        for job_id, job in container.items():
168            yield (job_id, job)
169
170    def remove(self, job_id, site=None):
171        """Remove job with `job_id` from local job container.
172
173        If no such job can be found this is silently ignored.
174
175        Please note: removing a job from the job container does not
176        mean to stop its execution (if it wasn't started yet or is
177        currently running).
178        """
179        site = self._get_site(site)
180        container = self._get_jobs_container(site)
181        if job_id in container.keys():
182            del container[job_id]
183        return
184
185    def start_test_job(self):
186        """Start a test job.
187
188        A method for testing the general asynchronous functionality of
189        waeup.kofa. The test job started here executes the local
190        :func:`compute` function with ``23`` as argument.
191        """
192        job = AsyncJob(compute, 23)
193        job_id = self.put(job)
194        return job_id
195
196class JobManagerTraverser(grok.Traverser):
197    """A traverser for the global ``IJobManager``.
198
199    Looks up jobs by job_id and returns the respective job if it
200    can be found.
201    """
202    grok.context(IJobManager)
203
204    def traverse(self, name):
205        # ``None`` if name cannot be found.
206        return self.context.get(name)
207
208##
209## configuration (permissions, subscribers, etc.) ...
210##
211
212class ViewJobs(grok.Permission):
213    grok.name('waeup.viewJobs')
214
215class ManageJobs(grok.Permission):
216    grok.name('waeup.manageJobs')
217
218class JobContainerInstaller(object):
219    """Install a JobContainer in root of given DB.
220
221    Instances of this installer can be called when a Zope instance
222    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
223
224    It looks for some database named as in `db_name` and installs a
225    job container in the root of this database (``None`` by default)
226    if it does not exist already.
227    """
228    def __init__(self, db_name=None):
229        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
230        # package is around
231        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
232        self.db_name = db_name
233        return
234
235    def __call__(self, ev):
236        db = ev.database
237        tm = transaction.TransactionManager()
238        conn = db.open(transaction_manager=tm)
239        tm.begin()
240        try:
241            try:
242                root = conn.root()
243                if WAEUP_KEY in root:
244                    return
245                if self.db_name is not None:
246                    other = conn.get_connection(self.db_name)
247                    container = other.root()[WAEUP_KEY] = JobContainer()
248                    other.add(container)
249                else:
250                    container = JobContainer()
251                root[WAEUP_KEY] = container
252                notify(ObjectAdded(container, root, WAEUP_KEY))
253                tm.commit()
254                zc.async.utils.log.info('job container added')
255            except:
256                tm.abort()
257                raise
258        finally:
259            conn.close()
260        return
261
262#: Can be used as event subscriber from ZCML; installs a job container
263#: in default database (named ``''``) root.
264job_container_installer = JobContainerInstaller()
265
266#: An installer instance that installs a job container in a ZODB
267#: called ``async`` - this name is used in several components of the
268#: `zc.async` package we don't want to reimplement here.
269#:
270#: To use this installer as an event subscriber by ZCML, make sure the
271#: instance provides also a ZODB called `async`.
272multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracBrowser for help on using the repository browser.