source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 9643

Last change on this file since 9643 was 9589, checked in by Henrik Bettermann, 12 years ago

Show only last 100 finished files.

  • Property svn:keywords set to Id
File size: 14.7 KB
Line 
1## $Id: datacenter.py 9589 2012-11-09 07:53:05Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
24import fnmatch
25import re
26import shutil
27import grok
28from datetime import datetime
29from zope.component import getUtility
30from zope.component.interfaces import ObjectEvent
31from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
32                                   IDataCenterStorageMovedEvent,
33                                   IDataCenterConfig)
34from waeup.kofa.utils.batching import ExportJobContainer
35from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
36from waeup.kofa.utils.logger import Logger
37
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and one or more digits
#: (``app.log.1``, ``app.log.12``, ...). A raw string is used so that
#: ``\.`` and ``\d`` reach the regex engine unmodified.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
40
class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.

    All files live below the directory ``self.storage``. Pending CSV
    files are kept directly in that directory; the subdirectories
    ``finished``, ``unfinished``, ``logs`` and ``deleted`` are created
    on demand by `_createSubDirs`.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    #: Maximum number of entries returned by `getFinishedFiles`.
    max_files = 100

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The storage location is looked up from the registered
        # IDataCenterConfig utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Already existing subdirs are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files with a ``.csv`` extension located directly
        in `storage` are considered. Each one is returned wrapped in a
        `DataCenterFile`.

        Files are sorted by basename. If `sort` is ``'date'``, files
        are sorted by modification time instead, newest first.

        Returns an empty list if `storage` does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Only names matching ``*.finished.csv`` are considered. Files
        are sorted by modification time, newest first, and at most
        `max_files` of them are returned.

        Returns an empty list if the `finished` subfolder does not
        exist.
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result
        mtime = lambda f: os.stat(os.path.join(finished_dir, f)).st_mtime
        # Filter by name first, so that only candidate files are
        # stat()ed; the resulting order is the same as when sorting
        # all directory entries and filtering afterwards.
        candidates = [f for f in os.listdir(finished_dir)
                      if fnmatch.fnmatch(f, '*.finished.csv')]
        finished_files = sorted(candidates, key=mtime, reverse=True)
        for filename in finished_files[:self.max_files]:
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.

        The logs/ subdir is created if it does not exist yet. Each
        file is returned wrapped in a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            if not os.path.isfile(os.path.join(logdir, name)):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(os.path.join(logdir, name)))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must denote an existing location, otherwise a
        `ValueError` is raised.

        If `move` is ``True``, the contents of the old storage are
        copied over to `path` using `copy_filesystem_tree`;
        `overwrite` is passed on to that helper.

        Afterwards the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is fired.

        Returns whatever `copy_filesystem_tree` returned (the items
        not copied) or an empty list if nothing was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        If `dest` does not exist yet, `source` is simply moved there.
        Otherwise both files are merged with `merge_csv_files` and the
        merged result replaces `dest`.

        The `source` file is deleted afterwards.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        # ``0o664`` is the octal notation accepted by Python 2.6+ and
        # Python 3 (the old ``0664`` form is a syntax error on 3.x).
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the generated file names
        (``<base>.<mode>.finished<ext>``, ``<base>.<mode>.pending<ext>``).

        ``move_orig`` is accepted but not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # A ``<name>.<mode>.pending.csv`` file is treated as a
            # re-run only if the original ``<name>.csv`` is still
            # waiting in the `unfinished` dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)
        # An unsuccessful re-run of a pending file leaves the source
        # files where they are.

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        # NOTE(review): os.path.commonprefix compares character-wise,
        # so a sibling directory that merely shares the storage path
        # as a string prefix (e.g. '<storage>_tmp') is regarded as
        # 'inside' and not removed. Left unchanged on purpose, as the
        # check guards a recursive delete -- confirm before tightening.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself plus the names of all
        files in the logs/ subdir that consist of `basename` and a
        numerical backup extension. Names are sorted by that number in
        descending order (`basename` itself counts as 0 and therefore
        comes last).
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # Covers invalid expressions as well as non-string queries
            # (like the default ``None``), but lets KeyboardInterrupt
            # and SystemExit pass.
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): this only strips embedded '/../' sequences; a
        # `basename` starting with '../' is not neutralized. Confirm
        # whether callers sanitize the value before passing it in.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            # The context manager closes the file also if the consumer
            # stops iterating early or the limit is reached.
            with open(path, 'rb') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # Finish the generator with a plain return;
                        # raising StopIteration inside a generator is
                        # turned into a RuntimeError by PEP 479.
                        return
322
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file; `folder_name` is
    the name of the folder used to build the relative path
    `rel_path`.

    Size, modification date and number of lines are determined once,
    when the description is created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is the modification time of the file formatted as
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the modification time as returned by
        `os.path.getmtime`.
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with
        two decimal places in the largest fitting unit out of kb, MB
        and GB.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the largest unit first: 'GB' is 1024**3, 'MB' 1024**2...
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # Use a context manager, so that the file handle is closed
        # right after counting.
        with open(self.context, 'rb') as fd:
            return sum(1 for _ in fd)
367
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data provided by `DataCenterFile`, the file is
    scanned for ``name: text`` lines ("markers"). The markers
    ``user``, ``mode``, ``processed`` and ``source`` are made
    available as attributes `userid`, `mode`, `stats` and `source`.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every line containing a colon is split at the first colon
        into marker name (stored lowercased) and text. If a marker
        name occurs several times, the last occurrence wins.

        The file is parsed only once per instance.

        `maxline` is currently not evaluated: all lines of the file
        are scanned. The parameter is kept for compatibility.
        """
        if self._parsed:
            return
        # Use a context manager, so that the file handle is closed
        # right after parsing.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` (case-insensitive).

        Returns ``None`` if the file contains no such marker.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
412
413
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event notified after the storage path of a datacenter was set.

    The event object is created with the respective datacenter.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.