source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.txt @ 9148

Last change on this file since 9148 was 9023, checked in by Henrik Bettermann, 12 years ago

Give access to processed (finished) import files.

File size: 8.7 KB
RevLine 
[7819]1Kofa Data Center
[7321]2****************
[4169]3
[7819]4The Kofa data center cares for managing CSV files and importing then.
[4169]5
[5140]6.. :doctest:
[7819]7.. :layer: waeup.kofa.testing.KofaUnitTestLayer
[4169]8
9Creating a data center
10======================
11
12A data center can be created easily:
13
[7811]14    >>> from waeup.kofa.datacenter import DataCenter
[4169]15    >>> mydatacenter = DataCenter()
16    >>> mydatacenter
[7811]17    <waeup.kofa.datacenter.DataCenter object at 0x...>
[4169]18
19Each data center has a location in file system where files are stored:
20
21    >>> storagepath = mydatacenter.storage
22    >>> storagepath
[7584]23    '/tmp/tmp...'
[4169]24
[8398]25Beside other things it provides a location to put data of deleted
26items into:
[4169]27
[8398]28    >>> import os
29    >>> del_path = mydatacenter.deleted_path
30    >>> os.path.isdir(del_path)
31    True
32
33Overall it complies with the `IDataCenter` interface:
34
35    >>> from zope.interface import verify
36    >>> from waeup.kofa.interfaces import IDataCenter
37    >>> verify.verifyObject(IDataCenter, DataCenter() )
38    True
39
40    >>> verify.verifyClass(IDataCenter, DataCenter)
41    True
42
[4174]43Managing the storage path
44-------------------------
45
46We can set another storage path:
47
48    >>> import os
49    >>> os.mkdir('newlocation')
50    >>> newpath = os.path.abspath('newlocation')
51    >>> mydatacenter.setStoragePath(newpath)
[4191]52    []
[4174]53
[4191]54The result here is a list of filenames, that could not be
55copied. Luckily, this list is empty.
56
[4174]57When we set a new storage path, we can tell to move all files in the
58old location to the new one. To see this feature in action, we first
59have to put a file into the old location:
60
61    >>> open(os.path.join(newpath, 'myfile.txt'), 'wb').write('hello')
62
63Now we can set a new location and the file will be copied:
64
65    >>> verynewpath = os.path.abspath('verynewlocation')
66    >>> os.mkdir(verynewpath)
67
68    >>> mydatacenter.setStoragePath(verynewpath, move=True)
[4191]69    []
70
[4174]71    >>> storagepath = mydatacenter.storage
72    >>> 'myfile.txt' in os.listdir(verynewpath)
73    True
74
75We remove the created file to have a clean testing environment for
76upcoming examples:
77
78    >>> os.unlink(os.path.join(storagepath, 'myfile.txt'))
79
[4169]80Uploading files
81===============
82
83We can get a list of files stored in that location:
84
[9023]85    >>> mydatacenter.getPendingFiles()
[4169]86    []
87
88Let's put some file in the storage:
89
90    >>> import os
91    >>> filepath = os.path.join(storagepath, 'data.csv')
92    >>> open(filepath, 'wb').write('Some Content\n')
93
94Now we can find a file:
95
[9023]96    >>> mydatacenter.getPendingFiles()
[7811]97    [<waeup.kofa.datacenter.DataCenterFile object at 0x...>]
[4169]98
99As we can see, the actual file is wrapped by a convenience wrapper,
100that enables us to fetch some data about the file. The data returned
101is formatted in strings, so that it can easily be put into output
102pages:
103
[9023]104    >>> datafile = mydatacenter.getPendingFiles()[0]
[4169]105    >>> datafile.getSize()
106    '13 bytes'
107
108    >>> datafile.getDate() # Nearly current datetime...
109    '...'
110
111Clean up:
112
[4174]113    >>> import shutil
114    >>> shutil.rmtree(newpath)
115    >>> shutil.rmtree(verynewpath)
[4169]116
117
[4897]118Distributing processed files
119============================
120
121When files were processed by a batch processor, we can put the
122resulting files into desired destinations.
123
124We recreate the datacenter root in case it is missing:
125
126    >>> import os
127    >>> dc_root = mydatacenter.storage
128    >>> fin_dir = os.path.join(dc_root, 'finished')
129    >>> unfin_dir = os.path.join(dc_root, 'unfinished')
130
131    >>> def recreate_dc_storage():
132    ...   if os.path.exists(dc_root):
133    ...     shutil.rmtree(dc_root)
134    ...   os.mkdir(dc_root)
135    ...   mydatacenter.setStoragePath(mydatacenter.storage)
136    >>> recreate_dc_storage()
137
138We define a function that creates a set of faked result files:
139
140    >>> import os
141    >>> import tempfile
142    >>> def create_fake_results(source_basename, create_pending=True):
143    ...   tmp_dir = tempfile.mkdtemp()
144    ...   src = os.path.join(dc_root, source_basename)
145    ...   pending_src = None
146    ...   if create_pending:
147    ...     pending_src = os.path.join(tmp_dir, 'mypendingsource.csv')
148    ...   finished_src = os.path.join(tmp_dir, 'myfinishedsource.csv')
149    ...   for path in (src, pending_src, finished_src):
150    ...     if path is not None:
151    ...       open(path, 'wb').write('blah')
152    ...   return tmp_dir, src, finished_src, pending_src
153
154Now we can create the set of result files, that typically come after a
155successful processing of a regular source:
156
157Now we can try to distribute those files. Let's start with a source
158file, that was processed successfully:
159
160    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
161    ...  'mysource.csv', create_pending=False)
162    >>> mydatacenter.distProcessedFiles(True, src, finished_src,
[4999]163    ...                            pending_src, mode='create')
[4897]164    >>> sorted(os.listdir(dc_root))
[8372]165    ['deleted', 'finished', 'logs', 'unfinished']
[4897]166
167    >>> sorted(os.listdir(fin_dir))
[4999]168    ['mysource.create.finished.csv', 'mysource.csv']
[4897]169
170    >>> sorted(os.listdir(unfin_dir))
171    []
172
[4907]173The created dir will be removed for us by the datacenter. This way we
174can assured, that less temporary dirs are left hanging around:
[4897]175
[4907]176    >>> os.path.exists(tmp_dir)
177    False
178
[4897]179The root dir is empty, while the original file and the file containing
180all processed data were moved to'finished/'.
181
182Now we restart, but this time we fake an erranous action:
183
184    >>> recreate_dc_storage()
185    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
186    ...  'mysource.csv')
187    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
[4999]188    ...                                 pending_src, mode='create')
[4897]189    >>> sorted(os.listdir(dc_root))
[8372]190    ['deleted', 'finished', 'logs', 'mysource.create.pending.csv', 'unfinished']
[4897]191
192    >>> sorted(os.listdir(fin_dir))
[4999]193    ['mysource.create.finished.csv']
[4897]194
195    >>> sorted(os.listdir(unfin_dir))
196    ['mysource.csv']
197
198While the original source was moved to the 'unfinished' dir, the
199pending file went to the root and the set of already processed items
200are stored in finished/.
201
202We fake processing the pending file and assume that everything went
203well this time:
204
205    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
[4999]206    ...  'mysource.create.pending.csv', create_pending=False)
[4897]207    >>> mydatacenter.distProcessedFiles(True, src, finished_src,
[4999]208    ...                                 pending_src, mode='create')
[4897]209
210    >>> sorted(os.listdir(dc_root))
[8372]211    ['deleted', 'finished', 'logs', 'unfinished']
[4897]212
213    >>> sorted(os.listdir(fin_dir))
[4999]214    ['mysource.create.finished.csv', 'mysource.csv']
[4897]215
216    >>> sorted(os.listdir(unfin_dir))
217    []
218
219The result is the same as in the first case shown above.
220
221We restart again, but this time we fake several non-working imports in
222a row.
223
224We start with a faulty start-import:
225
226    >>> recreate_dc_storage()
227    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
228    ...  'mysource.csv')
229    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
[4999]230    ...                                 pending_src, mode='create')
[4897]231
232We try to process the pending file, which fails again:
233
234    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
[4999]235    ...  'mysource.create.pending.csv')
[4897]236    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
[4999]237    ...                                 pending_src, mode='create')
[4897]238
239We try to process the new pending file:
240
241    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
[4999]242    ...  'mysource.create.pending.csv')
[4897]243    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
[4999]244    ...                                 pending_src, mode='create')
[4897]245
246    >>> sorted(os.listdir(dc_root))
[8372]247    ['deleted', 'finished', 'logs', 'mysource.create.pending.csv', 'unfinished']
[4897]248
249    >>> sorted(os.listdir(fin_dir))
[4999]250    ['mysource.create.finished.csv']
[4897]251
252    >>> sorted(os.listdir(unfin_dir))
253    ['mysource.csv']
254
255Finally, we process the pending file and everything works:
256
257    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
[4999]258    ...  'mysource.create.pending.csv', create_pending=False)
[4897]259    >>> mydatacenter.distProcessedFiles(True, src, finished_src,
[4999]260    ...                                 pending_src, mode='create')
[4897]261
262    >>> sorted(os.listdir(dc_root))
[8372]263    ['deleted', 'finished', 'logs', 'unfinished']
[4897]264
265    >>> sorted(os.listdir(fin_dir))
[4999]266    ['mysource.create.finished.csv', 'mysource.csv']
[4897]267
268    >>> sorted(os.listdir(unfin_dir))
269    []
270
271The root dir is empty (contains no input files) and only the files in
272finished-subdirectory remain.
273
[9023]274
275We can get a list of imported files stored in the finished subfolder:
276
277    >>> mydatacenter.getFinishedFiles()
278    [<waeup.kofa.datacenter.DataCenterFile object at ...>, <waeup.kofa.datacenter.DataCenterFile object at ...>]
279
280    >>> datafile = mydatacenter.getFinishedFiles()[0]
281    >>> datafile.getSize()
282    '2 bytes'
283
284    >>> datafile.getDate() # Nearly current datetime...
285    '...'
286
287
[4897]288Clean up:
289
290    >>> shutil.rmtree(verynewpath)
Note: See TracBrowser for help on using the repository browser.