source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 8512

Last change on this file since 8512 was 8512, checked in by uli, 12 years ago

Add support for datacenter to search log files.

  • Property svn:keywords set to Id
File size: 12.7 KB
Line 
1## $Id: datacenter.py 8512 2012-05-25 00:30:22Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
24import re
25import shutil
26import grok
27from datetime import datetime
28from zope.component import getUtility
29from zope.component.interfaces import ObjectEvent
30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
31                                   IDataCenterStorageMovedEvent,
32                                   IDataCenterConfig)
33from waeup.kofa.utils.helpers import copy_filesystem_tree
34from waeup.kofa.utils.logger import Logger
35
#: Regular expression describing a logfile name with backup extension,
#: i.e. a name ending in a dot followed by digits (e.g. 'app.log.1').
#: Raw string, so backslashes reach the regex engine unescaped.
RE_LOGFILE_NAME = re.compile(r'^.+\.\d+$')
38
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    Files are kept in the filesystem directory `storage`. The standard
    subdirs ``finished``, ``unfinished``, ``logs`` and ``deleted`` are
    created below `storage` on initialization and whenever the storage
    path is changed.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # Initial storage location comes from the registered config.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs below `storage` if they are missing.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    @staticmethod
    def _isBelow(path, parent):
        """Tell whether `path` is `parent` or lies somewhere below it.

        Both paths are made absolute and normalized first. Unlike
        ``os.path.commonprefix`` (which compares character by
        character), whole path components are compared, so
        '/data/storage2' is *not* below '/data/storage'.
        """
        path = os.path.abspath(path)
        parent = os.path.abspath(parent)
        if path == parent:
            return True
        return path.startswith(parent.rstrip(os.sep) + os.sep)

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files directly inside `storage` are returned
        (subdirs are skipped), each wrapped as a `DataCenterFile`.
        Files are sorted by basename. If `sort` is ``'date'``, they
        are instead sorted by timestamp (ctime), newest first.
        Returns an empty list if `storage` does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir. Files are sorted by name.

        The logs/ subdir is created if it does not exist yet. Each
        regular file found there is wrapped as a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` is made absolute and must exist, otherwise ValueError
        is raised. If `move` is True, the current storage tree is
        handed to `copy_filesystem_tree` (with `overwrite`) before
        switching. Afterwards the standard subdirs are created in the
        new location and a `DataCenterStorageMovedEvent` is fired.

        Returns the result of `copy_filesystem_tree` (presumably the
        files that could not be copied -- see that helper), or an
        empty list if `move` is False.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        Logs a warning and does nothing if `source` does not exist;
        does nothing if `source` equals `dest`. Content and stat info
        are copied, then `source` is unlinked.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # '<orig>.<mode>.pending.csv' -> '<orig>.csv'; only treat it
            # as a re-import of a pending file if the original exists
            # in unfinished/.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it. Compare whole path
        # components (not a character prefix as commonprefix does).
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isBelow(maybe_temp_dir, self.storage):
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        Returns `basename` itself (always, even if no such file
        exists) followed by all files in the logs/ subdir named
        `basename` plus a numerical extension ('<basename>.1',
        '<basename>.2', ...), in ascending numerical order. Backups
        of other logfiles are not included.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.isdir(logdir):
            return [basename]
        backups = [
            name for name in os.listdir(logdir)
            if RE_LOGFILE_NAME.match(name)
            and name.rsplit('.', 1)[0] == basename]
        return [basename] + sorted(backups, key=numerical_suffix)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct order. So, if you ask for a basename 'app.log',
        then any file named 'app.log', 'app.log.1', 'app.log.2',
        etc. will be searched in that order. Only files inside the
        logs/ subdir of `storage` are read.

        The `query` is expected to be a string containing a regular
        expression. It may match anywhere in a line (``re.search``
        semantics); anchor it with '^' to match at line start only.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note that, as this is a generator, the exception is
        raised only when you fetch the first result line.
        """
        try:
            re_query = re.compile(query)
        except (re.error, TypeError):
            raise ValueError('Invalid query string: %s' % query)

        basename = basename.replace('/../', '')
        files = self._logfiles(basename)
        logdir = os.path.join(self.storage, 'logs')

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(logdir, name)
            # Refuse to read anything outside logs/ (absolute names or
            # names climbing up with '..').
            if not self._isBelow(path, logdir):
                continue
            if not os.path.isfile(path):
                continue
            with open(path, 'rb') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if num <= start:
                        continue
                    yield line
                    if limit and (num - start >= limit):
                        # Plain return ends the generator (PEP 479:
                        # raising StopIteration here is an error).
                        return
272
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file. Its name,
    human readable size, date and number of lines are computed once on
    instantiation and stored as attributes `name`, `size`,
    `uploaddate` and `lines`.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        Based on the file's ctime (``os.path.getctime``), converted to
        a naive local datetime and formatted as 'YYYY-MM-DD HH:MM:SS'.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the file's ctime as returned by ``os.path.getctime``.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Returns e.g. '512 bytes', '1.50 kb', '2.00 MB' or '3.25 GB',
        using the largest unit (powers of 1024) not exceeding the size.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the largest unit first and stop at the first that fits.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.

        The file is read in binary mode and closed afterwards.
        """
        with open(self.context, 'rb') as fd:
            return sum(1 for _ in fd)
316
class LogFile(DataCenterFile):
    """A description of a log file.

    Besides the `DataCenterFile` attributes, marker lines of the form
    ``Name: text`` found in the file are exposed as `userid` (marker
    'User'), `mode` ('Mode'), `stats` ('Processed') and `source`
    ('Source'), with fallback values when a marker is missing.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every (stripped) line containing a colon is split at the first
        colon: the lowercased part before it becomes the marker name,
        the rest (not stripped) its text. Later lines override earlier
        ones with the same name. Parsing happens only once per instance.

        `maxline` is currently not used; the whole file is scanned.
        """
        if self._parsed:
            return
        with open(self.context, 'rb') as logfile:
            for line in logfile:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text of `marker` (case-insensitive) or None.
        """
        # _parseFile() is a no-op once the file has been parsed.
        self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        return self._getMarker('source') or None
361
362
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event notified after the data center's storage path changed.

    The event's object is the data center whose storage was moved.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.