source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 13599

Last change on this file since 13599 was 10045, checked in by uli, 12 years ago

Make logfile parsing unicode-aware.

  • Property svn:keywords set to Id
File size: 15.2 KB
RevLine 
[7193]1## $Id: datacenter.py 10045 2013-03-23 10:48:31Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
[10045]23import codecs
24import fnmatch
25import grok
[4146]26import os
[8512]27import re
[4896]28import shutil
[4146]29from datetime import datetime
[7568]30from zope.component import getUtility
[4883]31from zope.component.interfaces import ObjectEvent
[7811]32from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]33                                   IDataCenterStorageMovedEvent,
[7579]34                                   IDataCenterConfig)
[9217]35from waeup.kofa.utils.batching import ExportJobContainer
[8634]36from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]37from waeup.kofa.utils.logger import Logger
[4146]38
[8512]39#: Regular expression describing a logfile name with backup extension
[8516]40RE_LOGFILE_BACKUP_NAME = re.compile('^.+\.\d+$')
[8512]41
[9217]42class DataCenter(grok.Container, Logger, ExportJobContainer):
[4146]43    """A data center contains CSV files.
44    """
[4669]45    grok.implements(IDataCenter)
[4166]46
[7811]47    logger_name = 'waeup.kofa.${sitename}.datacenter'
[6578]48    logger_filename = 'datacenter.log'
[4873]49
[9589]50    max_files = 100
51
[4892]52    def __init__(self, *args, **kw):
53        super(DataCenter, self).__init__(*args, **kw)
[7579]54        self.storage = getUtility(IDataCenterConfig)['path']
[4892]55        self._createSubDirs()
[6286]56
[4892]57    def _createSubDirs(self):
58        """Create standard subdirs.
59        """
[8372]60        for name in ['finished', 'unfinished', 'logs', 'deleted']:
[4892]61            path = os.path.join(self.storage, name)
62            if os.path.exists(path):
63                continue
64            os.mkdir(path)
65        return
[6286]66
[8395]67    @property
68    def deleted_path(self):
69        """Get the path for deleted object data.
70        """
71        return os.path.join(self.storage, 'deleted')
72
[9023]73    def getPendingFiles(self, sort='name'):
[4146]74        """Get a list of files stored in `storage`.
[4574]75
76        Files are sorted by basename.
[4146]77        """
78        result = []
79        if not os.path.exists(self.storage):
80            return result
[4574]81        for filename in sorted(os.listdir(self.storage)):
[4146]82            fullpath = os.path.join(self.storage, filename)
83            if not os.path.isfile(fullpath):
84                continue
[9008]85            if not filename.endswith('.csv'):
86                continue
[4146]87            result.append(DataCenterFile(fullpath))
[4858]88        if sort == 'date':
89            # sort results in newest-first order...
90            result = sorted(result, key=lambda x: x.getTimeStamp(),
91                            reverse=True)
[4146]92        return result
93
[9074]94    def getFinishedFiles(self):
[9023]95        """Get a list of files stored in `finished` subfolder of `storage`.
96
[9074]97        Files are unsorted.
[9023]98        """
99        result = []
100        finished_dir = os.path.join(self.storage, 'finished')
101        if not os.path.exists(finished_dir):
102            return result
[9589]103        mtime = lambda f: os.stat(os.path.join(finished_dir, f)).st_mtime
104        finished_files = [f for f in
105            sorted(os.listdir(finished_dir), key=mtime, reverse=True)
106            if fnmatch.fnmatch(f, '*.finished.csv')]
107        for filename in finished_files[:self.max_files]:
[9023]108            fullpath = os.path.join(finished_dir, filename)
109            if not os.path.isfile(fullpath):
110                continue
111            if not filename.endswith('.csv'):
112                continue
113            result.append(DataCenterFile(fullpath, 'finished'))
114        return result
115
[8516]116    def getLogFiles(self, exclude_backups=True):
[4908]117        """Get the files from logs/ subdir. Files are sorted by name.
[8516]118
[9769]119        By default backup logs ('app.log.1', etc.) and payments.log
120        are excluded.
[4858]121        """
122        result = []
[4908]123        logdir = os.path.join(self.storage, 'logs')
124        if not os.path.exists(logdir):
125            os.mkdir(logdir)
126        for name in sorted(os.listdir(logdir)):
127            if not os.path.isfile(os.path.join(logdir, name)):
[4858]128                continue
[9769]129            if name == 'payments.log':
130                continue
[8516]131            if exclude_backups == True and RE_LOGFILE_BACKUP_NAME.match(name):
132                continue
[4858]133            result.append(
[4908]134                LogFile(os.path.join(self.storage, 'logs', name)))
[4858]135        return result
[6286]136
[4190]137    def setStoragePath(self, path, move=False, overwrite=False):
[4146]138        """Set the path where to store files.
139        """
140        path = os.path.abspath(path)
[4190]141        not_copied = []
[4152]142        if not os.path.exists(path):
143            raise ValueError('The path given does not exist: %s' % path)
[4173]144        if move is True:
[7186]145            not_copied = copy_filesystem_tree(self.storage, path,
[4190]146                                            overwrite=overwrite)
[4146]147        self.storage = path
[4892]148        self._createSubDirs()
[4883]149        grok.notify(DataCenterStorageMovedEvent(self))
[4190]150        return not_copied
[4146]151
[4896]152    def _moveFile(self, source, dest):
153        """Move file source to dest preserving ctime, mtime, etc.
154        """
155        if not os.path.exists(source):
156            self.logger.warn('No such source path: %s' % source)
157            return
158        if source == dest:
159            return
160        shutil.copyfile(source, dest)
161        shutil.copystat(source, dest)
162        os.unlink(source)
[6286]163
[8634]164    def _appendCSVFile(self, source, dest):
165        """Append data from CSV file `source` to data from CSV file `dest`.
166
167        The `source` file is deleted afterwards.
168        """
169        if not os.path.exists(dest):
170            return self._moveFile(source, dest)
171        if not os.path.exists(source):
172            self.logger.warn('No such source path: %s' % source)
173            return
174        if source == dest:
175            return
176        result_path = merge_csv_files(dest, source)
[8905]177        os.chmod(result_path, 0664)
[8634]178        self._moveFile(result_path, dest)
179        os.unlink(source)
180
[4896]181    def distProcessedFiles(self, successful, source_path, finished_file,
[4996]182                           pending_file, mode='create', move_orig=True):
[4896]183        """Put processed files into final locations.
[4858]184
[4896]185        ``successful`` is a boolean that tells, whether processing was
186        successful.
187
188        ``source_path``: path to file that was processed.
189
190        ``finished_file``, ``pending_file``: paths to the respective
191        generated .pending and .finished file. The .pending file path
192        may be ``None``.
193
[4904]194        If finished file is placed in a location outside the local
195        storage dir, the complete directory is removed
[7933]196        afterwards. Regular processors should put their stuff in
[4904]197        dedicated temporary dirs.
[6286]198
[4896]199        See datacenter.txt for more info about how this works.
200        """
201        basename = os.path.basename(source_path)
202        pending_name = basename
203        pending = False
204        finished_dir = os.path.join(self.storage, 'finished')
205        unfinished_dir = os.path.join(self.storage, 'unfinished')
206
207        if basename.endswith('.pending.csv'):
[4996]208            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
[4896]209            maybe_src = os.path.join(unfinished_dir, maybe_basename)
210            if os.path.isfile(maybe_src):
211                basename = maybe_basename
212                pending = True
[6286]213
[4896]214        base, ext = os.path.splitext(basename)
[4996]215        finished_name = "%s.%s.finished%s" % (base, mode, ext)
[4896]216        if not pending:
[4996]217            pending_name = "%s.%s.pending%s" % (base, mode, ext)
[4896]218
219        # Put .pending and .finished file into respective places...
220        pending_dest = os.path.join(self.storage, pending_name)
221        finished_dest = os.path.join(finished_dir, finished_name)
[8634]222        self._appendCSVFile(finished_file, finished_dest)
[4896]223        if pending_file is not None:
224            self._moveFile(pending_file, pending_dest)
225
226        # Put source file into final location...
227        finished_dest = os.path.join(finished_dir, basename)
228        unfinished_dest = os.path.join(unfinished_dir, basename)
229        if successful and not pending:
230            self._moveFile(source_path, finished_dest)
231        elif successful and pending:
232            self._moveFile(unfinished_dest, finished_dest)
233            os.unlink(source_path)
234        elif not successful and not pending:
235            self._moveFile(source_path, unfinished_dest)
[4904]236
237        # If finished and pending-file were created in a location
238        # outside datacenter storage, we remove it.
[4906]239        maybe_temp_dir = os.path.dirname(finished_file)
[4904]240        if os.path.commonprefix(
[4906]241            [self.storage, maybe_temp_dir]) != self.storage:
242            shutil.rmtree(maybe_temp_dir)
[4896]243        return
244
[8512]245    def _logfiles(self, basename):
246        """Get sorted logfiles starting with `basename`.
247        """
248        def numerical_suffix(name):
249            # return numerical suffix in `name` as number or 0.
250            suffix = name.rsplit('.', 1)[-1]
251            try:
252                return int(suffix)
253            except ValueError:
254                return 0
255            pass
256        files = [basename,]
257        for name in os.listdir(os.path.join(self.storage, 'logs')):
[8516]258            if RE_LOGFILE_BACKUP_NAME.match(name):
[8725]259                if name.rsplit('.', 1)[0] == basename:
260                    files.append(name)
[8592]261        return sorted(files, key=numerical_suffix, reverse=True)
[6286]262
[8512]263    def queryLogfiles(self, basename, query=None, limit=0, start=0):
264        """Search `query` in all logfiles starting with `basename`.
265
266        Returns an iterator of those lines in logfiles starting with
267        `basename` that match `query`. If you want the result as a
268        list, simply list() the iterator.
269
270        All logfiles with name `basename` and maybe some numerical
271        extension ('.1', '.2', ...) are searched for the `query` term
[8592]272        in correct chronological order. So, if you ask for a basename 'app.log',
273        then any file named 'app2.log', 'app.log.1', 'app.log',
[8512]274        etc. will be searched in that order.
275
276        The `query` is expected to be a string containing a regular
277        expression.
278
279        If `limit` is set to some numerical value, at most this number
280        of lines is returned.
281
282        With `start` you can give the number of first matched line to
283        return. `start` is zero-based, i.e. the first match has number
284        0, the scond one 1, etc.
285
286        Together with `limit` this allows some basic
287        batching. Please keep in mind that batching might give
288        unpredictable results, when logfiles change between two
289        requests. This is not a problem when only one file is searched
290        and changes include only appending new log messages.
291
292        Matches are found per line only (no multiline matches).
293
[10045]294        Result lines are returned as unicode instances decoded from
295        UTF-8 encoding. This means that logfiles must provide UTF-8
296        encoding for umlauts etc. if these should be rendered
297        properly. The returned unicode lines can be fed to page
298        templates even if they contain non-ASCII characters.
299
[8512]300        This method raises ValueError if some basic condition is not
301        met, for instance if the given query string is not a valid
302        regular expression.
303
304        Please note, that this exception will happen not before you
305        really fetch a result line.
306        """
307        try:
308            re_query = re.compile(query)
309        except:
310            raise ValueError('Invalid query string: %s' % query)
311
312        basename = basename.replace('/../', '')
313        files = self._logfiles(basename)
314
315        # Search the log files
316        num = 0
317        for name in files:
318            path = os.path.join(self.storage, 'logs', name)
319            if not os.path.isfile(path):
320                continue
[10045]321            for line in codecs.open(path, 'rb', 'utf-8'):
[8513]322                if not re_query.search(line):
[8512]323                    continue
324                num += 1
325                if (num - 1) < start:
326                    continue
327                yield line
328
329                if limit and (num - limit >= start):
330                    raise StopIteration
331        pass
332
[4146]333class DataCenterFile(object):
334    """A description of a file stored in data center.
335    """
[4166]336    grok.implements(IDataCenterFile)
[6286]337
[9023]338    def __init__(self, context, folder_name=''):
[4146]339        self.context = context
340        self.name = os.path.basename(self.context)
[9023]341        self.rel_path = os.path.join(folder_name, self.name)
[4146]342        self.size = self.getSize()
343        self.uploaddate = self.getDate()
[4858]344        self.lines = self.getLinesNumber()
[4146]345
346    def getDate(self):
347        """Get a human readable datetime representation.
348        """
[9340]349        date = datetime.fromtimestamp(os.path.getmtime(self.context))
[6827]350        return date.strftime("%Y-%m-%d %H:%M:%S")
[4146]351
[4858]352    def getTimeStamp(self):
353        """Get a (machine readable) timestamp.
354        """
[9340]355        return os.path.getmtime(self.context)
[6286]356
[4146]357    def getSize(self):
358        """Get a human readable file size.
359        """
360        bytesize = os.path.getsize(self.context)
361        size = "%s bytes" % bytesize
362        units = ['kb', 'MB', 'GB']
363        for power, unit in reversed(list(enumerate(units))):
364            power += 1
365            if bytesize >= 1024 ** power:
366                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
367                break
368        return size
369
[4858]370    def getLinesNumber(self):
371        """Get number of lines.
372        """
373        num = 0
374        for line in open(self.context, 'rb'):
375            num += 1
376        return num
[6286]377
[4858]378class LogFile(DataCenterFile):
379    """A description of a log file.
380    """
381    def __init__(self, context):
382        super(LogFile, self).__init__(context)
383        self._markers = dict()
384        self._parsed = False
385        self.userid = self.getUserId()
386        self.mode = self.getMode()
387        self.stats = self.getStats()
388        self.source = self.getSourcePath()
389
390    def _parseFile(self, maxline=10):
391        """Find markers in a file.
392        """
393        if self._parsed:
394            return
[10045]395        for line in codecs.open(self.context, 'rb', 'utf-8'):
[4858]396            line = line.strip()
397            if not ':' in line:
398                continue
399            name, text = line.split(':', 1)
400            self._markers[name.lower()] = text
401        self._parsed = True
402        return
403
404    def _getMarker(self, marker):
405        marker = marker.lower()
406        if not self._parsed:
407            self._parseFile()
408        if marker in self._markers.keys():
409            return self._markers[marker]
[6286]410
[4858]411    def getUserId(self):
412        return self._getMarker('user') or '<UNKNOWN>'
413
414    def getMode(self):
415        return self._getMarker('mode') or '<NOT SET>'
416
417    def getStats(self):
418        return self._getMarker('processed') or '<Info not avail.>'
419
420    def getSourcePath(self):
421        return self._getMarker('source') or None
[4883]422
[4961]423
[4883]424class DataCenterStorageMovedEvent(ObjectEvent):
425    """An event fired, when datacenter storage moves.
426    """
427    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.