[1930] | 1 | import sets |
---|
[2074] | 2 | import logging |
---|
[2259] | 3 | from types import StringType |
---|
| 4 | from AccessControl.Permissions \ |
---|
| 5 | import manage_zcatalog_entries, view_management_screens |
---|
| 6 | from AccessControl import ModuleSecurityInfo, ClassSecurityInfo |
---|
| 7 | from AccessControl.SecurityInfo import ClassSecurityInformation |
---|
| 8 | from Products.PageTemplates.PageTemplateFile import PageTemplateFile |
---|
[1934] | 9 | from Products.QueueCatalog.CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS |
---|
| 10 | from Products.QueueCatalog.CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED |
---|
| 11 | from Products.QueueCatalog.CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY |
---|
| 12 | from Products.QueueCatalog.QueueCatalog import cataloged |
---|
[2266] | 13 | from Products.QueueCatalog.QueueCatalog import QueueCatalog |
---|
[2073] | 14 | from ZODB.POSException import ConflictError |
---|
| 15 | from ZEO.Exceptions import ClientDisconnected |
---|
# Logger used by the queue-processing code in this module to report
# failures while cataloging or uncataloging queued objects.
logger = logging.getLogger('event.QueueCatalog')

# Holder for the declareProtected() calls made next to the bypass accessors.
# NOTE(review): nothing in the visible code applies this object to
# QueueCatalog; the accessors get their roles from the explicit
# ``__roles__`` assignments at the end of the module -- confirm.
security = ClassSecurityInformation()
---|
[1930] | 18 | |
---|
def __init__(self,
             buckets=1009,
             conflict_policy=SAFE_POLICY,
             bypass=False,
             location="portal_catalog_real"):
    """Set up a queue catalog; installed as ``QueueCatalog.__init__``.

    Parameters:
      buckets -- stored as ``_buckets``; ``catalog_object`` uses it as the
        modulus when selecting the event queue for a uid.
      conflict_policy -- stored as ``_conflict_policy``.
      bypass -- initial value of the flag returned by ``getBypassQueue``.
      location -- stored as ``_location``; presumably identifies the real
        catalog this object fronts (TODO confirm against ``getZCatalog``).
    """
    self._buckets = buckets
    self._conflict_policy = conflict_policy
    # NOTE: called only after the two attributes above are assigned,
    # presumably because _clearQueues() reads them -- keep this order.
    self._clearQueues()
    self._bypass = bypass
    self._location = location
---|
[2259] | 29 | |
---|
# Record on the module-level ``security`` object that getBypassQueue
# requires the "view management screens" permission.
security.declareProtected(view_management_screens, 'getBypassQueue')
---|
def getBypassQueue(self):
    """Return the bypass flag, defaulting to False.

    Instances that have no ``_bypass`` attribute (for example ones created
    before the flag was introduced) report False.  The attribute is
    deliberately not created here, so a plain read never modifies the
    instance.
    """
    return getattr(self, '_bypass', False)
---|
[2259] | 36 | |
---|
# Record on the module-level ``security`` object that setBypassQueue
# requires the "view management screens" permission.
security.declareProtected(view_management_screens, 'setBypassQueue')
---|
def setBypassQueue(self, bypass=False):
    """Store ``bypass`` as this instance's bypass flag.

    The value is kept exactly as given (it is not converted to a bool);
    calling without an argument switches the flag to False.
    """
    self._bypass = bypass
---|
| 41 | |
---|
def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1):
    # Catalog ``obj``: indexes named in ``self._immediate_indexes`` are
    # updated at once on the real catalog; the work for the remaining
    # indexes is either recorded as an ADDED/CHANGED event in the queue or,
    # when the bypass flag is set, done right away through the method
    # returned by ``getZCatalog('catalog_object')``.
    #
    #   obj             -- the object to catalog
    #   uid             -- a string is used as is; any other non-None value
    #                      is joined with '/'; None means the object's
    #                      physical path
    #   idxs            -- optional index names restricting the update (only
    #                      honoured for already cataloged objects, see below)
    #   update_metadata -- false to skip the metadata update (see below)
    #
    # This is intentionally a comment and not a docstring: the original has
    # no docstring, and Zope's publisher only publishes objects that have
    # one, so adding a docstring would change what is reachable by URL.

    # update_metadata=0 is ignored if the queued catalog is set to
    # update metadata during queue processing, rather than immediately

    # similarly, limiting the idxs only limits the immediate indexes. If
    # any work needs to be done in the queue processing, it will all be
    # done: we have not implemented partial indexing during queue
    # processing. The only way to avoid any of it is to avoid all of it
    # (i.e., update metadata immediately and don't have any indexes to
    # update on the queued side).

    # Make sure the current context is allowed to do this.  The returned
    # method is also the one used by the bypass path further down.
    checked_catalog_object = self.getZCatalog('catalog_object')

    if uid is None:
        uid = '/'.join(obj.getPhysicalPath())
    elif not isinstance(uid, StringType):
        uid = '/'.join(uid)

    catalog = self.getZCatalog()
    cat_indexes = sets.Set(catalog.indexes())
    immediate_indexes = sets.Set(self._immediate_indexes)
    # From here on cat_indexes holds only the non-immediate indexes.
    cat_indexes -= immediate_indexes

    # The ZCatalog API doesn't allow us to distinguish between
    # adds and updates, so we have to try to figure this out
    # ourselves.

    # There's a risk of a race here. What if there is a previously
    # unprocessed add event? If so, then this should be a changed
    # event. If we undo this transaction later, we'll generate a
    # remove event, when we should generate an add changed event.
    # To avoid this, we need to make sure we see consistent values
    # of the event queue. We also need to avoid resolving
    # (non-undo) conflicts of add events. This will slow things
    # down a bit, but adds should be relatively infrequent.
    bypass_queue = self.getBypassQueue()

    # Now, try to decide if the catalog has the uid (path).
    already_cataloged = cataloged(catalog, uid)
    if not already_cataloged:
        # Looks like we should add, but maybe there's already a
        # pending add event. We'd better check the event queue:
        pending_queue = self._queues[hash(uid) % self._buckets]
        already_cataloged = pending_queue.getEvent(uid) in ADDED_EVENTS

    if idxs and already_cataloged:
        # if not already_cataloged, we index the whole thing
        idxs = sets.Set(idxs)
        immediate_indexes.intersection_update(idxs)
        cat_indexes.intersection_update(idxs)

    immediate_metadata = self.getImmediateMetadataUpdate()
    if cat_indexes or (update_metadata and not immediate_metadata):
        if bypass_queue:
            # Bypass: perform the non-immediate part now instead of
            # queueing an event for it.
            checked_catalog_object(obj, uid, cat_indexes)
        else:
            # Spelled out instead of ``x and CHANGED or ADDED`` so the
            # result does not depend on the truth value of CHANGED.
            if already_cataloged:
                event = CHANGED
            else:
                event = ADDED
            self._update(uid, event)

    # Update some of the indexes immediately.
    if immediate_indexes:
        catalog.catalog_object(
            obj, uid, immediate_indexes,
            update_metadata=update_metadata and immediate_metadata)
    elif update_metadata and immediate_metadata:
        # if it is added, no point in doing the metadata, and it will be
        # done in the queue process anyway
        catalog._catalog.updateMetadata(obj, uid)
---|
| 111 | |
---|
def _process_queue(self, queue, limit):
    """Process a single queue and return the number of events handled.

    The events returned by ``queue.process(limit)`` are applied to the
    real catalog: a REMOVED event uncatalogs its uid (if it is cataloged),
    any other event catalogs the object found at the uid.

    Exceptions raised by the catalog/uncatalog calls are logged and
    swallowed, so one failing uid does not stop the rest of the batch.
    CHANGED events whose uid is not cataloged are skipped and are not
    included in the returned count.
    """
    catalog = self.getZCatalog()

    # Decide which indexes queue processing has to update.
    if self.getProcessAllIndexes():
        idxs = catalog.indexes()
    else:
        cat_indexes = sets.Set(catalog.indexes())
        immediate_indexes = sets.Set(self._immediate_indexes)
        if not immediate_indexes or immediate_indexes == cat_indexes:
            idxs = catalog.indexes()  # do all of 'em
        else:
            idxs = list(cat_indexes - immediate_indexes)

    events = queue.process(limit)
    count = 0

    # Each value is a pair; only its second item (the event) is used here.
    for uid, (_ignored, event) in events.items():
        if event is REMOVED:
            try:
                if cataloged(catalog, uid):
                    catalog.uncatalog_object(uid)
            except (ConflictError, ClientDisconnected):
                # NOTE(review): logged and swallowed rather than re-raised;
                # confirm this is intended for conflict/disconnect errors.
                logger.error('conflict-error uncataloging object',
                             exc_info=True)
            except:
                # Bare except kept so that every kind of failure is still
                # logged and skipped, exactly as before.
                logger.error('error uncataloging object', exc_info=True)
        else:
            # add or change
            if event is CHANGED and not cataloged(catalog, uid):
                continue
            # Note that the uid may be relative to the catalog.
            obj = catalog.unrestrictedTraverse(uid, None)
            if obj is not None:
                immediate_metadata = self.getImmediateMetadataUpdate()
                try:
                    # Metadata is written here only when it is not already
                    # being updated immediately by catalog_object().
                    catalog.catalog_object(
                        obj, uid, idxs=idxs,
                        update_metadata=not immediate_metadata)
                except (ConflictError, ClientDisconnected):
                    # NOTE(review): logged and swallowed rather than
                    # re-raised; confirm this is intended.
                    logger.error('conflict-error cataloging object',
                                 exc_info=True)
                except:
                    # Bare except kept so that every kind of failure is
                    # still logged and skipped, exactly as before.
                    logger.error('error cataloging object', exc_info=True)

        count += 1

    return count
---|
def manage_edit(self, title='', location='', immediate_indexes=(),
                immediate_removal=0, bucket_count=0, immediate_metadata=0,
                all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None,
                enable_bypass=0):
    """Apply the settings submitted from the edit form to this instance.

    Each argument is handed to the matching setter.  An empty ``location``
    is passed on as None.  ``bucket_count`` is only applied when it is
    true and, once converted to int, differs from the current bucket
    count.  When ``RESPONSE`` is given, the browser is redirected back to
    the edit form with a confirmation message.
    """
    self.title = title
    self.setLocation(location or None)
    self.setImmediateIndexes(immediate_indexes)
    self.setImmediateRemoval(immediate_removal)
    self.setImmediateMetadataUpdate(immediate_metadata)
    self.setProcessAllIndexes(all_indexes)
    self.setConflictPolicy(conflict_policy)
    self.setBypassQueue(enable_bypass)

    if bucket_count:
        requested_buckets = int(bucket_count)
        if requested_buckets != self.getBucketCount():
            self.setBucketCount(requested_buckets)

    if RESPONSE is None:
        return

    target = ('%s/manage_editForm?manage_tabs_message='
              'Properties+changed' % self.absolute_url())
    RESPONSE.redirect(target)
---|
| 180 | |
---|
# Install the functions defined in this module on QueueCatalog, replacing
# or extending the stock implementation (monkey patch).
QueueCatalog.__init__ = __init__
QueueCatalog.catalog_object = catalog_object
QueueCatalog._process_queue = _process_queue

# Bypass flag accessors; the ``__roles__`` attributes restrict each of them
# to the Manager and Owner roles.  Each accessor gets its own list object.
QueueCatalog.getBypassQueue = getBypassQueue
QueueCatalog.getBypassQueue__roles__ = ['Manager', 'Owner']
QueueCatalog.setBypassQueue = setBypassQueue
QueueCatalog.setBypassQueue__roles__ = ['Manager', 'Owner']

# Management edit form (template looked up relative to this module) and
# the handler that processes its submission.
QueueCatalog.manage_editForm = PageTemplateFile('zmi/qc_edit', globals())
QueueCatalog.manage_edit = manage_edit
---|