source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 8685

Last change on this file since 8685 was 8634, checked in by uli, 13 years ago

Let datacenter use the new CSV file merger from helpers.

CAUTION: this is only lightly tested and not yet in real use. Try
some local imports before using it in a production environment!

  • Property svn:keywords set to Id
File size: 13.5 KB
RevLine 
[7193]1## $Id: datacenter.py 8634 2012-06-06 01:40:08Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
[8512]24import re
[4896]25import shutil
[4146]26import grok
27from datetime import datetime
[7568]28from zope.component import getUtility
[4883]29from zope.component.interfaces import ObjectEvent
[7811]30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]31                                   IDataCenterStorageMovedEvent,
[7579]32                                   IDataCenterConfig)
[8634]33from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]34from waeup.kofa.utils.logger import Logger
[4146]35
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and one or more digits
#: ('app.log.1', 'app.log.23', ...). A raw string is used so that the
#: backslashes reach the regex engine untouched.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
[8512]38
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    The data center works on a filesystem directory, ``self.storage``,
    which is looked up from the registered `IDataCenterConfig` utility
    on creation. Inside that directory the subdirectories ``finished``,
    ``unfinished``, ``logs`` and ``deleted`` are maintained.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Already existing subdirs are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Returns a list of `DataCenterFile` objects, one for each
        regular file found directly in the storage directory
        (subdirectories are skipped). If the storage directory does
        not exist, an empty list is returned.

        Files are sorted by basename, unless `sort` is ``'date'``. In
        that case they are sorted by timestamp, newest first.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        Returns a list of `LogFile` objects. A missing logs/ subdir
        is created on the fly.

        By default backup logs ('app.log.1', etc.) are excluded.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must be an existing path, otherwise `ValueError` is
        raised.

        If `move` is ``True`` (the literal boolean), the contents of
        the old storage are copied to the new location by
        `copy_filesystem_tree`; `overwrite` is passed on to that
        helper.

        After switching the storage, the standard subdirs are created
        and a `DataCenterStorageMovedEvent` is fired.

        Returns the result of `copy_filesystem_tree` (presumably the
        list of files that were not copied) or an empty list, if
        nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. If `source` and `dest` are the same path, nothing
        happens.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        If `dest` does not exist yet, `source` is simply moved
        there. Otherwise both files are merged by `merge_csv_files`
        and the merge result replaces `dest`.

        The `source` file is deleted afterwards.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        # merge_csv_files returns the path of the merged file, which
        # we then put in place of the old `dest`.
        result_path = merge_csv_files(dest, source)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated .finished
        and .pending files.

        ``move_orig`` is currently not used by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # We might have processed a pending file from a former
            # run. This is only assumed if the respective original
            # file is still waiting in the unfinished/ dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        # NOTE(review): os.path.commonprefix compares character-wise,
        # not per path component, so a sibling dir like '<storage>2'
        # counts as being inside the storage and is not removed.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        Returns a list of file names (not paths): `basename` itself
        and all names in the logs/ subdir that consist of `basename`
        plus a numerical backup extension ('.1', '.2', ...).

        Names are sorted by numerical extension in descending order,
        `basename` coming last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if not RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            # Only accept backups of the requested logfile, not
            # backups of other logfiles living in the same dir.
            if name.rsplit('.', 1)[0] == basename:
                files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # Anything `re.compile` complains about (syntax errors,
            # wrong type like `None`, ...) is an invalid query for us.
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): this removes '/../' sequences only. Names like
        # '../x' or absolute paths are not caught here; callers should
        # not pass untrusted basenames.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        # NOTE(review): files are read in binary mode; under Python 3
        # the lines are bytes and `query` must be a bytes pattern.
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            with open(path, 'rb') as fd:
                for line in fd:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # A plain return ends the generator. Raising
                        # StopIteration here would be turned into a
                        # RuntimeError by newer Python versions.
                        return
292
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file. All
    descriptive attributes (`name`, `size`, `uploaddate`, `lines`)
    are computed once, when the description is created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The date is derived from the ctime of the file and formatted
        like ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the ctime of the file as returned by
        `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given as ``'<num> bytes'``, bigger
        ones with two decimal places in the biggest fitting unit out
        of kb, MB and GB.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first and stop at the first one that
        # fits.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # Use a context manager to make sure the file is closed again.
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
[6286]336
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the attributes of a regular data center file, a
    log file description provides `userid`, `mode`, `stats` and
    `source`. These are read from marker lines of the form
    ``<name>:<text>`` inside the file.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon. The
        (lowercased) part before it is the marker name, the rest the
        marker text. If a name occurs several times, the last
        occurrence wins.

        The file is parsed only once.

        NOTE(review): `maxline` is accepted but not evaluated; the
        complete file is scanned.
        """
        if self._parsed:
            return
        # Use a context manager to make sure the file is closed again.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed on first
        use.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of the 'user' marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the 'mode' marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the 'processed' marker or ``'<Info not avail.>'``.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the 'source' marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]381
[4961]382
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event signalling that the storage of a datacenter has moved.

    Instances are created with the affected data center as object and
    are sent out via ``grok.notify`` by `DataCenter.setStoragePath`.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.