# Source: WAeUP_SRP/base/PatchQueueCatalogProcessQueue.py @ 3019
#
# Last change on this file since 3019 was 2266, checked in by
# Henrik Bettermann, 17 years ago:
#
#     catalog_objects ignored immediate_indexes - fixed
#
#   * Property svn:keywords set to Id
#   * File size: 7.8 KB
import sets
import logging
from types import StringType

from AccessControl.Permissions \
    import manage_zcatalog_entries, view_management_screens
from AccessControl import ModuleSecurityInfo, ClassSecurityInfo
from AccessControl.SecurityInfo import ClassSecurityInformation
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.QueueCatalog.CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS
from Products.QueueCatalog.CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED
from Products.QueueCatalog.CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY
from Products.QueueCatalog.QueueCatalog import cataloged
from Products.QueueCatalog.QueueCatalog import QueueCatalog
from ZODB.POSException import ConflictError
from ZEO.Exceptions import ClientDisconnected

# Logger used by _process_queue to report (un)cataloging failures.
logger = logging.getLogger('event.QueueCatalog')

# Module-level security declaration object used by the declareProtected()
# calls for getBypassQueue / setBypassQueue further down.
security = ClassSecurityInformation()

def __init__(self,
             buckets=1009,
             conflict_policy=SAFE_POLICY,
             bypass=False,
             location="portal_catalog_real"):
    """Initialise the instance; installed below as QueueCatalog.__init__.

    buckets -- stored as ``_buckets``; catalog_object uses it as the
        modulus when choosing the event queue for a uid.
    conflict_policy -- stored as ``_conflict_policy``.
    bypass -- initial value of the queue-bypass flag (``_bypass``), as
        returned by getBypassQueue.
    location -- stored as ``_location``.
        NOTE(review): presumably the id/path of the wrapped ZCatalog --
        confirm against QueueCatalog.setLocation.
    """
    self._buckets = buckets
    self._conflict_policy = conflict_policy
    # _clearQueues is not defined in this file (it comes from
    # QueueCatalog).  It is kept after the two assignments above, as in
    # the original statement order.
    self._clearQueues()
    self._bypass = bypass
    self._location = location

# NOTE(review): nothing visible in this file applies ``security`` to
# QueueCatalog; the ``getBypassQueue__roles__`` assignment at the end of
# the module appears to be what actually restricts access -- confirm.
security.declareProtected(view_management_screens, 'getBypassQueue')
def getBypassQueue(self):
    """Return the queue-bypass flag, or False if it was never set.

    The lookup is deliberately read-only: an instance without a
    ``_bypass`` attribute is reported as False without creating the
    attribute, so merely asking for the flag never modifies the object.
    """
    return getattr(self, '_bypass', False)

# NOTE(review): nothing visible in this file applies ``security`` to
# QueueCatalog; the ``setBypassQueue__roles__`` assignment at the end of
# the module appears to be what actually restricts access -- confirm.
security.declareProtected(view_management_screens, 'setBypassQueue')
def setBypassQueue(self, bypass=False):
    """Store *bypass* as the queue-bypass flag (``self._bypass``).

    The value is stored as given; catalog_object only tests it for
    truth.
    """
    self._bypass = bypass

def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1):
    """Catalog *obj*: update immediate indexes now, defer or bypass the rest.

    obj -- the object to catalog.
    uid -- catalog key: a string, a sequence of path elements (joined
        with '/'), or None to use ``obj.getPhysicalPath()``.
    idxs -- optional iterable of index names.  It only narrows the work
        when the uid is already cataloged or has a pending add event;
        otherwise the whole object is indexed.
    update_metadata -- true if metadata should be updated as well.

    Work for the non-immediate indexes is recorded in the event queue
    or, when the bypass flag is set, passed straight to the real
    catalog's ``catalog_object``.
    """
    # update_metadata=0 is ignored if the queued catalog is set to
    # update metadata during queue processing, rather than immediately.
    #
    # Similarly, limiting the idxs only limits the immediate indexes.  If
    # any work needs to be done in the queue processing, it will all be
    # done: we have not implemented partial indexing during queue
    # processing.  The only way to avoid any of it is to avoid all of it
    # (i.e., update metadata immediately and don't have any indexes to
    # update on the queued side).

    # Make sure the current context is allowed to do this:
    catalog_object = self.getZCatalog('catalog_object')

    if uid is None:
        uid = '/'.join(obj.getPhysicalPath())
    elif not isinstance(uid, basestring):
        # Only path *sequences* are joined.  Both str and unicode uids
        # must be left alone: joining a string would put a '/' between
        # each of its characters.
        uid = '/'.join(uid)

    catalog = self.getZCatalog()
    # cat_indexes: the indexes that are NOT updated immediately.
    cat_indexes = sets.Set(catalog.indexes())
    immediate_indexes = sets.Set(self._immediate_indexes)
    cat_indexes -= immediate_indexes

    # The ZCatalog API doesn't allow us to distinguish between
    # adds and updates, so we have to try to figure this out
    # ourselves.

    # There's a risk of a race here. What if there is a previously
    # unprocessed add event? If so, then this should be a changed
    # event. If we undo this transaction later, we'll generate a
    # remove event, when we should generate an add changed event.
    # To avoid this, we need to make sure we see consistent values
    # of the event queue. We also need to avoid resolving
    # (non-undo) conflicts of add events. This will slow things
    # down a bit, but adds should be relatively infrequent.
    bypass_queue = self.getBypassQueue()

    # Now, try to decide if the catalog has the uid (path).
    already_cataloged = cataloged(catalog, uid)
    if not already_cataloged:
        # Looks like we should add, but maybe there's already a
        # pending add event. We'd better check the event queue:
        already_cataloged = (
            self._queues[hash(uid) % self._buckets].getEvent(uid) in
            ADDED_EVENTS)

    if idxs and already_cataloged:
        # if not already_cataloged, we index the whole thing
        idxs = sets.Set(idxs)
        immediate_indexes.intersection_update(idxs)
        cat_indexes.intersection_update(idxs)

    immediate_metadata = self.getImmediateMetadataUpdate()
    if cat_indexes or (update_metadata and not immediate_metadata):
        if bypass_queue:
            # Bypass mode: hand the deferred indexes to the real catalog
            # right away instead of recording an event.
            # NOTE(review): update_metadata is not forwarded here, so the
            # real catalog's own default applies -- confirm intended.
            catalog_object(obj, uid, cat_indexes)
        else:
            if already_cataloged:
                event = CHANGED
            else:
                event = ADDED
            self._update(uid, event)

    if immediate_indexes:
        # Update some of the indexes immediately.
        catalog.catalog_object(
            obj, uid, immediate_indexes,
            update_metadata=update_metadata and immediate_metadata)
    elif update_metadata and immediate_metadata:
        # if it is added, no point in doing the metadata, and it will be
        # done in the queue process anyway
        catalog._catalog.updateMetadata(obj, uid)

def _process_queue(self, queue, limit):
    """Process a single queue.

    Takes up to *limit* events from *queue* (via ``queue.process``) and
    applies each to the real catalog: REMOVED events uncatalog the uid,
    all other events (re)catalog the object found at that uid.

    Returns the number of events handled.  CHANGED events whose uid is
    not cataloged are skipped and not counted; events that fail or whose
    object cannot be traversed are still counted.

    Errors while (un)cataloging are logged and swallowed so that one bad
    object does not stop the rest of the queue.
    """
    catalog = self.getZCatalog()

    if self.getProcessAllIndexes():
        idxs = catalog.indexes()
    else:
        cat_indexes = sets.Set(catalog.indexes())
        immediate_indexes = sets.Set(self._immediate_indexes)
        if not immediate_indexes or immediate_indexes == cat_indexes:
            idxs = catalog.indexes()  # do all of 'em
        else:
            # Only the indexes that were not updated immediately.
            idxs = list(cat_indexes - immediate_indexes)

    # Metadata is updated here exactly when it is not updated
    # immediately; the setting is the same for every event in the batch.
    update_metadata = not self.getImmediateMetadataUpdate()

    events = queue.process(limit)
    count = 0

    for uid, (t, event) in events.items():
        if event is REMOVED:
            try:
                if cataloged(catalog, uid):
                    catalog.uncatalog_object(uid)
            except (ConflictError, ClientDisconnected):
                # Logged instead of re-raised.
                # NOTE(review): swallowing ConflictError lets the
                # surrounding transaction continue -- confirm intended.
                logger.error('conflict-error uncataloging object', exc_info=True)
            except:
                logger.error('error uncataloging object', exc_info=True)
        else:
            # add or change
            if event is CHANGED and not cataloged(catalog, uid):
                continue
            # Note that the uid may be relative to the catalog.
            obj = catalog.unrestrictedTraverse(uid, None)
            if obj is not None:
                try:
                    catalog.catalog_object(
                        obj, uid, idxs=idxs,
                        update_metadata=update_metadata)
                except (ConflictError, ClientDisconnected):
                    # Logged instead of re-raised, as in the removal
                    # branch above.
                    logger.error('conflict-error cataloging object', exc_info=True)
                except:
                    logger.error('error cataloging object', exc_info=True)

        count += 1

    return count


def manage_edit(self, title='', location='', immediate_indexes=(),
                immediate_removal=0, bucket_count=0, immediate_metadata=0,
                all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None,
                enable_bypass=0):
    """ Edit the instance """
    # Each value is handed to the matching setter; an empty location is
    # stored as None.
    self.title = title
    self.setLocation(location or None)
    self.setImmediateIndexes(immediate_indexes)
    self.setImmediateRemoval(immediate_removal)
    self.setImmediateMetadataUpdate(immediate_metadata)
    self.setProcessAllIndexes(all_indexes)
    self.setConflictPolicy(conflict_policy)
    self.setBypassQueue(enable_bypass)

    # A false bucket_count (0, '') means "leave the bucket count alone";
    # otherwise it is only applied when it differs from the current one.
    if bucket_count:
        bucket_count = int(bucket_count)
        if bucket_count != self.getBucketCount():
            self.setBucketCount(bucket_count)

    if RESPONSE is not None:
        # Back to the edit form with a status message.
        RESPONSE.redirect('%s/manage_editForm?manage_tabs_message='
                          'Properties+changed' % self.absolute_url())

# Install the functions defined above on QueueCatalog (monkey patch).
QueueCatalog.__init__ = __init__
QueueCatalog.getBypassQueue = getBypassQueue
# Roles allowed to call the bypass getter/setter.
QueueCatalog.getBypassQueue__roles__ = ['Manager', 'Owner',]
QueueCatalog.setBypassQueue = setBypassQueue
QueueCatalog.setBypassQueue__roles__ = ['Manager', 'Owner',]
QueueCatalog.catalog_object = catalog_object
QueueCatalog._process_queue = _process_queue
# Edit form template and its handler (which accepts enable_bypass).
QueueCatalog.manage_editForm = PageTemplateFile('zmi/qc_edit', globals())
QueueCatalog.manage_edit = manage_edit
# Note: See TracBrowser for help on using the repository browser.