1 | :mod:`waeup.kofa.utils.batching` -- Batch processing |
---|
2 | **************************************************** |
---|
3 | |
---|
4 | Batch processing is much more than pure data import. |
---|
5 | |
---|
6 | Overview |
---|
7 | ======== |
---|
8 | |
---|
9 | Basically, it means processing CSV files in order to mass-create, |
---|
10 | mass-remove, or mass-update data. |
---|
11 | |
---|
12 | So you can feed CSV files to processors, that are part of |
---|
13 | the batch-processing mechanism. |
---|
14 | |
---|
15 | Processors |
---|
16 | ---------- |
---|
17 | |
---|
18 | Each CSV file processor |
---|
19 | |
---|
20 | * accepts a single data type identified by an interface. |
---|
21 | |
---|
22 | * knows about the places inside a site (University) where to store, |
---|
23 | remove or update the data. |
---|
24 | |
---|
25 | * can check headers before processing data. |
---|
26 | |
---|
27 | * supports the mode 'create', 'update', 'remove'. |
---|
28 | |
---|
29 | * creates log entries (optional) |
---|
30 | |
---|
31 | * creates csv files containing successful and not-successful processed |
---|
32 | data respectively. |
---|
33 | |
---|
34 | Output |
---|
35 | ------ |
---|
36 | |
---|
37 | The results of processing are written to loggers, if a logger was |
---|
38 | given. Beside this new CSV files are created during processing: |
---|
39 | |
---|
40 | * a pending CSV file, containing datasets that could not be processed |
---|
41 | |
---|
42 | * a finished CSV file, containing datasets successfully processed. |
---|
43 | |
---|
44 | The pending file is not created if everything works fine. The |
---|
45 | respective path returned in that case is ``None``. |
---|
46 | |
---|
47 | The pending file (if created) is a CSV file that contains the failed |
---|
48 | rows appended by a column ``--ERRROR--`` in which the reasons for |
---|
49 | processing failures are listed. |
---|
50 | |
---|
51 | The complete paths of these files are returned. They will be in a |
---|
52 | temporary directory created only for this purpose. It is the caller's |
---|
53 | responsibility to remove the temporay directories afterwards (the |
---|
54 | datacenters distProcessedFiles() method takes care for that). |
---|
55 | |
---|
56 | It looks like this:: |
---|
57 | |
---|
58 | -----+ +---------+ |
---|
59 | / | | | +------+ |
---|
60 | | .csv +----->|Batch- | | | |
---|
61 | | | |processor+----changes-->| ZODB | |
---|
62 | | +------+ | | | | |
---|
63 | +--| | | + +------+ |
---|
64 | | Mode +-->| | -------+ |
---|
65 | | | | +----outputs-+-> / | |
---|
66 | | +----+->+---------+ | |.pending| |
---|
67 | +--|Log | ^ | | | |
---|
68 | +----+ | | +--------+ |
---|
69 | +-----++ v |
---|
70 | |Inter-| ----------+ |
---|
71 | |face | / | |
---|
72 | +------+ | .finished | |
---|
73 | | | |
---|
74 | +-----------+ |
---|
75 | |
---|
76 | |
---|
77 | Creating a batch processor |
---|
78 | ========================== |
---|
79 | |
---|
80 | We create an own batch processor for an own datatype. This datatype |
---|
81 | must be based on an interface that the batcher can use for converting |
---|
82 | data. |
---|
83 | |
---|
84 | Founding Stoneville |
---|
85 | ------------------- |
---|
86 | |
---|
87 | We start with the interface: |
---|
88 | |
---|
89 | >>> from zope.interface import Interface |
---|
90 | >>> from zope import schema |
---|
91 | >>> class ICave(Interface): |
---|
92 | ... """A cave.""" |
---|
93 | ... name = schema.TextLine( |
---|
94 | ... title = u'Cave name', |
---|
95 | ... default = u'Unnamed', |
---|
96 | ... required = True) |
---|
97 | ... dinoports = schema.Int( |
---|
98 | ... title = u'Number of DinoPorts (tm)', |
---|
99 | ... required = False, |
---|
100 | ... default = 1) |
---|
101 | ... owner = schema.TextLine( |
---|
102 | ... title = u'Owner name', |
---|
103 | ... required = True, |
---|
104 | ... missing_value = 'Fred Estates Inc.') |
---|
105 | ... taxpayer = schema.Bool( |
---|
106 | ... title = u'Payes taxes', |
---|
107 | ... required = True, |
---|
108 | ... default = False) |
---|
109 | |
---|
110 | Now a class that implements this interface: |
---|
111 | |
---|
112 | >>> import grok |
---|
113 | >>> class Cave(object): |
---|
114 | ... grok.implements(ICave) |
---|
115 | ... def __init__(self, name=u'Unnamed', dinoports=2, |
---|
116 | ... owner='Fred Estates Inc.', taxpayer=False): |
---|
117 | ... self.name = name |
---|
118 | ... self.dinoports = 2 |
---|
119 | ... self.owner = owner |
---|
120 | ... self.taxpayer = taxpayer |
---|
121 | |
---|
122 | We also provide a factory for caves. Strictly speaking, this not |
---|
123 | necessary but makes the batch processor we create afterwards, better |
---|
124 | understandable. |
---|
125 | |
---|
126 | >>> from zope.component import getGlobalSiteManager |
---|
127 | >>> from zope.component.factory import Factory |
---|
128 | >>> from zope.component.interfaces import IFactory |
---|
129 | >>> gsm = getGlobalSiteManager() |
---|
130 | >>> cave_maker = Factory(Cave, 'A cave', 'Buy caves here!') |
---|
131 | >>> gsm.registerUtility(cave_maker, IFactory, 'Lovely Cave') |
---|
132 | |
---|
133 | Now we can create caves using a factory: |
---|
134 | |
---|
135 | >>> from zope.component import createObject |
---|
136 | >>> createObject('Lovely Cave') |
---|
137 | <Cave object at 0x...> |
---|
138 | |
---|
139 | This is nice, but we still lack a place, where we can place all the |
---|
140 | lovely caves we want to sell. |
---|
141 | |
---|
142 | Furthermore, as a replacement for a real site, we define a place where |
---|
143 | all caves can be stored: Stoneville! This is a lovely place for |
---|
144 | upperclass cavemen (which are the only ones that can afford more than |
---|
145 | one dinoport). |
---|
146 | |
---|
147 | We found Stoneville: |
---|
148 | |
---|
149 | >>> stoneville = dict() |
---|
150 | |
---|
151 | Everything in place. |
---|
152 | |
---|
153 | Now, to improve local health conditions, imagine we want to populate |
---|
154 | Stoneville with lots of new happy dino-hunting natives that slept on |
---|
155 | the bare ground in former times and had no idea of |
---|
156 | bathrooms. Disgusting, isn't it? |
---|
157 | |
---|
158 | Lots of cavemen need lots of caves. |
---|
159 | |
---|
160 | Of course we can do something like: |
---|
161 | |
---|
162 | >>> cave1 = createObject('Lovely Cave') |
---|
163 | >>> cave1.name = "Fred's home" |
---|
164 | >>> cave1.owner = "Fred" |
---|
165 | >>> stoneville[cave1.name] = cave1 |
---|
166 | |
---|
167 | and Stoneville has exactly |
---|
168 | |
---|
169 | >>> len(stoneville) |
---|
170 | 1 |
---|
171 | |
---|
172 | inhabitant. But we don't want to do this for hundreds or thousands of |
---|
173 | citizens-to-be, do we? |
---|
174 | |
---|
175 | It is much easier to create a simple CSV list, where we put in all the |
---|
176 | data and let a batch processor do the job. |
---|
177 | |
---|
178 | The list is already here: |
---|
179 | |
---|
180 | >>> open('newcomers.csv', 'wb').write( |
---|
181 | ... """name,dinoports,owner,taxpayer |
---|
182 | ... Barneys Home,2,Barney,1 |
---|
183 | ... Wilmas Asylum,1,Wilma,1 |
---|
184 | ... Freds Dinoburgers,10,Fred,0 |
---|
185 | ... Joeys Drive-in,110,Joey,0 |
---|
186 | ... """) |
---|
187 | |
---|
188 | All we need, is a batch processor now. |
---|
189 | |
---|
190 | >>> from waeup.kofa.utils.batching import BatchProcessor |
---|
191 | >>> from waeup.kofa.interfaces import IGNORE_MARKER |
---|
192 | >>> class CaveProcessor(BatchProcessor): |
---|
193 | ... util_name = 'caveprocessor' |
---|
194 | ... grok.name(util_name) |
---|
195 | ... name = 'Cave Processor' |
---|
196 | ... iface = ICave |
---|
197 | ... location_fields = ['name'] |
---|
198 | ... factory_name = 'Lovely Cave' |
---|
199 | ... |
---|
200 | ... def parentsExist(self, row, site): |
---|
201 | ... return True |
---|
202 | ... |
---|
203 | ... def getParent(self, row, site): |
---|
204 | ... return stoneville |
---|
205 | ... |
---|
206 | ... def entryExists(self, row, site): |
---|
207 | ... return row['name'] in stoneville.keys() |
---|
208 | ... |
---|
209 | ... def getEntry(self, row, site): |
---|
210 | ... if not self.entryExists(row, site): |
---|
211 | ... return None |
---|
212 | ... return stoneville[row['name']] |
---|
213 | ... |
---|
214 | ... def delEntry(self, row, site): |
---|
215 | ... del stoneville[row['name']] |
---|
216 | ... |
---|
217 | ... def addEntry(self, obj, row, site): |
---|
218 | ... stoneville[row['name']] = obj |
---|
219 | ... |
---|
220 | ... def updateEntry(self, obj, row, site, filename): |
---|
221 | ... # This is not strictly necessary, as the default |
---|
222 | ... # updateEntry method does exactly the same |
---|
223 | ... for key, value in row.items(): |
---|
224 | ... if value != IGNORE_MARKER: |
---|
225 | ... setattr(obj, key, value) |
---|
226 | |
---|
227 | If we also want the results being logged, we must provide a logger |
---|
228 | (this is optional): |
---|
229 | |
---|
230 | >>> import logging |
---|
231 | >>> logger = logging.getLogger('stoneville') |
---|
232 | >>> logger.setLevel(logging.DEBUG) |
---|
233 | >>> logger.propagate = False |
---|
234 | >>> handler = logging.FileHandler('stoneville.log', 'w') |
---|
235 | >>> logger.addHandler(handler) |
---|
236 | |
---|
237 | Create the fellows: |
---|
238 | |
---|
239 | >>> processor = CaveProcessor() |
---|
240 | >>> result = processor.doImport('newcomers.csv', |
---|
241 | ... ['name', 'dinoports', 'owner', 'taxpayer'], |
---|
242 | ... mode='create', user='Bob', logger=logger) |
---|
243 | >>> result |
---|
244 | (4, 0, '/.../newcomers.finished.csv', None) |
---|
245 | |
---|
246 | The result means: four entries were processed and no warnings |
---|
247 | occured. Furthermore we get filepath to a CSV file with successfully |
---|
248 | processed entries and a filepath to a CSV file with erraneous entries. |
---|
249 | As everything went well, the latter is ``None``. Let's check: |
---|
250 | |
---|
251 | >>> sorted(stoneville.keys()) |
---|
252 | [u'Barneys Home', ..., u'Wilmas Asylum'] |
---|
253 | |
---|
254 | The values of the Cave instances have correct type: |
---|
255 | |
---|
256 | >>> barney = stoneville['Barneys Home'] |
---|
257 | >>> barney.dinoports |
---|
258 | 2 |
---|
259 | |
---|
260 | which is a number, not a string. |
---|
261 | |
---|
262 | Apparently, when calling the processor, we gave some more info than |
---|
263 | only the CSV filepath. What does it all mean? |
---|
264 | |
---|
265 | While the first argument is the path to the CSV file, we also have to |
---|
266 | give an ordered list of headernames. These replace the header field |
---|
267 | names that are actually in the file. This way we can override faulty |
---|
268 | headers. |
---|
269 | |
---|
270 | The ``mode`` paramter tells what kind of operation we want to perform: |
---|
271 | ``create``, ``update``, or ``remove`` data. |
---|
272 | |
---|
273 | The ``user`` parameter finally is optional and only used for logging. |
---|
274 | |
---|
275 | We can, by the way, see the results of our run in a logfile if we |
---|
276 | provided a logger during the call: |
---|
277 | |
---|
278 | >>> print open('stoneville.log').read() |
---|
279 | processed: newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item) |
---|
280 | |
---|
281 | |
---|
282 | We cleanup the temporay dir created by doImport(): |
---|
283 | |
---|
284 | >>> import shutil |
---|
285 | >>> import os |
---|
286 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
287 | |
---|
288 | As we can see, the processing was successful. Otherwise, all problems |
---|
289 | could be read here as we can see, if we do the same operation again: |
---|
290 | |
---|
291 | >>> result = processor.doImport('newcomers.csv', |
---|
292 | ... ['name', 'dinoports', 'owner', 'taxpayer'], |
---|
293 | ... mode='create', user='Bob', logger=logger) |
---|
294 | >>> result |
---|
295 | (4, 4, '/.../newcomers.finished.csv', '/.../newcomers.pending.csv') |
---|
296 | |
---|
297 | This time we also get a path to a .pending file. |
---|
298 | |
---|
299 | The log file will tell us this in more detail: |
---|
300 | |
---|
301 | >>> print open('stoneville.log').read() |
---|
302 | processed: newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item) |
---|
303 | processed: newcomers.csv, create mode, 4 lines (0 successful/ 4 failed), ... s (... s/item) |
---|
304 | |
---|
305 | |
---|
306 | This time a new file was created, which keeps all the rows we could not |
---|
307 | process and an additional column with error messages: |
---|
308 | |
---|
309 | >>> print open(result[3]).read() |
---|
310 | owner,name,taxpayer,dinoports,--ERRORS-- |
---|
311 | Barney,Barneys Home,1,2,This object already exists. Skipping. |
---|
312 | Wilma,Wilmas Asylum,1,1,This object already exists. Skipping. |
---|
313 | Fred,Freds Dinoburgers,0,10,This object already exists. Skipping. |
---|
314 | Joey,Joeys Drive-in,0,110,This object already exists. Skipping. |
---|
315 | |
---|
316 | This way we can correct the faulty entries and afterwards retry without |
---|
317 | having the already processed rows in the way. |
---|
318 | |
---|
319 | We also notice, that the values of the taxpayer column are returned as |
---|
320 | in the input file. There we wrote '1' for ``True`` and '0' for |
---|
321 | ``False`` (which is accepted by the converters). |
---|
322 | |
---|
323 | Clean up: |
---|
324 | |
---|
325 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
326 | |
---|
327 | |
---|
328 | We can also tell to ignore some cols from input by passing |
---|
329 | ``--IGNORE--`` as col name: |
---|
330 | |
---|
331 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
332 | ... '--IGNORE--', '--IGNORE--'], |
---|
333 | ... mode='update', user='Bob') |
---|
334 | >>> result |
---|
335 | (4, 0, '...', None) |
---|
336 | |
---|
337 | Clean up: |
---|
338 | |
---|
339 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
340 | |
---|
341 | If something goes wrong during processing, the respective --IGNORE-- |
---|
342 | cols won't be populated in the resulting pending file: |
---|
343 | |
---|
344 | >>> result = processor.doImport('newcomers.csv', ['name', 'dinoports', |
---|
345 | ... '--IGNORE--', '--IGNORE--'], |
---|
346 | ... mode='create', user='Bob') |
---|
347 | >>> result |
---|
348 | (4, 4, '...', '...') |
---|
349 | |
---|
350 | >>> print open(result[3], 'rb').read() |
---|
351 | name,dinoports,--ERRORS-- |
---|
352 | Barneys Home,2,This object already exists. Skipping. |
---|
353 | Wilmas Asylum,1,This object already exists. Skipping. |
---|
354 | Freds Dinoburgers,10,This object already exists. Skipping. |
---|
355 | Joeys Drive-in,110,This object already exists. Skipping. |
---|
356 | |
---|
357 | |
---|
358 | Clean up: |
---|
359 | |
---|
360 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
361 | |
---|
362 | |
---|
363 | |
---|
364 | |
---|
365 | Updating entries |
---|
366 | ---------------- |
---|
367 | |
---|
368 | To update entries, we just call the batchprocessor in a different |
---|
369 | mode: |
---|
370 | |
---|
371 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
372 | ... 'dinoports', 'owner'], |
---|
373 | ... mode='update', user='Bob') |
---|
374 | >>> result |
---|
375 | (4, 0, '...', None) |
---|
376 | |
---|
377 | Now we want to tell, that Wilma got an extra port for her second dino: |
---|
378 | |
---|
379 | >>> open('newcomers.csv', 'wb').write( |
---|
380 | ... """name,dinoports,owner |
---|
381 | ... Wilmas Asylum,2,Wilma |
---|
382 | ... """) |
---|
383 | |
---|
384 | >>> wilma = stoneville['Wilmas Asylum'] |
---|
385 | >>> wilma.dinoports |
---|
386 | 1 |
---|
387 | |
---|
388 | Clean up: |
---|
389 | |
---|
390 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
391 | |
---|
392 | |
---|
393 | We start the processor: |
---|
394 | |
---|
395 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
396 | ... 'dinoports', 'owner'], mode='update', user='Bob') |
---|
397 | >>> result |
---|
398 | (1, 0, '...', None) |
---|
399 | |
---|
400 | >>> wilma = stoneville['Wilmas Asylum'] |
---|
401 | >>> wilma.dinoports |
---|
402 | 2 |
---|
403 | |
---|
404 | Wilma's number of dinoports raised. |
---|
405 | |
---|
406 | Clean up: |
---|
407 | |
---|
408 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
409 | |
---|
410 | |
---|
411 | If we try to update an unexisting entry, an error occurs: |
---|
412 | |
---|
413 | >>> open('newcomers.csv', 'wb').write( |
---|
414 | ... """name,dinoports,owner |
---|
415 | ... NOT-WILMAS-ASYLUM,2,Wilma |
---|
416 | ... """) |
---|
417 | |
---|
418 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
419 | ... 'dinoports', 'owner'], |
---|
420 | ... mode='update', user='Bob') |
---|
421 | >>> result |
---|
422 | (1, 1, '/.../newcomers.finished.csv', '/.../newcomers.pending.csv') |
---|
423 | |
---|
424 | Clean up: |
---|
425 | |
---|
426 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
427 | |
---|
428 | |
---|
429 | Also invalid values will be spotted: |
---|
430 | |
---|
431 | >>> open('newcomers.csv', 'wb').write( |
---|
432 | ... """name,dinoports,owner |
---|
433 | ... Wilmas Asylum,NOT-A-NUMBER,Wilma |
---|
434 | ... """) |
---|
435 | |
---|
436 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
437 | ... 'dinoports', 'owner'], |
---|
438 | ... mode='update', user='Bob') |
---|
439 | >>> result |
---|
440 | (1, 1, '...', '...') |
---|
441 | |
---|
442 | Clean up: |
---|
443 | |
---|
444 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
445 | |
---|
446 | |
---|
447 | We can also update only some cols, leaving some out. We skip the |
---|
448 | 'dinoports' column in the next run: |
---|
449 | |
---|
450 | >>> open('newcomers.csv', 'wb').write( |
---|
451 | ... """name,owner |
---|
452 | ... Wilmas Asylum,Barney |
---|
453 | ... """) |
---|
454 | |
---|
455 | >>> result = processor.doImport('newcomers.csv', ['name', 'owner'], |
---|
456 | ... mode='update', user='Bob') |
---|
457 | >>> result |
---|
458 | (1, 0, '...', None) |
---|
459 | |
---|
460 | >>> wilma.owner |
---|
461 | u'Barney' |
---|
462 | |
---|
463 | Clean up: |
---|
464 | |
---|
465 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
466 | |
---|
467 | |
---|
468 | We can however, not leave out the 'location field' ('name' in our |
---|
469 | case), as this one tells us which entry to update: |
---|
470 | |
---|
471 | >>> open('newcomers.csv', 'wb').write( |
---|
472 | ... """name,dinoports,owner |
---|
473 | ... 2,Wilma |
---|
474 | ... """) |
---|
475 | |
---|
476 | >>> processor.doImport('newcomers.csv', ['dinoports', 'owner'], |
---|
477 | ... mode='update', user='Bob') |
---|
478 | Traceback (most recent call last): |
---|
479 | ... |
---|
480 | FatalCSVError: Need at least columns 'name' for import! |
---|
481 | |
---|
482 | This time we get even an exception! |
---|
483 | |
---|
484 | Generally, empty strings are considered as ``None``: |
---|
485 | |
---|
486 | >>> open('newcomers.csv', 'wb').write( |
---|
487 | ... """name,dinoports,owner |
---|
488 | ... "Wilmas Asylum","","Wilma" |
---|
489 | ... """) |
---|
490 | |
---|
491 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
492 | ... 'dinoports', 'owner'], |
---|
493 | ... mode='update', user='Bob') |
---|
494 | >>> result |
---|
495 | (1, 0, '...', None) |
---|
496 | |
---|
497 | >>> wilma.dinoports |
---|
498 | 2 |
---|
499 | |
---|
500 | Clean up: |
---|
501 | |
---|
502 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
503 | |
---|
504 | We can tell to set dinoports to ``None`` although this is not a |
---|
505 | number, as we declared the field not required in the interface: |
---|
506 | |
---|
507 | >>> open('newcomers.csv', 'wb').write( |
---|
508 | ... """name,dinoports,owner |
---|
509 | ... "Wilmas Asylum","XXX","Wilma" |
---|
510 | ... """) |
---|
511 | |
---|
512 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
513 | ... 'dinoports', 'owner'], |
---|
514 | ... mode='update', user='Bob', ignore_empty=False) |
---|
515 | >>> result |
---|
516 | (1, 0, '...', None) |
---|
517 | |
---|
518 | >>> wilma.dinoports is None |
---|
519 | True |
---|
520 | |
---|
521 | Clean up: |
---|
522 | |
---|
523 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
524 | |
---|
525 | Removing entries |
---|
526 | ---------------- |
---|
527 | |
---|
528 | In 'remove' mode we can delete entries. Here validity of values in |
---|
529 | non-location fields doesn't matter because those fields are ignored. |
---|
530 | |
---|
531 | >>> open('newcomers.csv', 'wb').write( |
---|
532 | ... """name,dinoports,owner |
---|
533 | ... "Wilmas Asylum","ILLEGAL-NUMBER","" |
---|
534 | ... """) |
---|
535 | |
---|
536 | >>> result = processor.doImport('newcomers.csv', ['name', |
---|
537 | ... 'dinoports', 'owner'], |
---|
538 | ... mode='remove', user='Bob') |
---|
539 | >>> result |
---|
540 | (1, 0, '...', None) |
---|
541 | |
---|
542 | >>> sorted(stoneville.keys()) |
---|
543 | [u'Barneys Home', "Fred's home", u'Freds Dinoburgers', u'Joeys Drive-in'] |
---|
544 | |
---|
545 | Oops! Wilma is gone. |
---|
546 | |
---|
547 | Clean up: |
---|
548 | |
---|
549 | >>> shutil.rmtree(os.path.dirname(result[2])) |
---|
550 | |
---|
551 | |
---|
552 | Clean up: |
---|
553 | |
---|
554 | >>> import os |
---|
555 | >>> os.unlink('newcomers.csv') |
---|
556 | >>> os.unlink('stoneville.log') |
---|