KOFA Data Center
****************

The KOFA data center takes care of managing CSV files and importing
them.

.. :doctest:
.. :layer: waeup.kofa.testing.KOFAUnitTestLayer

Creating a data center
======================

A data center can be created easily:

    >>> from waeup.kofa.datacenter import DataCenter
    >>> mydatacenter = DataCenter()
    >>> mydatacenter
    <waeup.kofa.datacenter.DataCenter object at 0x...>

Each data center has a location in the file system where files are
stored:

    >>> storagepath = mydatacenter.storage
    >>> storagepath
    '/tmp/tmp...'

Managing the storage path
-------------------------

We can set another storage path:

    >>> import os
    >>> os.mkdir('newlocation')
    >>> newpath = os.path.abspath('newlocation')
    >>> mydatacenter.setStoragePath(newpath)
    []

The result here is a list of filenames that could not be
copied. Luckily, this list is empty.

When we set a new storage path, we can ask for all files in the old
location to be moved to the new one. To see this feature in action,
we first have to put a file into the old location:

    >>> open(os.path.join(newpath, 'myfile.txt'), 'wb').write('hello')

Now we can set a new location and the file will be moved there:

    >>> verynewpath = os.path.abspath('verynewlocation')
    >>> os.mkdir(verynewpath)

    >>> mydatacenter.setStoragePath(verynewpath, move=True)
    []

    >>> storagepath = mydatacenter.storage
    >>> 'myfile.txt' in os.listdir(verynewpath)
    True

We remove the created file to have a clean testing environment for
upcoming examples:

    >>> os.unlink(os.path.join(storagepath, 'myfile.txt'))

Uploading files
===============

We can get a list of files stored in that location:

    >>> mydatacenter.getFiles()
    []

Let's put a file into the storage:

    >>> import os
    >>> filepath = os.path.join(storagepath, 'data.csv')
    >>> open(filepath, 'wb').write('Some Content\n')

Now we can find a file:

    >>> mydatacenter.getFiles()
    [<... object at 0x...>]

As we can see, the actual file is wrapped by a convenience wrapper
that enables us to fetch some data about the file.
The data returned is formatted as strings, so that it can easily be
put into output pages:

    >>> datafile = mydatacenter.getFiles()[0]
    >>> datafile.getSize()
    '13 bytes'

    >>> datafile.getDate() # Nearly current datetime...
    '...'

Clean up:

    >>> import shutil
    >>> shutil.rmtree(newpath)
    >>> shutil.rmtree(verynewpath)

Distributing processed files
============================

After files have been processed by a batch processor, we can move the
resulting files to their proper destinations.

We recreate the datacenter root in case it is missing:

    >>> import os
    >>> dc_root = mydatacenter.storage
    >>> fin_dir = os.path.join(dc_root, 'finished')
    >>> unfin_dir = os.path.join(dc_root, 'unfinished')

    >>> def recreate_dc_storage():
    ...     if os.path.exists(dc_root):
    ...         shutil.rmtree(dc_root)
    ...     os.mkdir(dc_root)
    ...     mydatacenter.setStoragePath(mydatacenter.storage)
    >>> recreate_dc_storage()

We define a function that creates a set of faked result files:

    >>> import os
    >>> import tempfile
    >>> def create_fake_results(source_basename, create_pending=True):
    ...     tmp_dir = tempfile.mkdtemp()
    ...     src = os.path.join(dc_root, source_basename)
    ...     pending_src = None
    ...     if create_pending:
    ...         pending_src = os.path.join(tmp_dir, 'mypendingsource.csv')
    ...     finished_src = os.path.join(tmp_dir, 'myfinishedsource.csv')
    ...     for path in (src, pending_src, finished_src):
    ...         if path is not None:
    ...             open(path, 'wb').write('blah')
    ...     return tmp_dir, src, finished_src, pending_src

Now we can create the set of result files that typically exists after
a regular source has been processed, and try to distribute those
files. Let's start with a source file that was processed
successfully:

    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.csv', create_pending=False)
    >>> mydatacenter.distProcessedFiles(True, src, finished_src,
    ...                                 pending_src, mode='create')
    >>> sorted(os.listdir(dc_root))
    ['finished', 'logs', 'unfinished']

    >>> sorted(os.listdir(fin_dir))
    ['mysource.create.finished.csv', 'mysource.csv']

    >>> sorted(os.listdir(unfin_dir))
    []

The temporary dir is removed for us by the datacenter. This way we
can be assured that fewer temporary dirs are left hanging around:

    >>> os.path.exists(tmp_dir)
    False

The root dir contains no input files any more, while the original
file and the file containing all processed data were moved to
'finished/'.

Now we restart, but this time we fake an erroneous action:

    >>> recreate_dc_storage()
    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.csv')
    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
    ...                                 pending_src, mode='create')
    >>> sorted(os.listdir(dc_root))
    ['finished', 'logs', 'mysource.create.pending.csv', 'unfinished']

    >>> sorted(os.listdir(fin_dir))
    ['mysource.create.finished.csv']

    >>> sorted(os.listdir(unfin_dir))
    ['mysource.csv']

While the original source was moved to the 'unfinished' dir, the
pending file went to the root and the set of already processed items
is stored in 'finished/'.

We fake processing the pending file and assume that everything went
well this time:

    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.create.pending.csv', create_pending=False)
    >>> mydatacenter.distProcessedFiles(True, src, finished_src,
    ...                                 pending_src, mode='create')

    >>> sorted(os.listdir(dc_root))
    ['finished', 'logs', 'unfinished']

    >>> sorted(os.listdir(fin_dir))
    ['mysource.create.finished.csv', 'mysource.csv']

    >>> sorted(os.listdir(unfin_dir))
    []

The result is the same as in the first case shown above.

We restart again, but this time we fake several failing imports in a
row.

We start with a faulty initial import:

    >>> recreate_dc_storage()
    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.csv')
    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
    ...                                 pending_src, mode='create')

We try to process the pending file, which fails again:

    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.create.pending.csv')
    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
    ...                                 pending_src, mode='create')

We try to process the new pending file, which fails once more:

    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.create.pending.csv')
    >>> mydatacenter.distProcessedFiles(False, src, finished_src,
    ...                                 pending_src, mode='create')

    >>> sorted(os.listdir(dc_root))
    ['finished', 'logs', 'mysource.create.pending.csv', 'unfinished']

    >>> sorted(os.listdir(fin_dir))
    ['mysource.create.finished.csv']

    >>> sorted(os.listdir(unfin_dir))
    ['mysource.csv']

Finally, we process the pending file and everything works:

    >>> tmp_dir, src, finished_src, pending_src = create_fake_results(
    ...     'mysource.create.pending.csv', create_pending=False)
    >>> mydatacenter.distProcessedFiles(True, src, finished_src,
    ...                                 pending_src, mode='create')

    >>> sorted(os.listdir(dc_root))
    ['finished', 'logs', 'unfinished']

    >>> sorted(os.listdir(fin_dir))
    ['mysource.create.finished.csv', 'mysource.csv']

    >>> sorted(os.listdir(unfin_dir))
    []

The root dir contains no input files any more, and only the files in
the 'finished/' subdirectory remain.

Clean up:

    >>> shutil.rmtree(verynewpath)
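
The distribution rules exercised above can be summarized in a small
standalone sketch. It is not the implementation of
``distProcessedFiles``; the function ``distribute``, its ``root``
parameter and the way it locates the temporary dir (the parent of the
finished file) are assumptions made for illustration, inferred only
from the directory listings shown in this document. The ``logs`` dir,
which the listings also show, is not handled here.

```python
import os
import shutil


def distribute(successful, source, finished_file, pending_file, root,
               mode="create"):
    """Hypothetical sketch of the file distribution rules shown above."""
    fin_dir = os.path.join(root, "finished")
    unfin_dir = os.path.join(root, "unfinished")
    for path in (fin_dir, unfin_dir):
        os.makedirs(path, exist_ok=True)

    # 'mysource.csv' and 'mysource.create.pending.csv' share the base
    # name 'mysource'; the latter marks a retry of a failed import.
    name = os.path.basename(source)
    pending_suffix = ".%s.pending.csv" % mode
    is_retry = name.endswith(pending_suffix)
    if is_retry:
        base = name[:-len(pending_suffix)]
    else:
        base = os.path.splitext(name)[0]
    original = base + ".csv"

    # Already processed rows always end up in finished/.
    shutil.move(
        finished_file,
        os.path.join(fin_dir, "%s.%s.finished.csv" % (base, mode)))

    if is_retry:
        # The pending file just processed is consumed; the original
        # source is expected to wait in unfinished/.
        os.unlink(source)
        source = os.path.join(unfin_dir, original)

    # The original source goes to finished/ on success, otherwise it
    # goes to (or stays in) unfinished/.
    target_dir = fin_dir if successful else unfin_dir
    target = os.path.join(target_dir, original)
    if os.path.exists(source) and source != target:
        shutil.move(source, target)

    # Rows still to be processed are placed in the root dir.
    if not successful and pending_file is not None:
        shutil.move(pending_file,
                    os.path.join(root, base + pending_suffix))

    # Assumption: the temporary dir is the parent of the finished file.
    shutil.rmtree(os.path.dirname(finished_file), ignore_errors=True)
```

Calling ``distribute(False, ...)`` on a fresh source and then
``distribute(True, ...)`` on the resulting pending file leaves the
same layout as the second scenario above: both files in ``finished/``
and nothing in ``unfinished/`` or the root.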