import sets import logging from types import StringType from AccessControl.Permissions \ import manage_zcatalog_entries, view_management_screens from AccessControl import ModuleSecurityInfo, ClassSecurityInfo from AccessControl.SecurityInfo import ClassSecurityInformation from Products.PageTemplates.PageTemplateFile import PageTemplateFile from Products.QueueCatalog.CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS from Products.QueueCatalog.CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED from Products.QueueCatalog.CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY from Products.QueueCatalog.QueueCatalog import cataloged from Products.QueueCatalog.QueueCatalog import QueueCatalog from ZODB.POSException import ConflictError from ZEO.Exceptions import ClientDisconnected logger = logging.getLogger('event.QueueCatalog') security = ClassSecurityInformation() def __init__(self, buckets=1009, conflict_policy=SAFE_POLICY, bypass = False, location = "portal_catalog_real"): self._buckets = buckets self._conflict_policy = conflict_policy self._clearQueues() self._bypass = bypass self._location = location security.declareProtected(view_management_screens, 'getBypassQueue') def getBypassQueue(self): "get _by_pass" if not hasattr(self,"_bypass"): self._bypass = False return getattr(self,'_bypass',False) security.declareProtected(view_management_screens, 'setBypassQueue') def setBypassQueue(self, bypass=False): "set _bypass" self._bypass = bypass def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1): # update_metadata=0 is ignored if the queued catalog is set to # update metadata during queue processing, rather than immediately # similarly, limiting the idxs only limits the immediate indexes. If # any work needs to be done in the queue processing, it will all be # done: we have not implemented partial indexing during queue # processing. The only way to avoid any of it is to avoid all of it # (i.e., update metadata immediately and don't have any indexes to # update on the queued side). # Make sure the current context is allowed to do this: catalog_object = self.getZCatalog('catalog_object') if uid is None: uid = '/'.join(obj.getPhysicalPath()) elif not isinstance(uid, StringType): uid = '/'.join(uid) catalog = self.getZCatalog() cat_indexes = sets.Set(catalog.indexes()) immediate_indexes = sets.Set(self._immediate_indexes) cat_indexes -= immediate_indexes # The ZCatalog API doesn't allow us to distinguish between # adds and updates, so we have to try to figure this out # ourselves. # There's a risk of a race here. What if there is a previously # unprocessed add event? If so, then this should be a changed # event. If we undo this transaction later, we'll generate a # remove event, when we should generate an add changed event. # To avoid this, we need to make sure we see consistent values # of the event queue. We also need to avoid resolving # (non-undo) conflicts of add events. This will slow things # down a bit, but adds should be relatively infrequent. bypass_queue = self.getBypassQueue() # Now, try to decide if the catalog has the uid (path). already_cataloged = cataloged(catalog, uid) if not already_cataloged: # Looks like we should add, but maybe there's already a # pending add event. We'd better check the event queue: already_cataloged = ( self._queues[hash(uid) % self._buckets].getEvent(uid) in ADDED_EVENTS) if idxs and already_cataloged: # if not already_cataloged, we index the whole thing idxs = sets.Set(idxs) immediate_indexes.intersection_update(idxs) cat_indexes.intersection_update(idxs) immediate_metadata = self.getImmediateMetadataUpdate() if cat_indexes or update_metadata and not immediate_metadata: if bypass_queue: catalog_object(obj, uid, cat_indexes) else: #import pdb;pdb.set_trace() self._update(uid, already_cataloged and CHANGED or ADDED) # Update some of the indexes immediately. if immediate_indexes: # Update some of the indexes immediately. catalog.catalog_object( obj, uid, immediate_indexes, update_metadata=update_metadata and immediate_metadata) elif update_metadata and immediate_metadata: # if it is added, no point in doing the metadata, and it will be # done in the queue process anyway catalog._catalog.updateMetadata(obj, uid) def _process_queue(self, queue, limit): """Process a single queue""" catalog = self.getZCatalog() if self.getProcessAllIndexes(): #idxs = None idxs = catalog.indexes() else: cat_indexes = sets.Set(catalog.indexes()) immediate_indexes = sets.Set(self._immediate_indexes) if not immediate_indexes or immediate_indexes==cat_indexes: idxs = catalog.indexes() # do all of 'em else: idxs = list(cat_indexes - immediate_indexes) events = queue.process(limit) count = 0 for uid, (t, event) in events.items(): if event is REMOVED: try: if cataloged(catalog, uid): catalog.uncatalog_object(uid) except (ConflictError, ClientDisconnected): logger.error('conflict-error uncataloging object', exc_info=True) except: logger.error('error uncataloging object', exc_info=True) else: # add or change if event is CHANGED and not cataloged(catalog, uid): continue # Note that the uid may be relative to the catalog. obj = catalog.unrestrictedTraverse(uid, None) if obj is not None: immediate_metadata = self.getImmediateMetadataUpdate() try: catalog.catalog_object( obj, uid, idxs=idxs, update_metadata=not immediate_metadata) except (ConflictError, ClientDisconnected): logger.error('conflict-error uncataloging object', exc_info=True) #raise except: logger.error('error cataloging object', exc_info=True) count = count + 1 return count def manage_edit(self, title='', location='', immediate_indexes=(), immediate_removal=0, bucket_count=0, immediate_metadata=0, all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None, enable_bypass=0): """ Edit the instance """ self.title = title self.setLocation(location or None) self.setImmediateIndexes(immediate_indexes) self.setImmediateRemoval(immediate_removal) self.setImmediateMetadataUpdate(immediate_metadata) self.setProcessAllIndexes(all_indexes) self.setConflictPolicy(conflict_policy) self.setBypassQueue(enable_bypass) if bucket_count: bucket_count = int(bucket_count) if bucket_count != self.getBucketCount(): self.setBucketCount(bucket_count) if RESPONSE is not None: RESPONSE.redirect('%s/manage_editForm?manage_tabs_message=' 'Properties+changed' % self.absolute_url()) QueueCatalog.__init__ = __init__ QueueCatalog.getBypassQueue = getBypassQueue QueueCatalog.getBypassQueue__roles__ = ['Manager', 'Owner',] QueueCatalog.setBypassQueue = setBypassQueue QueueCatalog.setBypassQueue__roles__ = ['Manager', 'Owner',] QueueCatalog.catalog_object = catalog_object QueueCatalog._process_queue = _process_queue QueueCatalog.manage_editForm = PageTemplateFile('zmi/qc_edit', globals()) QueueCatalog.manage_edit = manage_edit