:mod:`waeup.utils.batching` -- Batch processing
***********************************************

Batch processing is much more than pure data import.

:test-layer: functional

Overview
========

Batch processing means processing CSV files in order to mass-create,
mass-remove, or mass-update data.

You feed CSV files to importers or processors that are part of
the batch-processing mechanism.

Importers/Processors
--------------------

Each CSV file processor

* accepts a single data type identified by an interface.

* knows about the places inside a site (University) where to store,
  remove or update the data.

* can check headers before processing data.

* supports the modes 'create', 'update', and 'remove'.

* creates log entries (optional).

* creates CSV files containing successfully and unsuccessfully
  processed data, respectively.

Output
------

The results of processing are written to a logger, if one was
given. In addition, new CSV files are created during processing:

* a pending CSV file, containing datasets that could not be processed.

* a finished CSV file, containing datasets successfully processed.

The pending file is not created if everything works fine. The
respective path returned in that case is ``None``.

The pending file (if created) is a CSV file that contains the failed
rows, extended by a column ``--ERRORS--`` in which the reasons for
processing failures are listed.

The complete paths of these files are returned. They will be in a
temporary directory created only for this purpose. It is the caller's
responsibility to remove the temporary directories afterwards (the
datacenter's ``distProcessedFiles()`` method takes care of that).

It looks like this::

        -----+      +---------+
       /     |      |         |              +------+
      | .csv +----->|Batch-   |              |      |
      |      |      |processor+----changes-->| ZODB |
      |  +------+   |         |              |      |
      +--|      |   |         +              +------+
         | Mode +-->|         |                  -------+
         |      |   |         +----outputs-+->  /       |
         |  +----+->+---------+            |   |.pending|
         +--|Log |   ^                     |   |        |
            +----+   |                     |   +--------+
               +-----++                    v
               |Inter-|                ----------+
               |face  |               /          |
               +------+              | .finished |
                                     |           |
                                     +-----------+


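The output handling described in this section can be illustrated with
the standard library alone. The following is only a sketch under
assumptions, not the code of ``waeup.utils.batching``: the function
name ``split_results``, the ``check`` callable and the file names
``example.finished.csv`` / ``example.pending.csv`` are made up for
illustration, and the code is written for Python 3.

```python
import csv
import os
import tempfile


def split_results(rows, fieldnames, check):
    """Write rows to finished/pending CSV files in a fresh temp dir.

    ``check`` is a hypothetical callable that returns an error message
    for a row, or None if the row can be processed.
    """
    tmpdir = tempfile.mkdtemp()
    finished_path = os.path.join(tmpdir, 'example.finished.csv')
    pending_path = os.path.join(tmpdir, 'example.pending.csv')
    good, bad = [], []
    for row in rows:
        error = check(row)
        if error is None:
            good.append(row)
        else:
            # Failed rows get an additional column with the reason.
            bad.append(dict(row, **{'--ERRORS--': error}))
    with open(finished_path, 'w', newline='') as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(good)
    if not bad:
        # No failures: no pending file is written and its path is None.
        return len(rows), 0, finished_path, None
    with open(pending_path, 'w', newline='') as fd:
        writer = csv.DictWriter(fd, fieldnames + ['--ERRORS--'])
        writer.writeheader()
        writer.writerows(bad)
    return len(rows), len(bad), finished_path, pending_path
```

As in the description above, the caller of such a function would be
responsible for removing the temporary directory, for instance with
``shutil.rmtree(os.path.dirname(finished_path))``.
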
Creating a batch processor
==========================

We create our own batch processor for our own datatype. This datatype
must be based on an interface that the batcher can use for converting
data.

Founding Stoneville
-------------------

We start with the interface:

    >>> from zope.interface import Interface
    >>> from zope import schema
    >>> class ICave(Interface):
    ...     """A cave."""
    ...     name = schema.TextLine(
    ...         title = u'Cave name',
    ...         default = u'Unnamed',
    ...         required = True)
    ...     dinoports = schema.Int(
    ...         title = u'Number of DinoPorts (tm)',
    ...         required = False,
    ...         default = 1)
    ...     owner = schema.TextLine(
    ...         title = u'Owner name',
    ...         required = True,
    ...         missing_value = 'Fred Estates Inc.')
    ...     taxpayer = schema.Bool(
    ...         title = u'Pays taxes',
    ...         required = True,
    ...         default = False)

Now a class that implements this interface:

    >>> import grok
    >>> class Cave(object):
    ...     grok.implements(ICave)
    ...     def __init__(self, name=u'Unnamed', dinoports=2,
    ...                  owner='Fred Estates Inc.', taxpayer=False):
    ...         self.name = name
    ...         self.dinoports = dinoports
    ...         self.owner = owner
    ...         self.taxpayer = taxpayer

We also provide a factory for caves. Strictly speaking, this is not
necessary, but it makes the batch processor we create afterwards
easier to understand.

    >>> from zope.component import getGlobalSiteManager
    >>> from zope.component.factory import Factory
    >>> from zope.component.interfaces import IFactory
    >>> gsm = getGlobalSiteManager()
    >>> cave_maker = Factory(Cave, 'A cave', 'Buy caves here!')
    >>> gsm.registerUtility(cave_maker, IFactory, 'Lovely Cave')

Now we can create caves using a factory:

    >>> from zope.component import createObject
    >>> createObject('Lovely Cave')
    <Cave object at 0x...>

This is nice, but we still lack a place where we can put all the
lovely caves we want to sell.

Therefore, as a replacement for a real site, we define a place where
all caves can be stored: Stoneville! This is a lovely place for
upper-class cavemen (who are the only ones that can afford more than
one dinoport).

We found Stoneville:

    >>> stoneville = dict()

Everything is in place.

Now, to improve local health conditions, imagine we want to populate
Stoneville with lots of new happy dino-hunting natives that slept on
the bare ground in former times and had no idea of
bathrooms. Disgusting, isn't it?

Lots of cavemen need lots of caves.

Of course we can do something like:

    >>> cave1 = createObject('Lovely Cave')
    >>> cave1.name = "Fred's home"
    >>> cave1.owner = "Fred"
    >>> stoneville[cave1.name] = cave1

and Stoneville has exactly

    >>> len(stoneville)
    1

inhabitant. But we don't want to do this for hundreds or thousands of
citizens-to-be, do we?

It is much easier to create a simple CSV list, where we put in all the
data and let a batch processor do the job.

The list is already here:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner,taxpayer
    ... Barneys Home,2,Barney,1
    ... Wilmas Asylum,1,Wilma,1
    ... Freds Dinoburgers,10,Fred,0
    ... Joeys Drive-in,110,Joey,0
    ... """)

All we need now is a batch processor.

    >>> from waeup.utils.batching import BatchProcessor
    >>> class CaveProcessor(BatchProcessor):
    ...     util_name = 'caveprocessor'
    ...     grok.name(util_name)
    ...     name = 'Cave Processor'
    ...     iface = ICave
    ...     location_fields = ['name']
    ...     factory_name = 'Lovely Cave'
    ...
    ...     def parentsExist(self, row, site):
    ...         return True
    ...
    ...     def getParent(self, row, site):
    ...         return stoneville
    ...
    ...     def entryExists(self, row, site):
    ...         return row['name'] in stoneville.keys()
    ...
    ...     def getEntry(self, row, site):
    ...         if not self.entryExists(row, site):
    ...             return None
    ...         return stoneville[row['name']]
    ...
    ...     def delEntry(self, row, site):
    ...         del stoneville[row['name']]
    ...
    ...     def addEntry(self, obj, row, site):
    ...         stoneville[row['name']] = obj
    ...
    ...     def updateEntry(self, obj, row, site):
    ...         for key, value in row.items():
    ...             setattr(obj, key, value)

If we also want the results to be logged, we must provide a logger
(this is optional):

    >>> import logging
    >>> logger = logging.getLogger('stoneville')
    >>> logger.setLevel(logging.DEBUG)
    >>> logger.propagate = False
    >>> handler = logging.FileHandler('stoneville.log', 'w')
    >>> logger.addHandler(handler)

Create the fellows:

    >>> processor = CaveProcessor()
    >>> result = processor.doImport('newcomers.csv',
    ...                   ['name', 'dinoports', 'owner', 'taxpayer'],
    ...                   mode='create', user='Bob', logger=logger)
    >>> result
    (4, 0, '/.../newcomers.finished.csv', None)

The result means: four entries were processed and no warnings
occurred. Furthermore we get the path to a CSV file with successfully
processed entries and the path to a CSV file with erroneous entries.
As everything went well, the latter is ``None``. Let's check:

    >>> sorted(stoneville.keys())
    [u'Barneys Home', ..., u'Wilmas Asylum']

The values of the Cave instances have the correct type:

    >>> barney = stoneville['Barneys Home']
    >>> barney.dinoports
    2

which is a number, not a string.

Apparently, when calling the processor, we gave more information than
just the CSV filepath. What does it all mean?

While the first argument is the path to the CSV file, we also have to
give an ordered list of header names. These replace the header field
names that are actually in the file. This way we can override faulty
headers.

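The idea of replacing the header names found in a file can be sketched
with the standard ``csv`` module. This is an illustration only, not
the way ``doImport()`` is implemented; the misspelled header
``dinoprts`` and the sample row are made up, and the code is written
for Python 3.

```python
import csv
import io

# File content with a misspelled header ('dinoprts'); made-up sample data.
data = io.StringIO("name,dinoprts,owner\nBarneys Home,2,Barney\n")

reader = csv.reader(data)
next(reader)  # skip the header line that is actually in the file

# The ordered header names we pass in replace the ones from the file.
headers = ['name', 'dinoports', 'owner']
rows = [dict(zip(headers, values)) for values in reader]
```

Each row is then addressed by the names given by the caller, whatever
the first line of the file said.
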
The ``mode`` parameter tells what kind of operation we want to perform:
``create``, ``update``, or ``remove`` data.

Finally, the ``user`` parameter is optional and only used for logging.

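To make the three modes more concrete, here is a minimal sketch that
applies a single row to a plain dictionary. It is not taken from
``waeup.utils.batching``: the function ``apply_row`` is hypothetical,
and only the message for already existing objects is the one shown in
this document; the other two messages are invented for illustration.

```python
def apply_row(store, row, mode, key='name'):
    """Apply one row to ``store`` (a dict); return an error text or None."""
    name = row[key]
    if mode == 'create':
        if name in store:
            return 'This object already exists. Skipping.'
        store[name] = dict(row)
    elif mode == 'update':
        if name not in store:
            return 'Cannot update: no such entry.'  # hypothetical message
        store[name].update(row)
    elif mode == 'remove':
        if name not in store:
            return 'Cannot remove: no such entry.'  # hypothetical message
        del store[name]
    else:
        raise ValueError('Unknown mode: %r' % (mode,))
    return None
```

A returned error text corresponds to a row that would end up in the
pending file; ``None`` corresponds to a row in the finished file.
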
We can, by the way, see the results of our run in a logfile if we
provided a logger during the call:

    >>> print open('stoneville.log').read()
    --------------------
    Bob: Batch processing finished: OK
    Bob: Source: newcomers.csv
    Bob: Mode: create
    Bob: User: Bob
    Bob: Processing time: ... s (... s/item)
    Bob: Processed: 4 lines (4 successful/ 0 failed)
    --------------------

We clean up the temporary directory created by ``doImport()``:

    >>> import shutil
    >>> import os
    >>> shutil.rmtree(os.path.dirname(result[2]))

As we can see, the processing was successful. Otherwise, all problems
would be listed here, as we can see if we run the same operation again:

    >>> result = processor.doImport('newcomers.csv',
    ...                   ['name', 'dinoports', 'owner', 'taxpayer'],
    ...                   mode='create', user='Bob', logger=logger)
    >>> result
    (4, 4, '/.../newcomers.finished.csv', '/.../newcomers.pending.csv')

This time we also get a path to a .pending file.

The log file will tell us this in more detail:

    >>> print open('stoneville.log').read()
    --------------------
    ...
    --------------------
    Bob: Batch processing finished: FAILED
    Bob: Source: newcomers.csv
    Bob: Mode: create
    Bob: User: Bob
    Bob: Failed datasets: newcomers.pending.csv
    Bob: Processing time: ... s (... s/item)
    Bob: Processed: 4 lines (0 successful/ 4 failed)
    --------------------

This time a new file was created, which keeps all the rows we could not
process and an additional column with error messages:

    >>> print open(result[3]).read()
    owner,name,taxpayer,dinoports,--ERRORS--
    Barney,Barneys Home,1,2,This object already exists. Skipping.
    Wilma,Wilmas Asylum,1,1,This object already exists. Skipping.
    Fred,Freds Dinoburgers,0,10,This object already exists. Skipping.
    Joey,Joeys Drive-in,0,110,This object already exists. Skipping.

This way we can correct the faulty entries and afterwards retry without
having the already processed rows in the way.

We also notice that the values of the taxpayer column are returned as
they appear in the input file. There we wrote '1' for ``True`` and
'0' for ``False`` (which is accepted by the converters).

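How a converter might turn such CSV cells into booleans can be
sketched as follows. The helper ``to_bool`` is hypothetical and not
the converter used by the batch processor; only '1' and '0' are
spellings mentioned in this document, the other accepted spellings
are assumptions added for illustration.

```python
def to_bool(value):
    """Convert a CSV cell to a boolean (hypothetical helper)."""
    text = value.strip().lower()
    if text in ('1', 'true', 'yes'):   # 'true'/'yes' are assumptions
        return True
    if text in ('0', 'false', 'no'):   # 'false'/'no' are assumptions
        return False
    raise ValueError('Not a boolean: %r' % (value,))
```

The string from the file stays untouched; only the value stored on the
object is converted, which is why the pending file still shows '1' and
'0'.
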
Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))

Updating entries
----------------

To update entries, we just call the batch processor in a different
mode:

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                             'dinoports', 'owner'],
    ...                             mode='update', user='Bob')
    >>> result
    (4, 0, '...', None)

Now we want to record that Wilma got an extra port for her second dino:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... Wilmas Asylum,2,Wilma
    ... """)

    >>> wilma = stoneville['Wilmas Asylum']
    >>> wilma.dinoports
    1

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


We start the processor:

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                   'dinoports', 'owner'], mode='update', user='Bob')
    >>> result
    (1, 0, '...', None)

    >>> wilma = stoneville['Wilmas Asylum']
    >>> wilma.dinoports
    2

Wilma's number of dinoports has increased.

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


If we try to update a nonexistent entry, an error occurs:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... NOT-WILMAS-ASYLUM,2,Wilma
    ... """)

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                             'dinoports', 'owner'],
    ...                             mode='update', user='Bob')
    >>> result
    (1, 1, '/.../newcomers.finished.csv', '/.../newcomers.pending.csv')

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


Invalid values will be spotted as well:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... Wilmas Asylum,NOT-A-NUMBER,Wilma
    ... """)

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                             'dinoports', 'owner'],
    ...                             mode='update', user='Bob')
    >>> result
    (1, 1, '...', '...')

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


We can also update only some columns, leaving others out. We skip the
'dinoports' column in the next run:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,owner
    ... Wilmas Asylum,Barney
    ... """)

    >>> result = processor.doImport('newcomers.csv', ['name', 'owner'],
    ...                             mode='update', user='Bob')
    >>> result
    (1, 0, '...', None)

    >>> wilma.owner
    u'Barney'

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


We cannot, however, leave out the 'location field' ('name' in our
case), as this one tells us which entry to update:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... 2,Wilma
    ... """)

    >>> processor.doImport('newcomers.csv', ['dinoports', 'owner'],
    ...                    mode='update', user='Bob')
    Traceback (most recent call last):
    ...
    FatalCSVError: Need at least columns 'name' for import!

This time we even get an exception!

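A header check of this kind could look roughly like the following
sketch. The exception class ``MissingColumnsError`` and the function
``check_headers`` are hypothetical stand-ins; they are not the
``FatalCSVError`` or the header check of ``waeup.utils.batching``.
Only the wording of the message is borrowed from the traceback above.

```python
class MissingColumnsError(Exception):
    """Hypothetical stand-in for the FatalCSVError shown above."""


def check_headers(headers, location_fields):
    """Raise if a location field is missing from the given headers."""
    missing = [name for name in location_fields if name not in headers]
    if missing:
        raise MissingColumnsError(
            'Need at least columns %s for import!'
            % ', '.join(repr(name) for name in location_fields))
```

Such a check runs before any row is processed, which would explain why
a missing location field leads to an exception and not to an entry in
the pending file.
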
We can set dinoports to ``None``, although this is not a
number, because we declared the field as not required in the interface:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... "Wilmas Asylum",,"Wilma"
    ... """)

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                             'dinoports', 'owner'],
    ...                             mode='update', user='Bob')
    >>> result
    (1, 0, '...', None)

    >>> wilma.dinoports is None
    True

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))

Generally, empty strings are treated as ``None``:

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... "Wilmas Asylum","","Wilma"
    ... """)

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                             'dinoports', 'owner'],
    ...                             mode='update', user='Bob')
    >>> result
    (1, 0, '...', None)

    >>> wilma.dinoports is None
    True

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


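The treatment of empty cells for an optional integer field can be
sketched like this. The helper ``to_optional_int`` is hypothetical and
only mirrors the behaviour shown in the examples above; it is not the
converter the batch processor actually uses.

```python
def to_optional_int(value):
    """Convert a CSV cell to int, treating an empty cell as None."""
    if value is None or value.strip() == '':
        return None
    # int() raises ValueError for cells such as 'NOT-A-NUMBER'.
    return int(value)
```

A ``ValueError`` raised here corresponds to a row that is reported as
failed, while an empty cell is a valid value for a field that is not
required.
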
Removing entries
----------------

In 'remove' mode we can delete entries. Here the validity of values in
non-location fields doesn't matter because those fields are ignored.

    >>> open('newcomers.csv', 'wb').write(
    ... """name,dinoports,owner
    ... "Wilmas Asylum","ILLEGAL-NUMBER",""
    ... """)

    >>> result = processor.doImport('newcomers.csv', ['name',
    ...                             'dinoports', 'owner'],
    ...                             mode='remove', user='Bob')
    >>> result
    (1, 0, '...', None)

    >>> sorted(stoneville.keys())
    [u'Barneys Home', "Fred's home", u'Freds Dinoburgers', u'Joeys Drive-in']

Oops! Wilma is gone.

Clean up:

    >>> shutil.rmtree(os.path.dirname(result[2]))


Clean up:

    >>> import os
    >>> os.unlink('newcomers.csv')
    >>> os.unlink('stoneville.log')