source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 9051

Last change on this file since 9051 was 9023, checked in by Henrik Bettermann, 13 years ago

Give access to processed (finished) import files.

  • Property svn:keywords set to Id
File size: 14.6 KB
RevLine 
[7193]1## $Id: datacenter.py 9023 2012-07-19 10:32:40Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
[8512]24import re
[4896]25import shutil
[4146]26import grok
27from datetime import datetime
[7568]28from zope.component import getUtility
[4883]29from zope.component.interfaces import ObjectEvent
[7811]30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]31                                   IDataCenterStorageMovedEvent,
[7579]32                                   IDataCenterConfig)
[8634]33from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]34from waeup.kofa.utils.logger import Logger
[4146]35
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and a run of digits
#: ('app.log.1', 'app.log.23', ...). Written as a raw string so that
#: the backslashes reach the regex engine unchanged.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
[8512]38
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    All files live below the directory stored in ``self.storage``.
    CSV files directly inside that directory are the pending ones;
    the subdirectories ``finished``, ``unfinished``, ``logs`` and
    ``deleted`` are created on demand.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        """Look up the configured storage path and prepare its subdirs.
        """
        super(DataCenter, self).__init__(*args, **kw)
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Subdirectories that already exist are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def _getCSVFiles(self, directory, folder_name='', sort='name'):
        """Get `DataCenterFile` objects for all CSV files in `directory`.

        Only regular files whose name ends with ``.csv`` are
        considered. `folder_name` is passed on to `DataCenterFile`.

        Files are sorted by basename, or newest-first if `sort` is
        ``'date'``. A non-existing `directory` yields an empty list.
        """
        result = []
        if not os.path.exists(directory):
            return result
        for filename in sorted(os.listdir(directory)):
            fullpath = os.path.join(directory, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, folder_name))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Files are sorted by basename, or newest-first if `sort` is
        ``'date'``.
        """
        return self._getCSVFiles(self.storage, sort=sort)

    def getFinishedFiles(self, sort='name'):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Files are sorted by basename, or newest-first if `sort` is
        ``'date'``.
        """
        finished_dir = os.path.join(self.storage, 'finished')
        return self._getCSVFiles(finished_dir, 'finished', sort)

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.

        The logs/ directory is created if it does not exist yet.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must exist, otherwise a `ValueError` is raised. If
        `move` is ``True``, the old storage tree is copied to the new
        location first (`overwrite` is passed on to the copy helper).

        Afterwards the standard subdirs are created and a
        `DataCenterStorageMovedEvent` is fired.

        Returns whatever the copy helper reported as not copied, or
        an empty list if nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged and otherwise ignored. Nothing
        happens if `source` and `dest` are the same path.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        The `source` file is deleted afterwards.

        If `dest` does not exist yet, `source` is simply moved there.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        # 0o664 (rw-rw-r--): spelled with the `0o` prefix, which is
        # accepted by Python 2.6+ and Python 3 alike.
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def _isInStorage(self, path):
        """Tell whether `path` is the storage dir or located below it.

        Paths are compared by whole path components. A plain
        `os.path.commonprefix` test works character by character and
        would treat ``/data2/tmp`` as part of ``/data``.
        """
        storage = os.path.normpath(self.storage)
        path = os.path.normpath(path)
        if path == storage:
            return True
        return path.startswith(storage.rstrip(os.sep) + os.sep)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated files
        (``<base>.<mode>.finished<ext>``, ``<base>.<mode>.pending<ext>``).

        ``move_orig`` is accepted but not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # The source might be the pending remainder of an earlier
            # run. That is only the case if the file it was derived
            # from still sits in the unfinished/ dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isInStorage(maybe_temp_dir):
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself plus all names in the
        logs/ dir that consist of `basename` and a numerical backup
        extension. Names are sorted by that number in descending
        order; `basename` (counted as 0) comes last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # Whatever made compilation fail (bad syntax, a
            # non-string query like the default ``None``, ...):
            # callers get a ValueError.
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): only the literal '/../' sequence is stripped
        # here; a `basename` like '../x' still reaches os.path.join
        # below. Confirm that callers pass trusted names only.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            # The `with` block closes the file also when the consumer
            # abandons the iterator before it is exhausted.
            with open(path, 'rb') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # A plain `return` ends the generator; raising
                        # StopIteration here is an error under PEP 479.
                        return
318
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file.
    `folder_name` is joined with the file's basename to form
    ``rel_path``.

    Size, date and number of lines are determined once, when the
    object is created, and stored in the attributes ``size``,
    ``uploaddate`` and ``lines``.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is derived from the ctime of the file and formatted
        as ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the ctime of the file as returned by
        `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given as ``<n> bytes``, larger
        ones with two decimals in the largest fitting unit of kb, MB
        or GB (powers of 1024).
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the largest unit first and stop at the first one that fits.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # Make sure the file is closed again after counting.
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
[6286]363
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular data center file, the
    markers found in the file (lines of form ``<name>:<text>``) are
    evaluated to set the attributes ``userid``, ``mode``, ``stats``
    and ``source``.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon into
        a marker name (stored lowercased) and the marker text. If a
        name occurs several times, the last occurrence wins.

        The file is parsed only once; further calls return
        immediately.

        `maxline` is currently not evaluated: all lines are parsed.
        """
        if self._parsed:
            return
        # Make sure the file is closed again after parsing.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed on first
        use.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        """Get text of the `user` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the `mode` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the `processed` marker or ``'<Info not avail.>'``.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the `source` marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]408
[4961]409
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event signalling that the storage of a datacenter was moved.

    The event is created with the affected datacenter as its object.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.