source: WAeUP_SRP/base/PatchQueueCatalogProcessQueue.py @ 2259

Last change on this file since 2259 was 2259, checked in by joachim, 17 years ago

new feature bypass catalog_queue

  • Property svn:keywords set to Id
File size: 7.8 KB
Line 
1import sets
2import logging
3from types import StringType
4from AccessControl.Permissions \
5import manage_zcatalog_entries, view_management_screens
6from AccessControl import ModuleSecurityInfo, ClassSecurityInfo
7from AccessControl.SecurityInfo import ClassSecurityInformation
8from Products.PageTemplates.PageTemplateFile import PageTemplateFile
9from Products.QueueCatalog.CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS
10from Products.QueueCatalog.CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED
11from Products.QueueCatalog.CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY
12from Products.QueueCatalog.QueueCatalog import cataloged
13from Products.QueueCatalog.QueueCatalog import QueueCatalog
14from ZODB.POSException import ConflictError
15from ZEO.Exceptions import ClientDisconnected
# Logger used by the patched methods below to report errors raised while
# processing queued catalog events.
logger = logging.getLogger('event.QueueCatalog')
# Module-level security declaration object used by the declareProtected()
# calls further down.
# NOTE(review): nothing visible in this file applies this object to
# QueueCatalog; the roles guarding the patched methods are assigned
# explicitly through the *__roles__ attributes at the bottom of the file.
security = ClassSecurityInformation()
18
def __init__(self, buckets=1009, conflict_policy=SAFE_POLICY,
             bypass=False, location="portal_catalog_real"):
    """Initialise a QueueCatalog, including the queue-bypass flag.

    buckets         -- number of event-queue buckets (stored as _buckets)
    conflict_policy -- conflict policy constant (stored as _conflict_policy)
    bypass          -- initial value of the bypass flag (stored as _bypass)
    location        -- catalog location (stored as _location)
    """
    # Bucket count and policy are stored before _clearQueues() is called,
    # exactly as before (presumably that call reads them -- it is defined
    # outside this file).
    self._buckets, self._conflict_policy = buckets, conflict_policy
    self._clearQueues()
    self._bypass, self._location = bypass, location
29
# Declare getBypassQueue as protected by the "view management screens"
# permission on the module-level `security` object.
# NOTE(review): `security` is not visibly attached to QueueCatalog in this
# file; the effective roles are set via getBypassQueue__roles__ below.
security.declareProtected(view_management_screens, 'getBypassQueue')
def getBypassQueue(self):
    """Return the queue-bypass flag (_bypass), defaulting to False.

    Instances that do not have a _bypass attribute yet are reported as
    "not bypassed".  The lookup is read-only: the attribute is not written
    back onto the instance, so merely asking for the flag does not modify
    the object.  getattr() with a default is used instead of hasattr()
    because it only treats AttributeError as "missing", whereas hasattr()
    on Python 2 hides any exception raised during the lookup.
    """
    return getattr(self, '_bypass', False)
36
# Declare setBypassQueue as protected by the "view management screens"
# permission on the module-level `security` object.
# NOTE(review): `security` is not visibly attached to QueueCatalog in this
# file; the effective roles are set via setBypassQueue__roles__ below.
security.declareProtected(view_management_screens, 'setBypassQueue')
def setBypassQueue(self, bypass=False):
    """Store *bypass* as the queue-bypass flag (_bypass).

    The value is stored unchanged; calling without an argument switches
    the bypass off.
    """
    self._bypass = bypass
41
def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1):
    # Catalog *obj*, either by queueing an event or -- when the bypass flag
    # returned by getBypassQueue() is set -- by indexing it right away.
    #
    #   obj             -- object to catalog; obj.getPhysicalPath() supplies
    #                      the uid when none is given
    #   uid             -- path string, or a sequence of path segments that
    #                      is joined with '/'
    #   idxs            -- optional index names; only honoured when the
    #                      object is already cataloged (or has a pending
    #                      add event), otherwise everything is indexed
    #   update_metadata -- whether metadata should be updated
    #
    # NOTE(review): documented with comments rather than a docstring on
    # purpose -- the original has none, and in Zope 2 a docstring is
    # presumably what makes a method publishable; confirm before adding one.

    # update_metadata=0 is ignored if the queued catalog is set to
    # update metadata during queue processing, rather than immediately

    # similarly, limiting the idxs only limits the immediate indexes.  If
    # any work needs to be done in the queue processing, it will all be
    # done: we have not implemented partial indexing during queue
    # processing.  The only way to avoid any of it is to avoid all of it
    # (i.e., update metadata immediately and don't have any indexes to
    # update on the queued side).

    # Make sure the current context is allowed to do this.  The method
    # fetched here is also the one called directly in bypass mode below.
    catalog_object = self.getZCatalog('catalog_object')

    if uid is None:
        uid = '/'.join(obj.getPhysicalPath())
    elif not isinstance(uid, StringType):
        uid = '/'.join(uid)

    catalog = self.getZCatalog()
    # cat_indexes ends up holding the indexes that are NOT updated
    # immediately, i.e. the ones left to the queue (or to the bypass call).
    cat_indexes = sets.Set(catalog.indexes())
    immediate_indexes = sets.Set(self._immediate_indexes)
    cat_indexes -= immediate_indexes

    # The ZCatalog API doesn't allow us to distinguish between
    # adds and updates, so we have to try to figure this out
    # ourselves.

    # There's a risk of a race here. What if there is a previously
    # unprocessed add event? If so, then this should be a changed
    # event. If we undo this transaction later, we'll generate a
    # remove event, when we should generate an add changed event.
    # To avoid this, we need to make sure we see consistent values
    # of the event queue. We also need to avoid resolving
    # (non-undo) conflicts of add events. This will slow things
    # down a bit, but adds should be relatively infrequent.
    bypass_queue = self.getBypassQueue()
    # Now, try to decide if the catalog has the uid (path).
    already_cataloged = cataloged(catalog, uid)
    if not already_cataloged:
        # Looks like we should add, but maybe there's already a
        # pending add event. We'd better check the event queue:
        already_cataloged = (
            self._queues[hash(uid) % self._buckets].getEvent(uid) in
            ADDED_EVENTS)
    if idxs and already_cataloged:
        # if not already_cataloged, we index the whole thing
        idxs = sets.Set(idxs)
        immediate_indexes.intersection_update(idxs)
        cat_indexes.intersection_update(idxs)

    immediate_metadata = self.getImmediateMetadataUpdate()
    if cat_indexes or (update_metadata and not immediate_metadata):
        if bypass_queue:
            # Bypass enabled: index the non-immediate indexes right now
            # instead of recording an event for later processing.
            catalog_object(obj, uid, cat_indexes)
        else:
            # Queue an event for later processing.  (A leftover
            # pdb.set_trace() breakpoint used to sit here and would have
            # stopped the serving thread on every queued call.)
            if already_cataloged:
                event = CHANGED
            else:
                event = ADDED
            self._update(uid, event)
    # NOTE(review): because the next branch is an ``elif``, the immediate
    # indexes are not updated in the same call whenever the branch above
    # ran -- confirm that this is intended.
    elif immediate_indexes:
        # Update some of the indexes immediately.
        catalog.catalog_object(
            obj, uid, immediate_indexes,
            update_metadata=update_metadata and immediate_metadata)
    elif update_metadata and immediate_metadata:
        # if it is added, no point in doing the metadata, and it will be
        # done in the queue process anyway
        catalog._catalog.updateMetadata(obj, uid)
110
def _process_queue(self, queue, limit):
    """Process pending events from a single queue.

    queue -- event queue; queue.process(limit) must return a mapping of
             uid -> (t, event)
    limit -- passed through unchanged to queue.process()

    REMOVED events uncatalog the uid if it is cataloged; every other event
    (re)catalogs the object found by traversing to the uid.  Errors while
    cataloging or uncataloging are logged and processing continues with
    the next event.

    Returns the number of events handled.  CHANGED events whose uid is not
    cataloged are skipped and not counted; events that failed, or whose
    object could not be traversed to, are counted.
    """
    catalog = self.getZCatalog()

    # Decide which indexes the queue side is responsible for.
    if self.getProcessAllIndexes():
        idxs = catalog.indexes()
    else:
        cat_indexes = sets.Set(catalog.indexes())
        immediate_indexes = sets.Set(self._immediate_indexes)
        if not immediate_indexes or immediate_indexes == cat_indexes:
            idxs = catalog.indexes()  # do all of 'em
        else:
            # Only the indexes that are not updated immediately.
            idxs = list(cat_indexes - immediate_indexes)
    events = queue.process(limit)
    count = 0

    for uid, (t, event) in events.items():
        if event is REMOVED:
            try:
                if cataloged(catalog, uid):
                    catalog.uncatalog_object(uid)
            except (ConflictError, ClientDisconnected):
                # NOTE(review): conflict/disconnect errors are logged and
                # swallowed here rather than re-raised -- kept as-is,
                # confirm this best-effort behaviour is wanted.
                logger.error('conflict-error uncataloging object', exc_info=True)
            except:
                logger.error('error uncataloging object', exc_info=True)
        else:
            # add or change
            if event is CHANGED and not cataloged(catalog, uid):
                continue
            # Note that the uid may be relative to the catalog.
            obj = catalog.unrestrictedTraverse(uid, None)
            if obj is not None:
                immediate_metadata = self.getImmediateMetadataUpdate()
                try:
                    # Metadata is updated here only when it is not already
                    # being updated immediately.
                    catalog.catalog_object(
                        obj, uid, idxs=idxs,
                        update_metadata=not immediate_metadata)
                except (ConflictError, ClientDisconnected):
                    # Message fixed: this is the cataloging branch (it used
                    # to say "uncataloging", copied from the branch above).
                    # NOTE(review): swallowed rather than re-raised, as in
                    # the removal branch -- confirm.
                    logger.error('conflict-error cataloging object', exc_info=True)
                except:
                    logger.error('error cataloging object', exc_info=True)

        count = count + 1

    return count
def manage_edit(self, title='', location='', immediate_indexes=(),
                immediate_removal=0, bucket_count=0, immediate_metadata=0,
                all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None,
                enable_bypass=0):
    """Apply the submitted settings to this instance.

    Each argument is handed to the matching setter; an empty location is
    stored as None.  The bucket count is only touched when a non-zero
    value is given that differs from the current one.  When RESPONSE is
    supplied, the browser is redirected back to the edit form with a
    confirmation message.
    """
    self.title = title
    self.setLocation(location or None)
    self.setImmediateIndexes(immediate_indexes)
    self.setImmediateRemoval(immediate_removal)
    self.setImmediateMetadataUpdate(immediate_metadata)
    self.setProcessAllIndexes(all_indexes)
    self.setConflictPolicy(conflict_policy)
    self.setBypassQueue(enable_bypass)

    if bucket_count:
        # The value may arrive as a string, hence the int() conversion.
        requested = int(bucket_count)
        if requested != self.getBucketCount():
            self.setBucketCount(requested)

    if RESPONSE is None:
        return

    target = ('%s/manage_editForm?manage_tabs_message='
              'Properties+changed' % self.absolute_url())
    RESPONSE.redirect(target)
179
# Monkey-patch QueueCatalog: install the functions defined above as
# attributes of the class, replacing or adding the methods of the same name.
QueueCatalog.__init__ = __init__
# Bypass-flag accessors.  The *__roles__ attributes name the roles
# ('Manager', 'Owner') associated with each accessor -- presumably read by
# Zope's security machinery; verify against the Zope version in use.
QueueCatalog.getBypassQueue = getBypassQueue
QueueCatalog.getBypassQueue__roles__ = ['Manager', 'Owner',]
QueueCatalog.setBypassQueue = setBypassQueue
QueueCatalog.setBypassQueue__roles__ = ['Manager', 'Owner',]
# Bypass-aware cataloging and queue processing.
QueueCatalog.catalog_object = catalog_object
QueueCatalog._process_queue = _process_queue
# Edit form built from the 'zmi/qc_edit' page template, and the handler
# that accepts the additional enable_bypass setting.
QueueCatalog.manage_editForm = PageTemplateFile('zmi/qc_edit', globals())
QueueCatalog.manage_edit = manage_edit
189
Note: See TracBrowser for help on using the repository browser.