source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 9012

Last change on this file since 9012 was 9008, checked in by Henrik Bettermann, 12 years ago

Show only csv files in data center.

  • Property svn:keywords set to Id
File size: 13.7 KB
RevLine 
[7193]1## $Id: datacenter.py 9008 2012-07-17 04:45:43Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
[8512]24import re
[4896]25import shutil
[4146]26import grok
27from datetime import datetime
[7568]28from zope.component import getUtility
[4883]29from zope.component.interfaces import ObjectEvent
[7811]30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]31                                   IDataCenterStorageMovedEvent,
[7579]32                                   IDataCenterConfig)
[8634]33from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]34from waeup.kofa.utils.logger import Logger
[4146]35
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and a purely numerical
#: suffix ('app.log.1', 'app.log.23', ...). A raw string is used so that
#: the backslashes reach the regex engine unchanged.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
[8512]38
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    Files are kept in the directory given by the `storage` attribute.
    Below that directory the subdirectories ``finished``,
    ``unfinished``, ``logs`` and ``deleted`` are maintained.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The initial storage location is taken from the registered
        # IDataCenterConfig utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Already existing subdirectories are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def getFiles(self, sort='name'):
        """Get a list of CSV files stored in `storage`.

        Only regular files located directly in `storage` whose name
        ends with ``.csv`` are considered. Each file is returned as a
        `DataCenterFile`.

        Files are sorted by basename. If `sort` is ``'date'``, they
        are sorted by timestamp instead, newest first.

        An empty list is returned if `storage` does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.

        The logs/ subdir is created if it does not exist yet. Each
        file is returned as a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must be an existing location, otherwise a `ValueError`
        is raised. It is turned into an absolute path.

        If `move` is ``True``, the contents of the old storage are
        handed to `copy_filesystem_tree` together with `overwrite`
        before the storage is switched.

        Afterwards the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is fired.

        Returns the result of `copy_filesystem_tree` (presumably the
        list of files not copied -- confirm against that helper) or an
        empty list if nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        The `source` file is deleted afterwards.

        If `dest` does not exist yet, `source` is simply moved
        there. A missing `source` is logged as a warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        # merge_csv_files gives us the path of a file holding the
        # merged data, which then replaces `dest`.
        result_path = merge_csv_files(dest, source)
        # `0o664` is the octal notation understood by Python 2.6+ and
        # Python 3 alike (rw-rw-r--).
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the stored .pending and
        .finished files (``<base>.<mode>.finished<ext>``).

        ``move_orig`` is accepted for compatibility but currently not
        evaluated.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # The source might be the pending file of an earlier run.
            # This is only assumed if the respective original file
            # exists in the unfinished/ subdir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not maybe_temp_dir:
            # A bare filename has no dedicated directory we could remove.
            return
        # Compare whole path components. A plain character-wise common
        # prefix would treat '/data/storage2' as part of
        # '/data/storage' and would be fooled by a trailing slash in
        # the storage path.
        storage = os.path.abspath(self.storage)
        temp_dir = os.path.abspath(maybe_temp_dir)
        inside_storage = (
            temp_dir == storage or
            temp_dir.startswith(os.path.join(storage, '')))
        if not inside_storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself and the names of all
        files in the logs/ subdir that consist of `basename` plus a
        numerical extension. Names are sorted by that extension in
        descending order, `basename` (counted as 0) comes last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        Only files located inside the logs/ subdir are read; names
        pointing to other locations are skipped.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            raise ValueError('Invalid query string: %s' % query)

        basename = basename.replace('/../', '')
        files = self._logfiles(basename)
        logdir = os.path.abspath(os.path.join(self.storage, 'logs'))
        logdir_prefix = os.path.join(logdir, '')

        # Search the log files
        num = 0
        for name in files:
            path = os.path.abspath(os.path.join(logdir, name))
            if not path.startswith(logdir_prefix):
                # `basename` may come from a request. Never read
                # anything outside the logs/ subdir.
                continue
            if not os.path.isfile(path):
                continue
            with open(path, 'rb') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # Finish the generator. Raising StopIteration
                        # here would turn into a RuntimeError on
                        # newer Python versions.
                        return
296
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file. Name, size, date and
    number of lines are determined once, when the description is
    created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is derived from the ctime of the file and formatted
        like ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the ctime of the file as returned by
        `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with
        two decimal places in the biggest fitting unit of kb, MB or
        GB.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first and pick the first one that fits.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # The `with` block makes sure the file is closed again.
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
[6286]340
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular file description, markers
    (lines of form ``<name>:<text>``) are read from the file to
    determine user id, mode, statistics and source path.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon into
        a marker name and its text. Names are stored lowercased; if a
        name occurs several times, the last occurrence wins.

        The file is parsed only once per instance.

        `maxline` is accepted for compatibility but currently not
        evaluated: all lines of the file are scanned.
        """
        if self._parsed:
            return
        # The `with` block makes sure the file is closed again.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed on first
        use.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        """Get text of the 'user' marker or '<UNKNOWN>'.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the 'mode' marker or '<NOT SET>'.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the 'processed' marker or '<Info not avail.>'.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the 'source' marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]385
[4961]386
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event signalling that the storage of a datacenter was relocated.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.