source: main/waeup.kofa/branches/uli-async-update/src/waeup/kofa/datacenter.py @ 10009

Last change on this file since 10009 was 9169, checked in by uli, 12 years ago

Merge changes from trunk, r8786-HEAD

  • Property svn:keywords set to Id
File size: 14.5 KB
## $Id: datacenter.py 9169 2012-09-10 11:05:07Z uli $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
import os
import re
import shutil
import grok
from datetime import datetime
from zope.component import getUtility
from zope.component.interfaces import ObjectEvent
from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
                                   IDataCenterStorageMovedEvent,
                                   IDataCenterConfig)
from waeup.kofa.utils.batching import ExportJobContainer
from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
from waeup.kofa.utils.logger import Logger

#: Regular expression describing a logfile name with backup extension
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
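#: (Matches backup names like 'datacenter.log.1' or 'app.log.12'; a
#: plain 'datacenter.log' without numerical suffix does not match.)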

class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.
        """
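        # Creates <storage>/finished, <storage>/unfinished,
        # <storage>/logs and <storage>/deleted (names taken from the
        # loop below); existing dirs are left untouched.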
        for name in ['finished', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted object data.
        """
        return os.path.join(self.storage, 'deleted')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Files are sorted by basename; with ``sort='date'`` they are
        sorted by timestamp instead, newest first.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in the `finished` subfolder of
        `storage`.

        Files are sorted by basename.
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result
        for filename in sorted(os.listdir(finished_dir)):
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from the logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            if not os.path.isfile(os.path.join(logdir, name)):
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(os.path.join(logdir, name)))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        If `move` is True, the contents of the old storage are copied
        to the new location. Returns a list of files that were not
        copied.
        """
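        # Hedged usage sketch (the target path is an illustrative
        # assumption):
        #
        #   not_copied = datacenter.setStoragePath(
        #       '/tmp/new-storage', move=True)
        #
        # Files skipped during the copy presumably end up in
        # `not_copied` (e.g. when `overwrite` is False and a file
        # already exists at the target).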
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                              overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file `source` to `dest`, preserving permissions and
        timestamps.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        The `source` file is deleted afterwards.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into their final locations.

        ``successful`` is a boolean telling whether processing was
        successful.

        ``source_path``: path to the file that was processed.

        ``finished_file``, ``pending_file``: paths to the generated
        .finished and .pending files, respectively. The .pending file
        path may be ``None``.

        If the finished file is placed in a location outside the
        local storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
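
        A sketch of the resulting layout for a successful first run
        with ``mode='create'`` (file names are illustrative
        assumptions)::

          storage/mydata.create.pending.csv            <- pending_file
          storage/finished/mydata.csv                  <- source_path
          storage/finished/mydata.create.finished.csv  <- finished_file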
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If the finished and pending files were created in a location
        # outside the datacenter storage, remove that directory.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.
        """
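        # Illustrative example: for basename 'datacenter.log' with
        # backups 'datacenter.log.1' and 'datacenter.log.2' present in
        # logs/, the result is ['datacenter.log.2', 'datacenter.log.1',
        # 'datacenter.log'], i.e. oldest entries first.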
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator over those lines in logfiles starting
        with `basename` that match `query`. If you want the result as
        a list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for the
        basename 'app.log', then the files 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of the first matched
        line to return. `start` is zero-based, i.e. the first match
        has number 0, the second one 1, etc.

        Together with `limit` this allows some basic batching. Please
        keep in mind that batching might give unpredictable results
        when logfiles change between two requests. This is not a
        problem when only one file is searched and changes consist
        only of appended log messages.

        Matches are found per line only (no multiline matches).

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note that this exception is not raised before you
        actually fetch a result line, since this method is a
        generator.
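
        A hedged usage sketch (logfile name and query are
        illustrative)::

          lines = list(datacenter.queryLogfiles(
              'datacenter.log', query='ERROR', start=10, limit=5))
          # yields matches no. 10 through 14 (zero-based) only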
        """
        try:
            re_query = re.compile(query)
        except Exception:
            raise ValueError('Invalid query string: %s' % query)

        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            for line in open(path, 'rb'):
                if not re_query.search(line):
                    continue
                num += 1
                if (num - 1) < start:
                    continue
                yield line

                if limit and (num - limit >= start):
                    return

class DataCenterFile(object):
    """A description of a file stored in the data center.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.
        """
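        # Worked examples (values follow from the computation below):
        #   500 -> '500 bytes', 2048 -> '2.00 kB',
        #   3 * 1024 ** 2 -> '3.00 MB'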
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kB', 'MB', 'GB']
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize / (1024.0 ** power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        num = 0
        for line in open(self.context, 'rb'):
            num += 1
        return num

class LogFile(DataCenterFile):
    """A description of a log file.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Only the first `maxline` lines are scanned.
        """
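        # Marker lines are expected near the top of the logfile and
        # look like 'User: admin' or 'Mode: create' (values are
        # illustrative); the part before the first colon becomes the
        # (lowercased) marker name.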
        if self._parsed:
            return
        for num, line in enumerate(open(self.context, 'rb')):
            if num >= maxline:
                break
            line = line.strip()
            if ':' not in line:
                continue
            name, text = line.split(':', 1)
            self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        if marker in self._markers:
            return self._markers[marker]

    def getUserId(self):
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        return self._getMarker('source') or None


class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired when the datacenter storage moves.
    """
    grok.implements(IDataCenterStorageMovedEvent)