## $Id: datacenter.py 7933 2012-03-21 13:42:00Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
"""Kofa data center.

The waeup data center cares for management of upload data and provides
tools for importing/exporting CSV data.
"""
import logging
import os
import shutil
import grok
from datetime import datetime
from zope.component import getUtility
from zope.component.interfaces import ObjectEvent
from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
                                   IDataCenterStorageMovedEvent,
                                   IDataCenterConfig)
from waeup.kofa.utils.helpers import copy_filesystem_tree
from waeup.kofa.utils.logger import Logger


class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    Files live in the directory ``self.storage``; the subdirectories
    ``finished``, ``unfinished`` and ``logs`` are created below it.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The initial storage path is read from the registered
        # IDataCenterConfig utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Subdirs that already exist are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)

    def _isInsideStorage(self, path):
        """Tell whether `path` is the storage dir or located below it.

        Both paths are normalised first, so that trailing slashes or
        ``..`` segments do not influence the result, and the comparison
        respects path component boundaries (``/x/store2`` is *not*
        inside ``/x/store``).
        """
        storage = os.path.abspath(self.storage)
        path = os.path.abspath(path)
        # os.path.join(storage, '') ends with exactly one separator,
        # also if storage is the filesystem root.
        return path == storage or path.startswith(os.path.join(storage, ''))

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in the storage dir are
        returned, each wrapped into a `DataCenterFile`.

        By default files are sorted by basename. With ``sort='date'``
        they are sorted by timestamp, newest first. If the storage dir
        does not exist, an empty list is returned.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir. Files are sorted by name.

        The logs/ subdir is created if it does not exist yet. Each
        regular file found is wrapped into a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            fullpath = os.path.join(logdir, name)
            if not os.path.isfile(fullpath):
                continue
            result.append(LogFile(fullpath))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must denote an existing location, otherwise a
        `ValueError` is raised.

        If `move` is ``True``, the contents of the old storage are
        copied over to the new location using `copy_filesystem_tree`
        (`overwrite` is passed on to that helper). The value returned
        by the helper is returned; without `move` an empty list is
        returned.

        After switching, the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is sent.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                              overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. If `source` and `dest` are equal, nothing is done.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode``: string inserted into the names of the generated
        .pending and .finished files.

        ``move_orig``: accepted for API compatibility; not evaluated
        by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # The source might be a pending file of an earlier run. We
            # treat it as such only if the matching original file
            # exists in the unfinished/ subdir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        #
        # The check works on normalised paths and whole path
        # components. A plain string prefix comparison would regard
        # the storage itself as 'outside' if its path carries a
        # trailing slash (and then remove it), and a sibling dir like
        # 'store2' as 'inside' of 'store'.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isInsideStorage(maybe_temp_dir):
            shutil.rmtree(maybe_temp_dir)


class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file. Name, size, upload
    date and number of lines are determined once on creation.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is based on `os.path.getctime` and formatted like
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the value of `os.path.getctime` for the file.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones in the
        biggest fitting unit out of kb, MB, GB with two decimals.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first, so that the first match wins.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        num = 0
        # Use a context manager to make sure the file is closed again.
        with open(self.context, 'rb') as fd:
            for line in fd:
                num += 1
        return num


class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular `DataCenterFile`, the markers
    ``user``, ``mode``, ``processed`` and ``source`` are looked up in
    the file. A marker is a line of form ``<name>:<text>``.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon. The
        lower-cased part before the colon becomes the marker name, the
        rest the marker text. If a name occurs several times, the last
        occurrence wins. The file is parsed only once.

        `maxline` is currently not evaluated: all lines are parsed.
        """
        if self._parsed:
            return
        # NOTE(review): the file is read in binary mode while the
        # separator below is a native string. That matches Python 2
        # `str` semantics -- confirm before running under Python 3.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True

    def _getMarker(self, marker):
        """Get the text of `marker` or ``None`` if there is no such marker.

        Marker names are looked up case-insensitively. The file is
        parsed on first use.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        """Get the text of the ``user`` marker or the empty string.
        """
        return self._getMarker('user') or ''

    def getMode(self):
        """Get the text of the ``mode`` marker or the empty string.
        """
        return self._getMarker('mode') or ''

    def getStats(self):
        """Get the text of the ``processed`` marker or the empty string.
        """
        return self._getMarker('processed') or ''

    def getSourcePath(self):
        """Get the text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None


class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired, when datacenter storage moves.
    """
    grok.implements(IDataCenterStorageMovedEvent)