source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/async.py @ 9090

Last change on this file since 9090 was 9082, checked in by uli, 12 years ago

Clean up, move UI components away and provide a single job manager as
utility.

File size: 9.0 KB
RevLine 
[9082]1## $Id$
2##
3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Components for asynchronous job (task) handling, mainly based on
19:mod:`zc.async`.
20"""
[8864]21import grok
22import time
[9082]23import transaction
[8864]24import zc.async.dispatcher
[9082]25import zc.async.utils
26import zope.component
[8864]27from zc.async.job import Job as AsyncJob
28from zc.async.queue import getDefaultQueue
[9082]29from zc.async.interfaces import IJob, ObjectAdded
30from ZODB.utils import u64
31from zope.component.hooks import getSite
32from zope.event import notify
33from waeup.kofa.interfaces import (
34    IJobManager, IProgressable, IJobContainer, WAEUP_KEY,
35    )
[8864]36
37def compute(num):
[9082]38    """A convenience function to test asynchronous jobs.
39    """
[8864]40    start = time.time()
41    duration = 60
42    end = start + duration
43    print "MyJob starts computation at ", start
44    myjob = zc.async.local.getJob()
45    print "MyJob's job: ", zc.async.local.getJob()
46    print "MyJob's queue: ", zc.async.local.getQueue(), list(
47        zc.async.local.getQueue())
48    while True:
49        if time.time() > end:
50            break
51        time.sleep(1)
52        percent = (time.time() - start) * 100.0 / duration
53        if percent > 100.0:
54            percent = 100.0
55        print "MyJob percent: ", percent
56        zc.async.local.setLiveAnnotation('percent', percent)
57        print "MyJob does something at %s of %s" % (
58            time.time() - start, duration)
59        print "MyJob's annotations: %r" % dict(myjob.annotations)
60        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
61            'percent')
62    return num * 2
63
[9082]64#
65# Content components
66#
class ProgressableJob(AsyncJob):
    """A job that can indicate its progress via a `percent` attribute.

    Extends the plain `zc.async` job by the `percent` attribute and
    declares to implement `IProgressable` in addition to `IJob`.
    """
    grok.implements(IJob, IProgressable)
    grok.provides(IJob)

    # Progress of the job; ``None`` as long as no value was set.
    percent = None
74
75
def get_job_id(persisted_job):
    """Return the id of an already persisted job as a string.

    The id is derived from the ZODB object id of the job, so
    `persisted_job` must provide a valid `_p_oid` attribute. The
    object id is turned into a number by `u64` and rendered as a
    decimal string.
    """
    numeric_oid = u64(persisted_job._p_oid)
    return str(numeric_oid)
[8864]83
class JobContainer(grok.Container):
    """A container for :class:`IKofa` jobs.

    Instances are stored under `WAEUP_KEY` in a ZODB root (see
    :class:`JobContainerInstaller`) and are filled by
    :class:`JobManager`, which stores each job under its job id.
    """
    grok.implements(IJobContainer)
[8864]88
89
class JobManager(grok.GlobalUtility):
    """A manager for asynchronous running jobs (tasks).

    Registered as a global utility for the
    `waeup.kofa.interfaces.IJobManager` interface.

    This is the central location for managing asynchronous running
    jobs/tasks.

    It works roughly like this: for usual tasks it looks up some
    JobContainer installed in a ZODB database root (the installation
    can happen during startup; see the respective installer classes
    and functions in this module) and then interacts with this
    JobContainer.

    The optional `site` parameter for most methods in here serves for
    finding the databases' roots. It is sufficient to pass any
    persisted object (or more precisely: some object with a valid
    ``_p_jar`` attribute). As long as some site was already set (for
    instance during regular requests), the site is looked up
    automatically and you don't have to pass the `site` parameter
    then. So, in most cases you won't have to give a `site` parameter.
    """
    grok.implements(IJobManager)
    grok.provides(IJobManager)

    def _get_site(self, site):
        """Return `site` or, if it is ``None``, the currently set site.

        Raises `LookupError` if no `site` was passed in and no site
        is set either.
        """
        if site is None:
            site = getSite()
            if site is None:
                raise LookupError('cannot find a valid site')
        return site

    def _get_jobs_container(self, site):
        """Return the job container stored in the ZODB root of `site`.
        """
        # in fact we need _some_ persisted object, not necessarily a site
        return site._p_jar.root()[WAEUP_KEY]

    def put(self, job, site=None):
        """Start the `job` and store it in local `site`.

        The `job` must be an `IJob` instance.

        It will be put into the default queue and then stored in local
        site. The status of the job can be seen immediately in
        `job.status`.

        Please specify special treatments like `begin_after` or
        `begin_by` by setting the respective attributes of the job
        itself.

        Queueing and storing happen in a transaction of their own,
        which is committed before returning. If anything goes wrong
        the transaction is aborted and the error is re-raised, so
        that no job is left queued without being stored as well.

        Returns the id of the job as a string.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        transaction.begin()
        try:
            queue = getDefaultQueue(site)
            new_job = queue.put(job)
            job_id = get_job_id(new_job)
            container[job_id] = new_job
            transaction.commit()
        except:
            # Deliberately catch everything: whatever happened, do not
            # leave a half-done transaction behind. The error itself
            # is passed on to the caller.
            transaction.abort()
            raise
        return job_id

    def get(self, job_id, site=None):
        """Get the job with `job_id` from local `site`.

        If `job_id` cannot be found, ``None`` is returned. This is
        suitable e.g. when used with a traverser.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        return container.get(job_id, None)

    def jobs(self, site=None):
        """Get all stored jobs as an iterable.

        Result provides tuples (JOB_ID, JOB_OBJECT).

        This is a generator: site and job container are looked up
        not before the first item is requested.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        for job_id, job in container.items():
            yield (job_id, job)

    def remove(self, job_id, site=None):
        """Remove job with `job_id` from local job container.

        If no such job can be found this is silently ignored.

        Please note: removing a job from the job container does not
        mean to stop its execution (if it wasn't started yet or is
        currently running).
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        # Ask the container itself instead of walking all its keys.
        if job_id in container:
            del container[job_id]
        return

    def start_test_job(self):
        """Start a test job.

        A method for testing the general asynchronous functionality of
        waeup.kofa. The test job started here executes the local
        :func:`compute` function with ``23`` as argument.

        Returns the id of the started job.
        """
        job = AsyncJob(compute, 23)
        job_id = self.put(job)
        return job_id
195
class JobManagerTraverser(grok.Traverser):
    """Traverse from the global ``IJobManager`` to single jobs.

    The name to traverse to is taken as a job id and handed over to
    the job manager, which delivers the respective job if there is
    one.
    """
    grok.context(IJobManager)

    def traverse(self, name):
        """Return the job stored under id `name`.

        The result of the manager's ``get`` is passed on unchanged,
        i.e. ``None`` if `name` cannot be found.
        """
        manager = self.context
        return manager.get(name)
207
208##
209## configuration (permissions, subscribers, etc.) ...
210##
211
class ViewJobs(grok.Permission):
    """Permission named ``waeup.viewJobs``.
    """
    grok.name('waeup.viewJobs')
214
class ManageJobs(grok.Permission):
    """Permission named ``waeup.manageJobs``.
    """
    grok.name('waeup.manageJobs')
217
218class JobContainerInstaller(object):
219    """Install a JobContainer in root of given DB.
220
221    Instances of this installer can be called when a Zope instance
222    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
223
224    It looks for some database named as in `db_name` and installs a
225    job container in the root of this database (``None`` by default)
226    if it does not exist already.
227    """
228    def __init__(self, db_name=None):
229        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
230        # package is around
231        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
232        self.db_name = db_name
233        return
234
235    def __call__(self, ev):
236        db = ev.database
237        tm = transaction.TransactionManager()
238        conn = db.open(transaction_manager=tm)
239        tm.begin()
240        try:
241            try:
242                root = conn.root()
243                if WAEUP_KEY in root:
244                    return
245                if self.db_name is not None:
246                    other = conn.get_connection(self.db_name)
247                    container = other.root()[WAEUP_KEY] = JobContainer()
248                    other.add(container)
249                else:
250                    container = JobContainer()
251                root[WAEUP_KEY] = container
252                notify(ObjectAdded(container, root, WAEUP_KEY))
253                tm.commit()
254                zc.async.utils.log.info('job container added')
255            except:
256                tm.abort()
257                raise
258        finally:
259            conn.close()
260        return
261
#: Can be used as event subscriber from ZCML; installs a job container
#: in the root of the default database (named ``''``).
job_container_installer = JobContainerInstaller()

#: An installer instance that installs a job container in a ZODB
#: called ``async`` - this name is used in several components of the
#: `zc.async` package, which we don't want to reimplement here.
#:
#: To use this installer as an event subscriber from ZCML, make sure
#: the instance also provides a ZODB called ``async``.
multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracBrowser for help on using the repository browser.