source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 8601

Last change on this file since 8601 was 8592, checked in by Henrik Bettermann, 13 years ago

Search log files in chronological order.

  • Property svn:keywords set to Id
File size: 13.0 KB
Line 
1## $Id: datacenter.py 8592 2012-06-01 12:30:57Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
24import re
25import shutil
26import grok
27from datetime import datetime
28from zope.component import getUtility
29from zope.component.interfaces import ObjectEvent
30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
31                                   IDataCenterStorageMovedEvent,
32                                   IDataCenterConfig)
33from waeup.kofa.utils.helpers import copy_filesystem_tree
34from waeup.kofa.utils.logger import Logger
35
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and one or more digits
#: ('app.log.1', 'app.log.12', ...).
#:
#: A raw string is used so that ``\.`` and ``\d`` reach the regex engine
#: unchanged instead of being treated as (invalid) string escapes.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
38
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    Files are kept in the directory named by the `storage` attribute.
    Below that directory the subdirectories ``finished``,
    ``unfinished``, ``logs`` and ``deleted`` are created on demand.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The initial storage location is taken from the registered
        # IDataCenterConfig utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Subdirs that already exist are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in `storage` are
        considered; each one is wrapped in a `DataCenterFile`.

        Files are sorted by basename, unless `sort` is ``'date'``: in
        that case they are sorted by timestamp, newest first.

        If `storage` does not exist, an empty list is returned.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.

        The logs/ subdir is created if it does not exist yet. Each
        file found is returned wrapped in a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            fullpath = os.path.join(logdir, name)
            if not os.path.isfile(fullpath):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(fullpath))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` is made absolute and must exist already, otherwise a
        `ValueError` is raised.

        If `move` is ``True``, the content of the old storage is
        copied to the new location with `copy_filesystem_tree`;
        `overwrite` is passed on to that helper.

        After the switch the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is sent.

        Returns the result of `copy_filesystem_tree` (presumably the
        list of files not copied) or an empty list if nothing was
        moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        # Copy content and stat data separately, then drop the original.
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _isInsideStorage(self, path):
        """Tell whether `path` is the storage dir or located below it.

        Both paths are normalized and compared by path components, so
        that a sibling directory which merely shares a name prefix
        with the storage dir (``/x/store2`` vs. ``/x/store``) is not
        mistaken for a location inside of it.
        """
        storage = os.path.abspath(self.storage)
        path = os.path.abspath(path)
        if path == storage:
            return True
        return path.startswith(storage.rstrip(os.sep) + os.sep)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated files
        (``<name>.<mode>.finished<ext>``, ``<name>.<mode>.pending<ext>``).

        ``move_orig`` is accepted for compatibility but not evaluated
        here.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # The source might be the .pending file of an earlier
            # run. This is the case only if the respective original
            # is still waiting in the unfinished/ subdir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isInsideStorage(maybe_temp_dir):
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself plus the names of all
        files in the logs/ subdir that consist of `basename` followed
        by a numerical backup extension ('.1', '.2', ...).

        Names are sorted by descending numerical extension, with
        `basename` (counted as 0) coming last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0

        files = [basename]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if not RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            # Skip backups of other logfiles: only '<basename>.<number>'
            # belongs to the requested log.
            if name.rsplit('.', 1)[0] != basename:
                continue
            files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # Any problem compiling `query` (invalid expression, no
            # string at all, ...) is reported as ValueError.
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): this removes '/../' sequences only; confirm
        # that callers never pass untrusted names containing other
        # path components.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            # The context manager makes sure the file is closed, also
            # if the iterator is left early or thrown away.
            with open(path, 'rb') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # Finish the generator with a plain return;
                        # raising StopIteration here is an error
                        # under PEP 479.
                        return
276
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the file described. Name, size, upload
    date and number of lines are determined once on creation and
    stored in the attributes `name`, `size`, `uploaddate` and
    `lines`.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is based on the ctime of the file and formatted as
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the ctime of the file as returned by `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, bigger ones in the
        largest fitting unit of 'kb', 'MB' or 'GB' with two decimal
        places.
        """
        bytesize = os.path.getsize(self.context)
        # Try the biggest unit first: 'GB' (1024**3) down to 'kb' (1024**1).
        for power, unit in ((3, 'GB'), (2, 'MB'), (1, 'kb')):
            if bytesize >= 1024 ** power:
                return "%.2f %s" % (bytesize / (1024.0 ** power), unit)
        return "%s bytes" % bytesize

    def getLinesNumber(self):
        """Get number of lines.
        """
        # Use a context manager so the file handle is closed right away
        # instead of waiting for garbage collection.
        with open(self.context, 'rb') as fd:
            return sum(1 for _ in fd)
320
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular data center file, markers
    of the form ``name: text`` are read from the file on creation
    and provide the attributes `userid`, `mode`, `stats` and
    `source`.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon. The
        part before (lowercased) is the marker name, the rest is the
        marker text. If a name occurs several times, the last
        occurrence wins.

        The file is parsed only once; further calls do nothing.

        `maxline` is currently not evaluated: the whole file is read.
        """
        if self._parsed:
            return
        # Use a context manager so the file handle is closed right away
        # instead of waiting for garbage collection.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed first, if
        this did not happen before.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get the text of the 'user' marker or '<UNKNOWN>'."""
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get the text of the 'mode' marker or '<NOT SET>'."""
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get the text of the 'processed' marker or a placeholder."""
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get the text of the 'source' marker or ``None``."""
        return self._getMarker('source') or None
365
366
class DataCenterStorageMovedEvent(ObjectEvent):
    """Object event announcing that the storage of a data center moved.

    `DataCenter.setStoragePath` sends an instance of this event, with
    the data center itself as event object, after the storage path
    was changed.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.