source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 10009

Last change on this file since 10009 was 9769, checked in by Henrik Bettermann, 12 years ago

Ease customization of logging format. We use this for the new payments logger.

Do not show payments.log on logfile search page.

  • Property svn:keywords set to Id
File size: 14.8 KB
RevLine 
[7193]1## $Id: datacenter.py 9769 2012-12-05 08:32:11Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
[9589]24import fnmatch
[8512]25import re
[4896]26import shutil
[4146]27import grok
28from datetime import datetime
[7568]29from zope.component import getUtility
[4883]30from zope.component.interfaces import ObjectEvent
[7811]31from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]32                                   IDataCenterStorageMovedEvent,
[7579]33                                   IDataCenterConfig)
[9217]34from waeup.kofa.utils.batching import ExportJobContainer
[8634]35from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]36from waeup.kofa.utils.logger import Logger
[4146]37
#: Regular expression describing a logfile name with a numerical backup
#: extension, e.g. ``datacenter.log.3``.
#: Use a raw string for the pattern: in a plain string ``'\.'`` and
#: ``'\d'`` only survive because Python happens to leave unknown escape
#: sequences untouched.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
[8512]40
class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.

    The data center manages a filesystem ``storage`` directory (the
    path is obtained from the `IDataCenterConfig` utility) with the
    standard subdirectories ``finished``, ``unfinished``, ``logs`` and
    ``deleted``.  It lists pending/finished import files, serves
    logfiles and distributes processed import files into their final
    locations.
    """
    grok.implements(IDataCenter)

    # Logger configuration (see waeup.kofa.utils.logger.Logger);
    # ``${sitename}`` is substituted by the logger machinery.
    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    # Maximum number of finished files shown by getFinishedFiles().
    max_files = 100

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # Storage path comes from site configuration, not from callers.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Existing directories are left untouched; only missing ones are
        created.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.

        Returns the ``deleted`` subdirectory of the current storage.
        """
        return os.path.join(self.storage, 'deleted')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular ``*.csv`` files directly inside `storage` are
        considered.  Results are `DataCenterFile` instances, sorted by
        basename; with ``sort='date'`` they are re-sorted by
        modification time, newest first.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Only ``*.finished.csv`` files are considered.  Files are sorted
        by modification time, newest first, and at most `max_files`
        entries are returned as `DataCenterFile` instances.
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result
        # Sort key: modification time of each directory entry.
        mtime = lambda f: os.stat(os.path.join(finished_dir, f)).st_mtime
        finished_files = [f for f in
            sorted(os.listdir(finished_dir), key=mtime, reverse=True)
            if fnmatch.fnmatch(f, '*.finished.csv')]
        for filename in finished_files[:self.max_files]:
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            # Redundant with the fnmatch filter above, kept as a guard.
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) and payments.log
        are excluded.  The ``logs`` directory is created if missing.
        Results are `LogFile` instances.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            if not os.path.isfile(os.path.join(logdir, name)):
                continue
            # payments.log is hidden from the logfile search page.
            if name == 'payments.log':
                continue
            if exclude_backups == True and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(
                LogFile(os.path.join(self.storage, 'logs', name)))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        With ``move=True`` the current storage tree is copied over to
        `path` first; the list of files that could not be copied is
        returned (empty list otherwise).  Fires a
        `DataCenterStorageMovedEvent` after switching.

        Raises ValueError if `path` does not exist.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest, preserving permission bits and
        access/modification times (via `shutil.copystat`).

        Missing sources are logged and ignored; ``source == dest`` is a
        no-op.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        # copy + copystat + unlink instead of os.rename so the move
        # also works across filesystems.
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        The `source` file is deleted afterwards.  If `dest` does not
        exist yet, `source` is simply moved there.  Missing sources are
        logged and ignored; ``source == dest`` is a no-op.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        # merge_csv_files writes the merged data to a temporary file
        # whose path it returns.
        result_path = merge_csv_files(dest, source)
        os.chmod(result_path, 0664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the generated
        ``<base>.<mode>.pending/finished.csv`` filenames.

        ``move_orig`` is accepted for backward compatibility but is
        not evaluated in this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        # A '<base>.<mode>.pending.csv' source means we are re-processing
        # an earlier pending file; look for the original in unfinished/.
        if basename.endswith('.pending.csv'):
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        # New finished rows are merged into any finished file from
        # earlier runs instead of overwriting it.
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        Returns `basename` plus its numbered backups
        (``<basename>.1``, ``<basename>.2``, ...), sorted by the
        numerical suffix in descending order, i.e. oldest backup first
        and the live file last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
            pass
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename 'app.log',
        then any file named 'app2.log', 'app.log.1', 'app.log',
        etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except:
            # NOTE(review): the bare except deliberately maps any
            # compile failure (including query=None -> TypeError) to
            # ValueError, but it also swallows KeyboardInterrupt etc.
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): stripping '/../' is only a weak guard against
        # path traversal; e.g. '..//..' style inputs are not covered.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            for line in open(path, 'rb'):
                if not re_query.search(line):
                    continue
                num += 1
                # Skip matches before the requested batch start.
                if (num - 1) < start:
                    continue
                yield line

                if limit and (num - limit >= start):
                    # NOTE(review): raising StopIteration inside a
                    # generator ends iteration under Python 2, but is a
                    # RuntimeError under PEP 479 (Python 3.7+) — replace
                    # with a plain `return` when porting.
                    raise StopIteration
        pass
325
class DataCenterFile(object):
    """A description of a file stored in data center.

    Wraps a filesystem path (`context`) and precomputes display
    attributes: basename, path relative to a storage subfolder,
    human-readable size, upload date and line count.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        # `context` is the absolute path of the described file.
        self.context = context
        self.name = os.path.basename(self.context)
        # Path relative to storage, e.g. 'finished/<name>'.
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        Based on the file's modification time, formatted as
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        Returns the file's modification time as seconds since epoch.
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Returns the largest fitting unit out of kb/MB/GB with two
        decimals, or plain ``'<n> bytes'`` below 1024 bytes.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        num = 0
        # Use a context manager so the file handle is closed promptly;
        # the original left closing to garbage collection.
        with open(self.context, 'rb') as fd:
            for line in fd:
                num += 1
        return num
[6286]370
class LogFile(DataCenterFile):
    """A description of a log file.

    Extends `DataCenterFile` with marker values (user, mode,
    statistics, source path) parsed from ``name:text`` lines of the
    logfile itself.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Collects ``name:text`` lines into ``self._markers`` with
        lowercased names; later occurrences of a name overwrite earlier
        ones.  ``maxline`` is accepted for backward compatibility but
        not evaluated: the whole file is scanned.  Parsing happens at
        most once per instance.
        """
        if self._parsed:
            return
        # Use a context manager so the file handle is closed promptly;
        # the original left closing to garbage collection.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Return the value of `marker` (case-insensitive) or ``None``.

        Triggers a one-time parse of the file on first use.
        """
        if not self._parsed:
            self._parseFile()
        # dict.get returns None for missing markers, matching the
        # original implicit-None fall-through.
        return self._markers.get(marker.lower())

    def getUserId(self):
        """User id found in the log, or a placeholder."""
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Import mode found in the log, or a placeholder."""
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Processing statistics found in the log, or a placeholder."""
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Source file path found in the log, or ``None``."""
        return self._getMarker('source') or None
[4883]415
[4961]416
class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired, when datacenter storage moves.

    The event object is the `DataCenter` whose storage path changed
    (see `DataCenter.setStoragePath`); subscribers can react to the
    relocation, e.g. by adjusting logging paths.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.