[4146] | 1 | """WAeUP data center. |
---|
| 2 | |
---|
| 3 | The waeup data center cares for management of upload data and provides |
---|
| 4 | tools for importing/exporting CSV data. |
---|
| 5 | """ |
---|
[4873] | 6 | import logging |
---|
[4146] | 7 | import os |
---|
[4896] | 8 | import shutil |
---|
[4175] | 9 | import struct |
---|
[4146] | 10 | import grok |
---|
| 11 | from datetime import datetime |
---|
[4683] | 12 | from zope.component import getMultiAdapter |
---|
[4883] | 13 | from zope.component.interfaces import ObjectEvent |
---|
[4224] | 14 | from waeup.csvfile import getCSVFile |
---|
[4683] | 15 | from waeup.interfaces import (IDataCenter, IWAeUPCSVImporter, |
---|
[4883] | 16 | ICSVDataReceivers, IDataCenterFile, |
---|
| 17 | IDataCenterStorageMovedEvent) |
---|
[4190] | 18 | from waeup.utils.helpers import copyFileSystemTree |
---|
[4146] | 19 | |
---|
class DataCenter(grok.Container):
    """A data center contains CSV files.

    The files themselves live in a filesystem directory (``storage``).
    Below that directory the data center uses the subdirectories
    ``finished/``, ``unfinished/`` and ``logs/``.
    """
    grok.implements(IDataCenter)

    # Default storage: a ``files`` directory next to this module. It is
    # replaced per instance by `setStoragePath`.
    storage = os.path.join(os.path.dirname(__file__), 'files')

    @property
    def logger(self):
        """Get a logger for datacenter actions.

        The logger is named ``waeup.sirp.<sitename>.datacenter`` where
        ``<sitename>`` is the ``__name__`` of the current site (``app``
        if no site/name is available). A file handler is attached via
        `_setupLogger` when the logger has no handlers yet.
        """
        # We need a different logger for every site...
        site = grok.getSite()
        sitename = getattr(site, '__name__', 'app')
        loggername = 'waeup.sirp.%s.datacenter' % sitename
        logger = logging.getLogger(loggername)
        if not logger.handlers:
            logger = self._setupLogger(logger)
        return logger

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self._createSubDirs()

    def _setupLogger(self, logger):
        """Setup datacenter logger.

        Attaches a rotating file handler writing to
        ``<storage>/logs/datacenter.log`` (the ``logs`` directory is
        created if missing), sets level DEBUG, disables propagation to
        parent loggers and returns `logger`.
        """
        # `import logging` alone does not load the `logging.handlers`
        # submodule, so import the handler class explicitly here.
        from logging.handlers import RotatingFileHandler

        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        filename = os.path.join(logdir, 'datacenter.log')

        # Create a rotating file handler logger for datacenter.
        # NOTE(review): maxBytes evaluates to 5 KiB (5*1024**1); if 5 MiB
        # was intended this should read 5*1024**2 -- confirm.
        handler = RotatingFileHandler(
            filename, maxBytes=5*1024**1, backupCount=5)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)

        # Here we decide, whether our messages will _also_ go to
        # application log.
        logger.propagate = False
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        return logger

    def _createSubDirs(self):
        """Create the standard subdirs ``finished`` and ``unfinished``.

        Directories that already exist are left untouched.
        """
        for name in ['finished', 'unfinished']:
            path = os.path.join(self.storage, name)
            if not os.path.exists(path):
                os.mkdir(path)
        return

    def getReceivers(self):
        """Get the objects that may receive CSV data.

        Walks up the ``__parent__`` chain, starting at this data center's
        parent. For the first object providing `ICSVDataReceivers` the
        result of `getCSVDataReceivers` is returned. If there is no such
        object, an empty list is returned.
        """
        curr_obj = getattr(self, '__parent__', None)
        while curr_obj is not None:
            if ICSVDataReceivers.providedBy(curr_obj):
                return self.getCSVDataReceivers(curr_obj)
            curr_obj = getattr(curr_obj, '__parent__', None)
        return []

    def getReceiverIds(self):
        """Get a dict of available receivers.

        The keys of the result are the receiver ids (see
        `getReceiverId`), the values the receivers themselves.
        """
        receivers = self.getReceivers()
        return dict([(self.getReceiverId(x), x) for x in receivers])

    def getImporters(self):
        """Get a list of all importers available.

        For every receiver returned by `getReceivers` and every file
        returned by `getFiles` we try to get a multi-adapter for
        (wrapped CSV file, receiver) providing `IWAeUPCSVImporter`.
        Combinations for which no importer can be created are skipped.
        """
        result = []
        receivers = self.getReceivers()
        files = self.getFiles()
        for receiver in receivers:
            for filedescr in files:
                wrapped_file = getCSVFile(filedescr.context)
                try:
                    importer = getMultiAdapter((wrapped_file, receiver),
                                               IWAeUPCSVImporter)
                except Exception:
                    # No multi-adapter for this combination available
                    # (or it could not be created)...
                    continue
                result.append(importer)
        return result

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files directly inside `storage` are returned, as
        `DataCenterFile` objects. By default they are sorted by basename;
        with ``sort='date'`` they are sorted by ctime timestamp, newest
        first. Returns an empty list if `storage` does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir. Files are sorted by name.

        The ``logs`` directory is created if it does not exist yet. Only
        regular files are returned, as `LogFile` objects.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            fullpath = os.path.join(logdir, name)
            if not os.path.isfile(fullpath):
                continue
            result.append(LogFile(fullpath))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must exist, otherwise `ValueError` is raised. If `move` is
        ``True`` the contents of the current storage are copied to `path`
        with `copyFileSystemTree` (`overwrite` is passed through). The
        standard subdirs are created, the logger is re-pointed to the new
        location and a `DataCenterStorageMovedEvent` is notified.

        Returns what `copyFileSystemTree` returned (presumably the items
        that could not be copied -- see that helper), or an empty list if
        nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copyFileSystemTree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        # Adjust logger: drop handlers that still write to the old
        # location and attach a fresh one for the new storage path.
        logger = self.logger
        # Iterate over a copy: removeHandler() mutates logger.handlers,
        # so iterating the live list would skip every other handler.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            # Release the file descriptor held by the old handler.
            handler.close()
        self._setupLogger(logger)
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def getCSVDataReceivers(self, obj):
        """Get a list of attributes, that can receive CSV data.

        All public (not underscore-prefixed) attributes of `obj` that can
        be fetched are collected. We also get a list of values, if obj is
        a container that contains value items.
        """
        result = []
        for attr_name in dir(obj):
            if attr_name.startswith('_'):
                continue
            try:
                attr = getattr(obj, attr_name)
            except Exception:
                # Some attributes (e.g. properties) may fail on access.
                continue
            result.append(attr)
        if hasattr(obj, 'values'):
            try:
                result.extend(obj.values())
            except Exception:
                pass
        return result

    def getPossibleImports(self):
        """Get list of possible imports.

        A single import is defined as a tuple

          ( <file-descr>, <importers> )

        where ``<file-descr>`` is an `IDataCenterFile` object and
        ``<importers>`` is a list of ``(importer, receiver_id)`` tuples
        with `IWAeUPCSVImporter` objects. Files without any importer are
        left out.
        """
        result = []
        importers = self.getImporters()
        for filedescr in self.getFiles():
            possible_importers = []
            for importer in importers:
                if importer.csvfile.path != filedescr.context:
                    continue
                importer_context = (
                    importer, self.getReceiverId(importer.receiver))
                possible_importers.append(importer_context)
            if not possible_importers:
                continue
            result.append((filedescr, possible_importers))
        return result

    def getReceiverId(self, obj):
        """Get a unique id for an object.

        If the object is stored in ZODB it should contain a `_p_oid`
        attribute, which is guaranteed to be unique over the ZODB.

        If the object has no such attribute, then it will be held in
        memory only and will vanish as soon as the request is over. In
        this case we can use the memory address of it.

        Both, the id() result and the ZODB oid, are actually integers
        which we return as strings. To make the difference crystal
        clear, we prepend ``z`` to ids taken from ZODB oids.
        """
        oid = getattr(obj, '_p_oid', None)
        if oid is None:
            # Not persistent at all, or not (yet) stored in the ZODB.
            return str(id(obj))
        return 'z%s' % struct.unpack('>Q', oid)[0]

    def doImport(self, path, receiverid, clear=None):
        """Import the CSV file at `path` into the receiver `receiverid`.

        Raises `ValueError` if the receiver id is unknown or the file
        format is not supported. If `clear` is not ``None`` it is passed
        to the importer as ``clear_old_data``.
        """
        receivers = self.getReceiverIds()
        if receiverid not in receivers:
            raise ValueError('The requested data receiver cannot be found.')
        receiver = receivers[receiverid]
        filewrapper = getCSVFile(path)
        if filewrapper is None:
            raise ValueError('Format of CSV file not supported.')
        importer = getMultiAdapter((filewrapper, receiver), IWAeUPCSVImporter)
        if clear is not None:
            importer.doImport(clear_old_data=clear)
        else:
            importer.doImport()
        return

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        Copies content and stat info, then removes `source`. A missing
        `source` is logged as a warning; ``source == dest`` is a no-op.
        """
        if not os.path.exists(source):
            self.logger.warning('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``move_orig`` is currently not used.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular importers should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        pending_suffix = '.pending.csv'
        if basename.endswith(pending_suffix):
            # Strip only the trailing '.pending.csv' so that dotted
            # names like 'a.b.pending.csv' map back to 'a.b.csv'.
            maybe_basename = "%s.csv" % basename[:-len(pending_suffix)]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.finished%s" % (base, ext)
        if not pending:
            pending_name = "%s.pending%s" % (base, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it. Compare with a
        # trailing separator: a plain string prefix would treat e.g.
        # '/data/files2/tmp' as lying inside '/data/files'.
        maybe_temp_dir = os.path.dirname(finished_file)
        storage_prefix = os.path.join(self.storage, '')
        if not os.path.join(maybe_temp_dir, '').startswith(storage_prefix):
            shutil.rmtree(maybe_temp_dir)
        return
---|
| 334 | |
---|
| 335 | |
---|
class DataCenterFile(object):
    """A description of a file stored in data center.

    The metadata is computed once, on creation: ``name`` (basename),
    ``size`` (human readable size), ``uploaddate`` (human readable
    ctime) and ``lines`` (number of lines).
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        # `context` is the filesystem path of the described file.
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        Based on the file's ctime, formatted with the locale's ``%c``.
        NOTE(review): on POSIX ctime is the last metadata change, not
        necessarily the upload time.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime('%c')

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        Returns the file's ctime as seconds since the epoch.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Uses the largest of ``GB``, ``MB``, ``kb`` (powers of 1024) that
        the size reaches, with two decimals; otherwise ``'<n> bytes'``.
        """
        bytesize = os.path.getsize(self.context)
        for power, unit in [(3, 'GB'), (2, 'MB'), (1, 'kb')]:
            if bytesize >= 1024 ** power:
                return "%.2f %s" % (bytesize/(1024.0**power), unit)
        return "%s bytes" % bytesize

    def getLinesNumber(self):
        """Get number of lines.

        The file is closed again even if reading fails.
        """
        num = 0
        fp = open(self.context, 'rb')
        try:
            for line in fp:
                num += 1
        finally:
            fp.close()
        return num
---|
| 379 | |
---|
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the `DataCenterFile` attributes, it provides
    ``userid``, ``mode``, ``stats`` and ``source``, taken from
    ``<marker>:<text>`` lines found in the file.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every (stripped) line containing a colon is split at the first
        colon: the lower-cased part before it becomes the marker name,
        the rest (including any leading whitespace) its text. Later
        lines override earlier ones with the same name. The file is
        parsed at most once per instance and always closed again.

        `maxline` is currently not used; the whole file is read.
        """
        if self._parsed:
            return
        fp = open(self.context, 'rb')
        try:
            for line in fp:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        finally:
            fp.close()
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get text of `marker` (case-insensitive) or ``None``.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        """Get text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
---|
| 424 | |
---|
class Import(object):
    """Bundle a file description with the importers applicable to it.

    `importers` is an iterable of ``(importer, receiverid)`` pairs. Each
    importer gets its receiver id attached as ``receiverid`` and is
    collected, in order, in ``self.importers``.
    """
    def __init__(self, filedescr, importers):
        self.file = filedescr
        collected = []
        for pair in importers:
            imp, rid = pair
            imp.receiverid = rid
            collected.append(imp)
        self.importers = collected
---|
[4883] | 434 | |
---|
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event notified after a data center's storage location changed.

    The data center whose storage moved is passed as the event object
    (see `DataCenter.setStoragePath`).
    """
    grok.implements(IDataCenterStorageMovedEvent)
---|