1 | import sets |
---|
2 | import logging |
---|
3 | from types import StringType |
---|
4 | from AccessControl.Permissions \ |
---|
5 | import manage_zcatalog_entries, view_management_screens |
---|
6 | from AccessControl import ModuleSecurityInfo, ClassSecurityInfo |
---|
7 | from AccessControl.SecurityInfo import ClassSecurityInformation |
---|
8 | from Products.PageTemplates.PageTemplateFile import PageTemplateFile |
---|
9 | from Products.QueueCatalog.CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS |
---|
10 | from Products.QueueCatalog.CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED |
---|
11 | from Products.QueueCatalog.CatalogEventQueue import SAFE_POLICY, ALTERNATIVE_POLICY |
---|
12 | from Products.QueueCatalog.QueueCatalog import cataloged |
---|
13 | from Products.QueueCatalog.QueueCatalog import QueueCatalog |
---|
14 | from ZODB.POSException import ConflictError |
---|
15 | from ZEO.Exceptions import ClientDisconnected |
---|
16 | logger = logging.getLogger('event.QueueCatalog') |
---|
17 | security = ClassSecurityInformation() |
---|
18 | |
---|
19 | def __init__(self, |
---|
20 | buckets=1009, |
---|
21 | conflict_policy=SAFE_POLICY, |
---|
22 | bypass = False, |
---|
23 | location = "portal_catalog_real"): |
---|
24 | self._buckets = buckets |
---|
25 | self._conflict_policy = conflict_policy |
---|
26 | self._clearQueues() |
---|
27 | self._bypass = bypass |
---|
28 | self._location = location |
---|
29 | |
---|
30 | security.declareProtected(view_management_screens, 'getBypassQueue') |
---|
31 | def getBypassQueue(self): |
---|
32 | "get _by_pass" |
---|
33 | if not hasattr(self,"_bypass"): |
---|
34 | self._bypass = False |
---|
35 | return getattr(self,'_bypass',False) |
---|
36 | |
---|
37 | security.declareProtected(view_management_screens, 'setBypassQueue') |
---|
38 | def setBypassQueue(self, bypass=False): |
---|
39 | "set _bypass" |
---|
40 | self._bypass = bypass |
---|
41 | |
---|
42 | def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1): |
---|
43 | # update_metadata=0 is ignored if the queued catalog is set to |
---|
44 | # update metadata during queue processing, rather than immediately |
---|
45 | |
---|
46 | # similarly, limiting the idxs only limits the immediate indexes. If |
---|
47 | # any work needs to be done in the queue processing, it will all be |
---|
48 | # done: we have not implemented partial indexing during queue |
---|
49 | # processing. The only way to avoid any of it is to avoid all of it |
---|
50 | # (i.e., update metadata immediately and don't have any indexes to |
---|
51 | # update on the queued side). |
---|
52 | |
---|
53 | # Make sure the current context is allowed to do this: |
---|
54 | catalog_object = self.getZCatalog('catalog_object') |
---|
55 | |
---|
56 | if uid is None: |
---|
57 | uid = '/'.join(obj.getPhysicalPath()) |
---|
58 | elif not isinstance(uid, StringType): |
---|
59 | uid = '/'.join(uid) |
---|
60 | |
---|
61 | catalog = self.getZCatalog() |
---|
62 | cat_indexes = sets.Set(catalog.indexes()) |
---|
63 | immediate_indexes = sets.Set(self._immediate_indexes) |
---|
64 | cat_indexes -= immediate_indexes |
---|
65 | |
---|
66 | # The ZCatalog API doesn't allow us to distinguish between |
---|
67 | # adds and updates, so we have to try to figure this out |
---|
68 | # ourselves. |
---|
69 | |
---|
70 | # There's a risk of a race here. What if there is a previously |
---|
71 | # unprocessed add event? If so, then this should be a changed |
---|
72 | # event. If we undo this transaction later, we'll generate a |
---|
73 | # remove event, when we should generate an add changed event. |
---|
74 | # To avoid this, we need to make sure we see consistent values |
---|
75 | # of the event queue. We also need to avoid resolving |
---|
76 | # (non-undo) conflicts of add events. This will slow things |
---|
77 | # down a bit, but adds should be relatively infrequent. |
---|
78 | bypass_queue = self.getBypassQueue() |
---|
79 | # Now, try to decide if the catalog has the uid (path). |
---|
80 | already_cataloged = cataloged(catalog, uid) |
---|
81 | if not already_cataloged: |
---|
82 | # Looks like we should add, but maybe there's already a |
---|
83 | # pending add event. We'd better check the event queue: |
---|
84 | already_cataloged = ( |
---|
85 | self._queues[hash(uid) % self._buckets].getEvent(uid) in |
---|
86 | ADDED_EVENTS) |
---|
87 | |
---|
88 | if idxs and already_cataloged: |
---|
89 | # if not already_cataloged, we index the whole thing |
---|
90 | idxs = sets.Set(idxs) |
---|
91 | immediate_indexes.intersection_update(idxs) |
---|
92 | cat_indexes.intersection_update(idxs) |
---|
93 | |
---|
94 | immediate_metadata = self.getImmediateMetadataUpdate() |
---|
95 | if cat_indexes or update_metadata and not immediate_metadata: |
---|
96 | if bypass_queue: |
---|
97 | catalog_object(obj, uid, cat_indexes) |
---|
98 | else: |
---|
99 | #import pdb;pdb.set_trace() |
---|
100 | self._update(uid, already_cataloged and CHANGED or ADDED) |
---|
101 | # Update some of the indexes immediately. |
---|
102 | if immediate_indexes: |
---|
103 | # Update some of the indexes immediately. |
---|
104 | catalog.catalog_object( |
---|
105 | obj, uid, immediate_indexes, |
---|
106 | update_metadata=update_metadata and immediate_metadata) |
---|
107 | elif update_metadata and immediate_metadata: |
---|
108 | # if it is added, no point in doing the metadata, and it will be |
---|
109 | # done in the queue process anyway |
---|
110 | catalog._catalog.updateMetadata(obj, uid) |
---|
111 | |
---|
112 | def _process_queue(self, queue, limit): |
---|
113 | """Process a single queue""" |
---|
114 | catalog = self.getZCatalog() |
---|
115 | |
---|
116 | if self.getProcessAllIndexes(): |
---|
117 | #idxs = None |
---|
118 | idxs = catalog.indexes() |
---|
119 | else: |
---|
120 | cat_indexes = sets.Set(catalog.indexes()) |
---|
121 | immediate_indexes = sets.Set(self._immediate_indexes) |
---|
122 | if not immediate_indexes or immediate_indexes==cat_indexes: |
---|
123 | idxs = catalog.indexes() # do all of 'em |
---|
124 | else: |
---|
125 | idxs = list(cat_indexes - immediate_indexes) |
---|
126 | events = queue.process(limit) |
---|
127 | count = 0 |
---|
128 | |
---|
129 | for uid, (t, event) in events.items(): |
---|
130 | if event is REMOVED: |
---|
131 | try: |
---|
132 | if cataloged(catalog, uid): |
---|
133 | catalog.uncatalog_object(uid) |
---|
134 | except (ConflictError, ClientDisconnected): |
---|
135 | logger.error('conflict-error uncataloging object', exc_info=True) |
---|
136 | except: |
---|
137 | logger.error('error uncataloging object', exc_info=True) |
---|
138 | else: |
---|
139 | # add or change |
---|
140 | if event is CHANGED and not cataloged(catalog, uid): |
---|
141 | continue |
---|
142 | # Note that the uid may be relative to the catalog. |
---|
143 | obj = catalog.unrestrictedTraverse(uid, None) |
---|
144 | if obj is not None: |
---|
145 | immediate_metadata = self.getImmediateMetadataUpdate() |
---|
146 | try: |
---|
147 | catalog.catalog_object( |
---|
148 | obj, uid, idxs=idxs, |
---|
149 | update_metadata=not immediate_metadata) |
---|
150 | except (ConflictError, ClientDisconnected): |
---|
151 | logger.error('conflict-error uncataloging object', exc_info=True) |
---|
152 | #raise |
---|
153 | except: |
---|
154 | logger.error('error cataloging object', exc_info=True) |
---|
155 | |
---|
156 | count = count + 1 |
---|
157 | |
---|
158 | return count |
---|
159 | def manage_edit(self, title='', location='', immediate_indexes=(), |
---|
160 | immediate_removal=0, bucket_count=0, immediate_metadata=0, |
---|
161 | all_indexes=0, conflict_policy=SAFE_POLICY, RESPONSE=None, |
---|
162 | enable_bypass=0): |
---|
163 | """ Edit the instance """ |
---|
164 | self.title = title |
---|
165 | self.setLocation(location or None) |
---|
166 | self.setImmediateIndexes(immediate_indexes) |
---|
167 | self.setImmediateRemoval(immediate_removal) |
---|
168 | self.setImmediateMetadataUpdate(immediate_metadata) |
---|
169 | self.setProcessAllIndexes(all_indexes) |
---|
170 | self.setConflictPolicy(conflict_policy) |
---|
171 | self.setBypassQueue(enable_bypass) |
---|
172 | if bucket_count: |
---|
173 | bucket_count = int(bucket_count) |
---|
174 | if bucket_count != self.getBucketCount(): |
---|
175 | self.setBucketCount(bucket_count) |
---|
176 | |
---|
177 | if RESPONSE is not None: |
---|
178 | RESPONSE.redirect('%s/manage_editForm?manage_tabs_message=' |
---|
179 | 'Properties+changed' % self.absolute_url()) |
---|
180 | |
---|
181 | QueueCatalog.__init__ = __init__ |
---|
182 | QueueCatalog.getBypassQueue = getBypassQueue |
---|
183 | QueueCatalog.getBypassQueue__roles__ = ['Manager', 'Owner',] |
---|
184 | QueueCatalog.setBypassQueue = setBypassQueue |
---|
185 | QueueCatalog.setBypassQueue__roles__ = ['Manager', 'Owner',] |
---|
186 | QueueCatalog.catalog_object = catalog_object |
---|
187 | QueueCatalog._process_queue = _process_queue |
---|
188 | QueueCatalog.manage_editForm = PageTemplateFile('zmi/qc_edit', globals()) |
---|
189 | QueueCatalog.manage_edit = manage_edit |
---|