"""WAeUP data center.

The waeup data center cares for management of upload data and provides
tools for importing/exporting CSV data.
"""
import logging
# `logging.handlers` is a submodule and is NOT imported by `import logging`;
# it is needed for `RotatingFileHandler` below.
import logging.handlers
import os
import shutil
import struct
from datetime import datetime

import grok
from zope.component import getMultiAdapter
from zope.component.interfaces import ObjectEvent

from waeup.csvfile import getCSVFile
from waeup.interfaces import (IDataCenter, IWAeUPCSVImporter,
                              ICSVDataReceivers, IDataCenterFile,
                              IDataCenterStorageMovedEvent)
from waeup.utils.helpers import copyFileSystemTree


def _isPathWithin(path, root):
    """Tell whether `path` is `root` itself or lies somewhere below it.

    Both paths are made absolute and normalized before comparing. Unlike
    `os.path.commonprefix` (which compares character by character) this
    works on whole path components, so ``/data/files2`` is *not*
    considered to lie within ``/data/files``.
    """
    path = os.path.abspath(path)
    root = os.path.abspath(root)
    prefix = root.rstrip(os.sep) + os.sep
    return (path + os.sep).startswith(prefix)


class DataCenter(grok.Container):
    """A data center contains CSV files.
    """
    grok.implements(IDataCenter)

    # Default storage location: a `files` dir next to this module.
    # `setStoragePath` replaces it per instance.
    storage = os.path.join(os.path.dirname(__file__), 'files')

    @property
    def logger(self):
        """Get a logger for datacenter actions.

        The logger is named ``waeup.sirp.<sitename>.datacenter``, where
        <sitename> is the ``__name__`` of the current site (``app`` if
        there is none). A file handler is attached (via `_setupLogger`)
        only while the logger has no handlers yet.
        """
        # We need a different logger for every site...
        site = grok.getSite()
        sitename = getattr(site, '__name__', 'app')
        loggername = 'waeup.sirp.%s.datacenter' % sitename
        logger = logging.getLogger(loggername)
        if not logger.handlers:
            logger = self._setupLogger(logger)
        return logger

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self._createSubDirs()

    def _setupLogger(self, logger):
        """Setup datacenter logger.

        Attaches a rotating file handler writing to
        ``<storage>/logs/datacenter.log`` (creating the ``logs`` dir if
        needed), sets the level to DEBUG and disables propagation to
        parent loggers. Returns the passed-in `logger`.
        """
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        filename = os.path.join(logdir, 'datacenter.log')

        # Create a rotating file handler logger for datacenter.
        # NOTE(review): 5*1024**1 is only 5 KiB per log file; 5*1024**2
        # (5 MiB) may have been intended -- confirm before changing.
        handler = logging.handlers.RotatingFileHandler(
            filename, maxBytes=5*1024**1, backupCount=5)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)

        # Here we decide, whether our messages will _also_ go to
        # application log.
        logger.propagate = False
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        return logger

    def _createSubDirs(self):
        """Create standard subdirs (``finished``, ``unfinished``) in storage.

        Already existing subdirs are left alone.
        """
        for name in ['finished', 'unfinished']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    def getReceivers(self):
        """Get the CSV data receivers of the nearest providing parent.

        Walks up the ``__parent__`` chain and, for the first object
        providing `ICSVDataReceivers`, returns
        `getCSVDataReceivers(obj)`. Returns an empty list if no such
        parent exists.
        """
        receivers = []
        curr_obj = getattr(self, '__parent__', None)
        while curr_obj is not None:
            if ICSVDataReceivers.providedBy(curr_obj):
                receivers = self.getCSVDataReceivers(curr_obj)
                break
            curr_obj = getattr(curr_obj, '__parent__', None)
        return receivers

    def getReceiverIds(self):
        """Get a dict of available receivers.

        The keys of the result are the receiver ids (see
        `getReceiverId`), the values the receivers themselves.
        """
        receivers = self.getReceivers()
        return dict([(self.getReceiverId(x), x) for x in receivers])

    def getImporters(self):
        """Get a list of all importers available.

        The search for available importers is done in two steps:

        1) Collect the data receivers (see `getReceivers`),

        2) For every receiver and every file in storage: try to get a
           multi-adapter (wrapped CSV file, receiver) providing
           IWAeUPCSVImporter.

        Combinations without such an adapter are silently skipped.
        """
        result = []
        receivers = self.getReceivers()
        files = self.getFiles()
        for receiver in receivers:
            for filedescr in files:
                wrapped_file = getCSVFile(filedescr.context)
                try:
                    importer = getMultiAdapter((wrapped_file, receiver),
                                               IWAeUPCSVImporter)
                except Exception:
                    # No multi-adapter for this combination available...
                    continue
                result.append(importer)
        return result

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in `storage` are returned,
        as `DataCenterFile` objects. Files are sorted by basename, or
        newest-first by timestamp if `sort` is ``'date'``. Returns an
        empty list if `storage` does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir.

        The subdir is created if missing. Files are sorted by name and
        returned as `LogFile` objects.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must exist, otherwise `ValueError` is raised. If `move`
        is ``True`` (exactly), the current storage contents are copied
        to `path` using `copyFileSystemTree` (`overwrite` is passed
        through). The logger is then re-pointed to the new location and
        a `DataCenterStorageMovedEvent` is fired.

        Returns whatever `copyFileSystemTree` reported as not copied
        (presumably a list of paths -- see that helper), or an empty
        list if nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copyFileSystemTree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        # Adjust logger...
        logger = self.logger
        # Iterate over a copy: removeHandler() mutates logger.handlers,
        # and mutating a list while iterating it skips elements.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            # Release the file descriptor of the old log file.
            handler.close()
        self._setupLogger(logger)
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def getCSVDataReceivers(self, obj):
        """Get a list of attributes, that can receive CSV data.

        Every public (not underscore-prefixed) attribute of `obj` that
        can be fetched without error is included. We also get a list of
        values, if obj is a container that contains value items.
        """
        result = []
        for attr_name in dir(obj):
            if attr_name.startswith('_'):
                continue
            try:
                # This might fail (e.g. properties raising errors)...
                attr = getattr(obj, attr_name)
            except Exception:
                continue
            result.append(attr)
        if hasattr(obj, 'values'):
            try:
                result.extend(obj.values())
            except Exception:
                # Best effort: `values` may not be a callable container API.
                pass
        return result

    def getPossibleImports(self):
        """Get list of possible imports.

        A single import is defined as a tuple

          ( <file-descr>, <importers> )

        where ``<file-descr>`` is an `IDataCenterFile` object and
        ``<importers>`` is a list of ``(importer, receiver_id)`` tuples,
        ``importer`` being an `IWAeUPCSVImporter` object for that file.
        Files without any importer are left out.
        """
        result = []
        importers = self.getImporters()
        for filedescr in self.getFiles():
            possible_importers = []
            for importer in importers:
                if importer.csvfile.path != filedescr.context:
                    continue
                importer_context = (
                    importer, self.getReceiverId(importer.receiver))
                possible_importers.append(importer_context)
            if not possible_importers:
                continue
            result.append((filedescr, possible_importers))
        return result

    def getReceiverId(self, obj):
        """Get a unique id for an object.

        If the object is stored in ZODB it should contain a `_p_oid`
        attribute, which is guaranteed to be unique over the ZODB.

        If the object has no such attribute (or it is ``None``), then it
        will be held in memory only and will vanish as soon as the
        request is over. In this case we can use the memory address of
        it.

        Both, the id() result and the ZODB oid, are actually integers
        which we return as strings. To make the difference crystal
        clear, we prepend ``z`` to ids taken from ZODB oids.
        """
        if not hasattr(obj, '_p_oid'):
            return str(id(obj))
        oid = getattr(obj, '_p_oid')
        if oid is None:
            # The persistent object is not stored in the ZODB
            return str(id(obj))
        # ZODB oids are 8-byte big-endian strings.
        return 'z%s' % struct.unpack('>Q', oid)[0]

    def doImport(self, path, receiverid, clear=None):
        """Import the CSV file at `path` into the receiver `receiverid`.

        Raises `ValueError` if the receiver id is unknown or the CSV
        file format is not supported. If `clear` is not ``None`` it is
        passed to the importer as ``clear_old_data``.
        """
        receivers = self.getReceiverIds()
        if receiverid not in receivers:
            raise ValueError('The requested data receiver cannot be found.')
        receiver = receivers[receiverid]
        filewrapper = getCSVFile(path)
        if filewrapper is None:
            raise ValueError('Format of CSV file not supported.')
        importer = getMultiAdapter((filewrapper, receiver),
                                   IWAeUPCSVImporter)
        if clear is not None:
            importer.doImport(clear_old_data=clear)
        else:
            importer.doImport()
        return

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and ignored; moving a
        file onto itself is a no-op.
        """
        if not os.path.exists(source):
            self.logger.warning('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``move_orig`` is currently unused.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular importers should put their stuff in
        dedicated temporary dirs. The storage dir itself and its
        ancestors are never removed.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        pending_suffix = '.pending.csv'
        if basename.endswith(pending_suffix):
            # The source is itself a .pending file from an earlier run.
            # Recover the original name by stripping the whole suffix
            # (``a.b.pending.csv`` -> ``a.b.csv``; splitting at the first
            # dot would wrongly give ``a.csv``) and check whether that
            # original still waits in unfinished/.
            maybe_basename = "%s.csv" % basename[:-len(pending_suffix)]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.finished%s" % (base, ext)
        if not pending:
            pending_name = "%s.pending%s" % (base, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove that directory. The
        # check is component-wise (not character-wise) and refuses to
        # remove the storage dir or any of its ancestors. An empty
        # dirname (bare relative filename) is never removed.
        maybe_temp_dir = os.path.dirname(finished_file)
        if (maybe_temp_dir
                and not _isPathWithin(maybe_temp_dir, self.storage)
                and not _isPathWithin(self.storage, maybe_temp_dir)):
            shutil.rmtree(maybe_temp_dir)
        return


class DataCenterFile(object):
    """A description of a file stored in data center.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        # `context` is the filesystem path of the described file.
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        Based on the file's ctime, formatted with ``%c``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime('%c')

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the file's ctime as returned by `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Uses the largest of kb/MB/GB (base 1024) the size reaches, with
        two decimals; plain bytes below 1024.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        num = 0
        fp = open(self.context, 'rb')
        try:
            for line in fp:
                num += 1
        finally:
            # Always release the file handle.
            fp.close()
        return num


class LogFile(DataCenterFile):
    """A description of a log file.

    Besides the `DataCenterFile` data it exposes values of
    ``name: value`` marker lines found in the file (user, mode,
    processed, source).
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every line containing a colon is split at the first colon; the
        lower-cased left part becomes the marker name, the right part
        its text (later lines win). `maxline` is currently unused.
        """
        if self._parsed:
            return
        fp = open(self.context, 'rb')
        try:
            for line in fp:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        finally:
            # Always release the file handle.
            fp.close()
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text of `marker` (case-insensitive) or ``None``.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        return self._getMarker('user') or ''

    def getMode(self):
        return self._getMarker('mode') or ''

    def getStats(self):
        return self._getMarker('processed') or ''

    def getSourcePath(self):
        return self._getMarker('source') or None


class Import(object):
    """Helper class to aggregate imports and their data.

    `importers` is a sequence of ``(importer, receiverid)`` tuples; the
    receiver id is stored on each importer as ``receiverid``.
    """
    def __init__(self, filedescr, importers):
        self.file = filedescr
        self.importers = []
        for importer, receiverid in importers:
            importer.receiverid = receiverid
            self.importers.append(importer)


class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired, when datacenter storage moves.
    """
    grok.implements(IDataCenterStorageMovedEvent)