source: main/waeup.kofa/branches/uli-py3/src/waeup/kofa/async.py @ 17497

Last change on this file since 17497 was 12581, checked in by uli, 10 years ago

Block jobs if system load is too high.

  • Property svn:keywords set to Id
File size: 9.5 KB
Line 
1## $Id: async.py 12581 2015-02-10 14:13:17Z uli $
2##
3## Copyright (C) 2012 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Components for asynchronous job (task) handling, mainly based on
19:mod:`zc.async`.
20"""
21import grok
22import time
23import transaction
24import zc.async.dispatcher
25import zc.async.utils
26import zope.component
27from zc.async.job import Job as AsyncJob
28from zc.async.queue import getDefaultQueue
29from zc.async.interfaces import IJob, ObjectAdded
30from ZODB.utils import u64
31from zope.component import getUtility, adapter
32from zope.component.hooks import getSite
33from zope.event import notify
34from waeup.kofa.interfaces import (
35    IJobManager, IProgressable, IJobContainer, WAEUP_KEY, IKofaUtils,
36    )
37
38
39def compute(num, duration=60):
40    """A convenience function to test asynchronous jobs.
41
42    `duration` gives the seconds, this job should (artificially) need
43    for completing.
44    """
45    start = time.time()
46    end = start + duration
47    print "MyJob starts computation at ", start
48    myjob = zc.async.local.getJob()
49    print "MyJob's job: ", zc.async.local.getJob()
50    print "MyJob's queue: ", zc.async.local.getQueue(), list(
51        zc.async.local.getQueue())
52    while True:
53        if time.time() > end:
54            break
55        time.sleep(1)
56        percent = (time.time() - start) * 100.0 / duration
57        if percent > 100.0:
58            percent = 100.0
59        print "MyJob percent: ", percent
60        zc.async.local.setLiveAnnotation('percent', percent)
61        print "MyJob does something at %s of %s" % (
62            time.time() - start, duration)
63        print "MyJob's annotations: %r" % dict(myjob.annotations)
64        print "MyJob's percent: %r" % zc.async.local.getLiveAnnotation(
65            'percent')
66    zc.async.local.setLiveAnnotation('percent', 100.0)
67    return num * 2
68
69#
70# Content components
71#
class ProgressableJob(AsyncJob):
    """A job that can indicate its progress via a `percent` attribute.

    Subclass of the `zc.async` job class (imported here as
    ``AsyncJob``), declared to implement `IJob` and `IProgressable`.
    """
    grok.implements(IJob, IProgressable)
    grok.provides(IJob)
    # Class-level default; ``None`` until some value is assigned.
    percent = None
78
79
def get_job_id(persisted_job):
    """Return the object id of an already persisted job as a string.

    The `persisted_job` must provide a `_p_oid` attribute. This oid
    is converted to an integer by `ZODB.utils.u64`, whose decimal
    string representation is returned.
    """
    oid = persisted_job._p_oid
    return str(u64(oid))
87
class JobContainer(grok.Container):
    """A container for :class:`IKofa` jobs.

    Instances are stored in a ZODB root under `WAEUP_KEY` by
    :class:`JobContainerInstaller` and used by :class:`JobManager`
    to keep jobs, keyed by their job id.
    """
    grok.implements(IJobContainer)
92
93
class JobManager(grok.GlobalUtility):
    """A manager for asynchronous running jobs (tasks).

    Registered as a global utility for the
    `waeup.kofa.interfaces.IJobManager` interface.

    This is the central location for managing asynchronous running
    jobs/tasks.

    It works roughly like this: for usual tasks it looks up some
    JobContainer installed in a ZODB database root (the installation
    can happen during startup; see the respective installer classes
    and functions in this module) and then interacts with this
    JobContainer.

    The optional `site` parameter for most methods in here serves for
    finding the databases' roots. It is sufficient to pass any
    persisted object (or more precisely: some object with a
    ``_p_jar`` attribute). As long as some site was already set (for
    instance during regular requests), the site is looked up
    automatically and you don't have to pass the `site` parameter
    then. So, in most cases you won't have to give a `site` parameter.
    """
    grok.implements(IJobManager)
    grok.provides(IJobManager)

    def _get_site(self, site):
        """Get some persisted object usable for database lookups.

        Returns `site` itself if it provides a ``_p_jar`` attribute.
        Otherwise the currently set site (as returned by `getSite`)
        is returned.

        Raises `LookupError` if no site is set.
        """
        # in fact we get some persisted object if available.
        # As sites are normally persisted and easy to lookup, we use them
        # to get _some_ persisted object.
        if not hasattr(site, '_p_jar'):
            site = getSite()
            if site is None:
                raise LookupError('cannot find a valid site')
        return site

    def _get_jobs_container(self, site):
        """Get the job container stored in the ZODB root of `site`.

        The container is looked up under `WAEUP_KEY` in the root of
        the connection `site` belongs to. Raises `KeyError` if no
        such entry exists in that root.
        """
        # in fact we need _some_ persisted object, not necessarily a site
        return site._p_jar.root()[WAEUP_KEY]

    def put(self, job, site=None):
        """Start the `job` and store it in local `site`.

        The `job` must be an `IJob` instance.

        It will be put into the default queue and then stored in local
        site. The status of the job can be seen immediately in
        `job.status`.

        Please specify special treatments like `begin_after` or
        `begin_by` by setting the respectives attributes of the job
        itself.

        Returns the id of the stored job (a string) or ``None``, if
        the local `IKofaUtils` utility reports that expensive actions
        are currently not allowed. In the latter case the job is
        neither queued nor stored.

        Raises `LookupError` if no usable site can be found.
        """
        site = self._get_site(site)
        if not getUtility(IKofaUtils).expensive_actions_allowed():
            return None
        container = self._get_jobs_container(site)
        transaction.begin()
        try:
            queue = getDefaultQueue(site)
            new_job = queue.put(job)
            job_id = get_job_id(new_job)
            container[job_id] = new_job
            transaction.commit()
        except BaseException:
            # Do not leave a half-done transaction behind; the error
            # itself is passed on to the caller unchanged.
            transaction.abort()
            raise
        return job_id

    def get(self, job_id, site=None):
        """Get the job with `job_id` from local `site`.

        If `job_id` cannot be found, ``None`` is returned. This is
        suitable e.g. when used with a traverser.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        return container.get(job_id, None)

    def jobs(self, site=None):
        """Get all stored jobs as an iterable.

        Result provides tuples (JOB_ID, JOB_OBJECT).

        As this is a generator, site and container are looked up not
        before the first item is requested.
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        for job_id, job in container.items():
            yield (job_id, job)

    def remove(self, job_id, site=None):
        """Remove job with `job_id` from local job container.

        If no such job can be found this is silently ignored.

        Please note: removing a job from the job container does not
        mean to stop its execution (if it wasn't started yet or is
        currently running).
        """
        site = self._get_site(site)
        container = self._get_jobs_container(site)
        # Ask the container itself; no need to build a list of all keys.
        if job_id in container:
            del container[job_id]

    def start_test_job(self, duration=60, site=None):
        """Start a test job.

        A method for testing the general asynchronous functionality of
        waeup.kofa. The test job started here executes the local
        :func:`compute` function with ``23`` as argument and the given
        `duration`.

        Returns whatever :meth:`put` returns: the id of the new job
        or ``None`` if the job was blocked.
        """
        job = AsyncJob(compute, 23, duration)
        return self.put(job, site=site)
204
class JobManagerTraverser(grok.Traverser):
    """Traverse from the global ``IJobManager`` to single jobs.

    The name to traverse is taken as job id and the respective job
    is returned, if the job manager knows about it.
    """
    grok.context(IJobManager)

    def traverse(self, name):
        # The result of the manager's ``get`` is passed on unchanged
        # (the managers' docs promise ``None`` for unknown ids).
        manager = self.context
        return manager.get(name)
216
217##
218## configuration (permissions, subscribers, etc.) ...
219##
220
class ViewJobs(grok.Permission):
    # Permission registered under the name ``waeup.viewJobs``;
    # presumably required for viewing jobs -- usage is not visible in
    # this module.
    grok.name('waeup.viewJobs')
223
class ManageJobs(grok.Permission):
    # Permission registered under the name ``waeup.manageJobs``;
    # presumably required for managing jobs -- usage is not visible in
    # this module.
    grok.name('waeup.manageJobs')
226
227class JobContainerInstaller(object):
228    """Install a JobContainer in root of given DB.
229
230    Instances of this installer can be called when a Zope instance
231    comes up (i.e. when an IDatabaseOpenedEvent was triggered).
232
233    It looks for some database named as in `db_name` and installs a
234    job container in the root of this database (``None`` by default)
235    if it does not exist already.
236    """
237    def __init__(self, db_name=None):
238        # This IDatabaseOpenedEvent will be from zope.app.appsetup if that
239        # package is around
240        adapter(zc.async.interfaces.IDatabaseOpenedEvent)(self)
241        self.db_name = db_name
242        return
243
244    def __call__(self, ev):
245        db = ev.database
246        tm = transaction.TransactionManager()
247        conn = db.open(transaction_manager=tm)
248        tm.begin()
249        try:
250            try:
251                root = conn.root()
252                if WAEUP_KEY in root:
253                    return
254                if self.db_name is not None:
255                    other = conn.get_connection(self.db_name)
256                    container = other.root()[WAEUP_KEY] = JobContainer()
257                    other.add(container)
258                else:
259                    container = JobContainer()
260                root[WAEUP_KEY] = container
261                notify(ObjectAdded(container, root, WAEUP_KEY))
262                tm.commit()
263                zc.async.utils.log.info('job container added')
264            except:
265                tm.abort()
266                raise
267        finally:
268            conn.close()
269        return
270
#: Can be used as event subscriber from ZCML; installs a job container
#: in the root of the default database (named ``''``), if none exists
#: there yet.
job_container_installer = JobContainerInstaller()

#: An installer instance that installs a job container in a ZODB
#: called ``async`` - this name is used in several components of the
#: `zc.async` package we don't want to reimplement here.
#:
#: To use this installer as an event subscriber by ZCML, make sure the
#: instance also provides a ZODB called `async`.
multidb_job_container_installer = JobContainerInstaller(db_name='async')
Note: See TracBrowser for help on using the repository browser.