- Timestamp:
- 27 Jan 2010, 11:33:44 (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
waeup/trunk/src/waeup/datacenter.txt
r4789 r4897 97 97 >>> shutil.rmtree(verynewpath) 98 98 99 100 Distributing processed files 101 ============================ 102 103 When files were processed by a batch processor, we can put the 104 resulting files into desired destinations. 105 106 We recreate the datacenter root in case it is missing: 107 108 >>> import os 109 >>> dc_root = mydatacenter.storage 110 >>> fin_dir = os.path.join(dc_root, 'finished') 111 >>> unfin_dir = os.path.join(dc_root, 'unfinished') 112 113 >>> def recreate_dc_storage(): 114 ... if os.path.exists(dc_root): 115 ... shutil.rmtree(dc_root) 116 ... os.mkdir(dc_root) 117 ... mydatacenter.setStoragePath(mydatacenter.storage) 118 >>> recreate_dc_storage() 119 120 We define a function that creates a set of faked result files: 121 122 >>> import os 123 >>> import tempfile 124 >>> def create_fake_results(source_basename, create_pending=True): 125 ... tmp_dir = tempfile.mkdtemp() 126 ... src = os.path.join(dc_root, source_basename) 127 ... pending_src = None 128 ... if create_pending: 129 ... pending_src = os.path.join(tmp_dir, 'mypendingsource.csv') 130 ... finished_src = os.path.join(tmp_dir, 'myfinishedsource.csv') 131 ... for path in (src, pending_src, finished_src): 132 ... if path is not None: 133 ... open(path, 'wb').write('blah') 134 ... return tmp_dir, src, finished_src, pending_src 135 136 Now we can create the set of result files, that typically come after a 137 successful processing of a regular source: 138 139 Now we can try to distribute those files. Let's start with a source 140 file, that was processed successfully: 141 142 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 143 ... 'mysource.csv', create_pending=False) 144 >>> mydatacenter.distProcessedFiles(True, src, finished_src, 145 ... pending_src) 146 >>> sorted(os.listdir(dc_root)) 147 ['finished', 'logs', 'unfinished'] 148 149 >>> sorted(os.listdir(fin_dir)) 150 ['mysource.csv', 'mysource.finished.csv'] 151 152 >>> sorted(os.listdir(unfin_dir)) 153 [] 154 155 >>> shutil.rmtree(tmp_dir) 156 157 The root dir is empty, while the original file and the file containing 158 all processed data were moved to'finished/'. 159 160 Now we restart, but this time we fake an erranous action: 161 162 >>> recreate_dc_storage() 163 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 164 ... 'mysource.csv') 165 >>> mydatacenter.distProcessedFiles(False, src, finished_src, 166 ... pending_src) 167 >>> sorted(os.listdir(dc_root)) 168 ['finished', 'logs', 'mysource.pending.csv', 'unfinished'] 169 170 >>> sorted(os.listdir(fin_dir)) 171 ['mysource.finished.csv'] 172 173 >>> sorted(os.listdir(unfin_dir)) 174 ['mysource.csv'] 175 176 >>> shutil.rmtree(tmp_dir) 177 178 While the original source was moved to the 'unfinished' dir, the 179 pending file went to the root and the set of already processed items 180 are stored in finished/. 181 182 We fake processing the pending file and assume that everything went 183 well this time: 184 185 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 186 ... 'mysource.pending.csv', create_pending=False) 187 >>> mydatacenter.distProcessedFiles(True, src, finished_src, 188 ... pending_src) 189 190 >>> sorted(os.listdir(dc_root)) 191 ['finished', 'logs', 'unfinished'] 192 193 >>> sorted(os.listdir(fin_dir)) 194 ['mysource.csv', 'mysource.finished.csv'] 195 196 >>> sorted(os.listdir(unfin_dir)) 197 [] 198 199 >>> shutil.rmtree(tmp_dir) 200 201 The result is the same as in the first case shown above. 202 203 We restart again, but this time we fake several non-working imports in 204 a row. 205 206 We start with a faulty start-import: 207 208 >>> recreate_dc_storage() 209 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 210 ... 'mysource.csv') 211 >>> mydatacenter.distProcessedFiles(False, src, finished_src, 212 ... pending_src) 213 >>> shutil.rmtree(tmp_dir) 214 215 We try to process the pending file, which fails again: 216 217 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 218 ... 'mysource.pending.csv') 219 >>> mydatacenter.distProcessedFiles(False, src, finished_src, 220 ... pending_src) 221 >>> shutil.rmtree(tmp_dir) 222 223 We try to process the new pending file: 224 225 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 226 ... 'mysource.pending.csv') 227 >>> mydatacenter.distProcessedFiles(False, src, finished_src, 228 ... pending_src) 229 230 >>> sorted(os.listdir(dc_root)) 231 ['finished', 'logs', 'mysource.pending.csv', 'unfinished'] 232 233 >>> sorted(os.listdir(fin_dir)) 234 ['mysource.finished.csv'] 235 236 >>> sorted(os.listdir(unfin_dir)) 237 ['mysource.csv'] 238 239 >>> shutil.rmtree(tmp_dir) 240 241 Finally, we process the pending file and everything works: 242 243 >>> tmp_dir, src, finished_src, pending_src = create_fake_results( 244 ... 'mysource.pending.csv', create_pending=False) 245 >>> mydatacenter.distProcessedFiles(True, src, finished_src, 246 ... pending_src) 247 248 >>> sorted(os.listdir(dc_root)) 249 ['finished', 'logs', 'unfinished'] 250 251 >>> sorted(os.listdir(fin_dir)) 252 ['mysource.csv', 'mysource.finished.csv'] 253 254 >>> sorted(os.listdir(unfin_dir)) 255 [] 256 257 The root dir is empty (contains no input files) and only the files in 258 finished-subdirectory remain. 259 260 Clean up: 261 262 >>> shutil.rmtree(tmp_dir) 263 >>> shutil.rmtree(verynewpath) 99 264 100 265 Handling imports
Note: See TracChangeset for help on using the changeset viewer.