source: main/waeup.kofa/branches/uli-zc-async/src/waeup/kofa/datacenter.py @ 9324

Last change on this file since 9324 was 9211, checked in by uli, 12 years ago

Rollback r9209. Looks like multiple merges from trunk confuse svn when merging back into trunk.

  • Property svn:keywords set to Id
File size: 13.7 KB
RevLine 
[7193]1## $Id: datacenter.py 9211 2012-09-21 08:19:35Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
[8512]24import re
[4896]25import shutil
[4146]26import grok
27from datetime import datetime
[7568]28from zope.component import getUtility
[4883]29from zope.component.interfaces import ObjectEvent
[7811]30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]31                                   IDataCenterStorageMovedEvent,
[7579]32                                   IDataCenterConfig)
[9098]33from waeup.kofa.utils.batching import ExportJobContainer
[8634]34from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]35from waeup.kofa.utils.logger import Logger
[4146]36
#: Regular expression describing a logfile name with a numerical backup
#: extension, e.g. 'app.log.1'. Written as a raw string so the regex
#: escapes are not interpreted as (invalid) string escapes.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
[8512]39
class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.

    All files live below the directory stored in `storage`. Inside
    that directory the subdirs 'finished', 'unfinished', 'logs' and
    'deleted' are created on initialization and whenever the storage
    path is changed.
    """
    grok.implements(IDataCenter)

    # Logger settings; presumably evaluated by the `Logger` mix-in
    # (not visible from here).
    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The initial storage path is taken from the registered
        # IDataCenterConfig utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Already existing subdirs are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    def _isInside(self, path, directory):
        """Tell whether `path` is `directory` or located below it.

        Both paths are made absolute and compared per path component,
        so that '/a/storage2' is *not* considered to be inside
        '/a/storage' (which a plain string prefix check would claim).
        """
        path = os.path.abspath(path)
        directory = os.path.abspath(directory)
        if path == directory:
            return True
        return path.startswith(directory.rstrip(os.sep) + os.sep)

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in `storage` are
        considered; each one is returned as a `DataCenterFile`.

        By default files are sorted by basename. With `sort` set to
        ``'date'`` they are sorted by timestamp instead, newest first.

        Returns an empty list if `storage` does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.

        The logs/ subdir is created if it does not exist yet. Each
        file is returned as a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must be an existing path, otherwise a `ValueError` is
        raised. It is turned into an absolute path before use.

        If `move` is ``True``, the contents of the old storage are
        copied over with `copy_filesystem_tree`, passing `overwrite`
        on to it.

        Afterwards the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is fired.

        Returns the result of `copy_filesystem_tree` (presumably the
        items that could not be copied -- confirm against the helper)
        or an empty list if nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving mtime, permissions, etc.

        The file is copied (content and stat info) and the source is
        removed afterwards. A missing source is logged as a warning
        and otherwise ignored. If source and dest are the same path,
        nothing happens.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        The `source` file is deleted afterwards.

        If `dest` does not exist yet, `source` is simply moved there.
        A missing source is logged as a warning and otherwise ignored.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        # merge_csv_files gives us the path of a file holding the
        # merged data; this file replaces the old `dest`.
        result_path = merge_csv_files(dest, source)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated .pending
        and .finished files.

        ``move_orig`` is accepted for compatibility but currently not
        evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # We might process a pending file of an earlier run. This
            # is only assumed if the respective original file exists
            # in the 'unfinished' dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it. The check is done
        # per path component: a plain string prefix comparison would
        # take '/x/storage2' for a part of '/x/storage' and could
        # mistake a differently spelled storage path for a foreign dir.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isInside(maybe_temp_dir, self.storage):
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself (whether such a file
        exists or not) and the names of all files in the logs/ subdir
        that consist of `basename` plus a numerical extension. Names
        are sorted by numerical extension in descending order, with
        `basename` coming last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        Only files located inside the logs/ subdir are searched;
        names pointing to other locations are silently skipped.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # Covers invalid expressions as well as non-string queries
            # (like the default `None`).
            raise ValueError('Invalid query string: %s' % query)

        basename = basename.replace('/../', '')
        files = self._logfiles(basename)
        logdir = os.path.join(self.storage, 'logs')

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(logdir, name)
            if not self._isInside(path, logdir):
                # Absolute or '../'-style names must not give access
                # to files outside the logs dir.
                continue
            if not os.path.isfile(path):
                continue
            with open(path, 'rb') as fd:
                for line in fd:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # A plain return ends the generator; raising
                        # StopIteration here is an error under PEP 479.
                        return
294
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file. Name,
    size, date and number of lines are determined once on
    initialization and stored as attributes.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is based on the ctime of the file and formatted
        like ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the ctime of the file as returned by `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones in the
        biggest fitting unit (kb, MB, GB) with two decimal places.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first and stop at the first one fitting.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # Use a context manager to make sure the file is closed again.
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
[6286]338
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data provided by `DataCenterFile`, the file is
    searched for markers, i.e. lines of form ``<name>:<text>``. The
    texts of the markers 'user', 'mode', 'processed' and 'source' are
    provided as attributes `userid`, `mode`, `stats` and `source`.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon. The
        lowercased part before it serves as marker name, the remainder
        as marker text. If a name occurs several times, the last
        occurrence wins.

        The file is parsed only once; subsequent calls do nothing.

        NOTE: `maxline` is accepted but not evaluated, the complete
        file is parsed. It is not enforced here as this would change
        which markers are found.
        """
        if self._parsed:
            return
        # Use a context manager to make sure the file is closed again.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        Marker names are looked up case-insensitively. The file is
        parsed on first access.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of 'user' marker or '<UNKNOWN>'.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of 'mode' marker or '<NOT SET>'.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of 'processed' marker or '<Info not avail.>'.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of 'source' marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]383
[4961]384
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event notified after the storage path of a data center changed.

    `DataCenter.setStoragePath` fires it with the data center as the
    event's object.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.