source: main/waeup.kofa/trunk/src/waeup/kofa/async.py @ 10654

Last change on this file since 10654 was 9792, checked in by uli, 12 years ago

Fix nasty problem that arises with virtual export containers: they are
not persisted and therefore no proper ZODB root could be found before
this fix.

File size: 9.4 KB
RevLine 
[9217]1## $Id$
2##
3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Components for asynchronous job (task) handling, mainly based on
19:mod:`zc.async`.
20"""
21import grok
22import time
23import transaction
24import zc.async.dispatcher
25import zc.async.utils
26import zope.component
27from zc.async.job import Job as AsyncJob
28from zc.async.queue import getDefaultQueue
29from zc.async.interfaces import IJob, ObjectAdded
30from ZODB.utils import u64
31from zope.component.hooks import getSite
32from zope.event import notify
33from waeup.kofa.interfaces import (
34    IJobManager, IProgressable, IJobContainer, WAEUP_KEY,
35    )
36
37def compute(num, duration=60):
38    """A convenience function to test asynchronous jobs.
39
40    `duration` gives the seconds, this job should (artificially) need
41    for completing.
42    """
43    start = time.time()
44    end = start + duration
45    print "MyJob starts computation at ", start
46    myjob = zc.async.local.getJob()
47    print "MyJob's job: ", zc.async.local.getJob()
48    print "MyJob's queue: ", zc.async.local.getQueue(), list(
49        zc.async.local.getQueue())
50    while True:
51        if time.time() > end:
52            break
53        time.sleep(1)
54        percent = (time.time() - start) * 100.0 / duration
55        if percent > 100.0:
56            percent = 100.0
57        print "MyJob percent: ", percent
58        zc.async.local.setLiveAnnotation('percent', percent)
59        print "MyJob does something at %s of %s" % (
60            time.time() - start, duration)
61        print "MyJob's annotations: %r" % dict(myjob.annotations)
62        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
63            'percent')
64    zc.async.local.setLiveAnnotation('percent', 100.0)
65    return num * 2
66
67#
68# Content components
69#
70class ProgressableJob(AsyncJob):
71    """A job that can indicate its progress via a `percent` attribute.
72    """
73    grok.implements(IJob, IProgressable)
74    grok.provides(IJob)
75    percent = None
76
77
78def get_job_id(persisted_job):
79    """Get the object id of an already persisted job.
80
81    The `persisted_job` must provide a `_p_oid` attribute.
82    """
83    job_id = u64(persisted_job._p_oid)
84    return "%s" % job_id
85
86class JobContainer(grok.Container):
87    """A container for :class:`IKofa` jobs.
88    """
89    grok.implements(IJobContainer)
90
91
92class JobManager(grok.GlobalUtility):
93    """A manager for asynchronous running jobs (tasks).
94
95    Registered as a global utility for the
96    `waeup.kofa.interfaces.IJobManager` interface.
97
98    This is the central location for managing asynchronous running
99    jobs/tasks.
100
101    It works roughly like this: for usual tasks it looks up some
102    JobContainer installed in a ZODB database root (the installation
103    can happen during startup; see the respective installer classes
104    and functions in this module) and then interacts with this
105    JobContainer.
106
107    The optional `site` parameter for most methods in here serves for
108    finding the databases' roots. It is sufficient to pass any
109    persisted object (or more precisely: some object with a valid
110    ``_p_jar__`` attribte). As long as some site was already set (for
111    instance during regular requests), the site is looked up
112    automatically and you don't have to pass the `site` parameter
113    then. So, in most cases you won't have to give a `site` parameter.
114    """
115    grok.implements(IJobManager)
116    grok.provides(IJobManager)
117
118    def _get_site(self, site):
[9792]119        # in fact we get some persisted object if available.
120        # As sites are normally persisted and easy to lookup, we use them
121        # to get _some_ persisted object.
122        if not hasattr(site, '_p_jar'):
[9217]123            site = getSite()
124            if site is None:
125                raise LookupError('cannot find a valid site')
126        return site
127
128    def _get_jobs_container(self, site):
129        # in fact we need _some_ persisted object, not necessarily a site
130        return site._p_jar.root()[WAEUP_KEY]
131
132    def put(self, job, site=None):
133        """Start the `job` and store it in local `site`.
134
135        The `job` must be an `IJob` instance.
136
137        It will be put into the default queue and then stored in local
138        site. The status of the job can be seen immediately in
139        `job.status`.
140
141        Please specify special treatments like `begin_after` or
142        `begin_by` by setting the respectives attributes of the job
143        itself.
144        """
145        site = self._get_site(site)
146        container = self._get_jobs_container(site)
147        transaction.begin()
148        queue = getDefaultQueue(site)
149        new_job = queue.put(job)
150        job_id = get_job_id(new_job)
151        container[job_id] = new_job
152        transaction.commit()
153        return job_id
154
155    def get(self, job_id, site=None):
156        """Get the job with `job_id` from local `site`.
157
158        If `job_id` cannot be found, ``None`` is returned. This
159        suitable e.g. when used with a traverser.
160        """
161        site = self._get_site(site)
162        container = self._get_jobs_container(site)
163        return container.get(job_id, None)
164
165    def jobs(self, site=None):
166        """Get all stored jobs as an iterable.
167
168        Result provides tuples (JOB_ID, JOB_OBJECT).
169        """
170        site = self._get_site(site)
171        container = self._get_jobs_container(site)
172        for job_id, job in container.items():
173            yield (job_id, job)
174
175    def remove(self, job_id, site=None):
176        """Remove job with `job_id` from local job container.
177
178        If no such job can be found this is silently ignored.
179
180        Please note: removing a job from the job container does not
181        mean to stop its execution (if it wasn't started yet or is
182        currently running).
183        """
184        site = self._get_site(site)
185        container = self._get_jobs_container(site)
186        if job_id in container.keys():
187            del container[job_id]
188        return
189
190    def start_test_job(self, duration=60, site=None):
191        """Start a test job.
192
193        A method for testing the general asynchronous functionality of
194        waeup.kofa. The test job started here executes the local
195        :func:`compute` function with ``23`` as argument.
196        """
197        job = AsyncJob(compute, 23, duration)
198        job_id = self.put(job, site=site)
199        return job_id
200
201class JobManagerTraverser(grok.Traverser):
202    """A traverser for the global ``IJobManager``.
203
204    Looks up jobs by job_id and returns the respective job if it
205    can be found.
206    """
207    grok.context(IJobManager)
208
209    def traverse(self, name):
210        # ``None`` if name cannot be found.
211        return self.context.get(name)
212
213##
214## configuration (permissions, subscribers, etc.) ...
215##
216
217class ViewJobs(grok.Permission):
218    grok.name('waeup.viewJobs')
219
220class ManageJobs(grok.Permission):
221    grok.name('waeup.manageJobs')
222
223class JobContainerInstaller(object):
224    """Install a JobContainer in root of given DB.
225
226    Instances of this installer can be called when a Zope instance
227    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
228
229    It looks for some database named as in `db_name` and installs a
230    job container in the root of this database (``None`` by default)
231    if it does not exist already.
232    """
233    def __init__(self, db_name=None):
234        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
235        # package is around
236        zope.component.adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
237        self.db_name = db_name
238        return
239
240    def __call__(self, ev):
241        db = ev.database
242        tm = transaction.TransactionManager()
243        conn = db.open(transaction_manager=tm)
244        tm.begin()
245        try:
246            try:
247                root = conn.root()
248                if WAEUP_KEY in root:
249                    return
250                if self.db_name is not None:
251                    other = conn.get_connection(self.db_name)
252                    container = other.root()[WAEUP_KEY] = JobContainer()
253                    other.add(container)
254                else:
255                    container = JobContainer()
256                root[WAEUP_KEY] = container
257                notify(ObjectAdded(container, root, WAEUP_KEY))
258                tm.commit()
259                zc.async.utils.log.info('job container added')
260            except:
261                tm.abort()
262                raise
263        finally:
264            conn.close()
265        return
266
267#: Can be used as event subscriber from ZCML; installs a job container
268#: in default database (named ``''``) root.
269job_container_installer = JobContainerInstaller()
270
271#: An installer instance that installs a job container in a ZODB
272#: called ``async`` - this name is used in several components of the
273#: `zc.async` package we don't want to reimplement here.
274#:
275#: To use this installer as an event subscriber by ZCML, make sure the
276#: instance provides also a ZODB called `async`.
277multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracBrowser for help on using the repository browser.