source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 9180

Last change on this file since 9180 was 9074, checked in by Henrik Bettermann, 12 years ago

Fix export finished file name.

Sort files by upload date.

  • Property svn:keywords set to Id
File size: 14.4 KB
Line 
1## $Id: datacenter.py 9074 2012-07-28 04:47:58Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import os
24import re
25import shutil
26import grok
27from datetime import datetime
28from zope.component import getUtility
29from zope.component.interfaces import ObjectEvent
30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
31                                   IDataCenterStorageMovedEvent,
32                                   IDataCenterConfig)
33from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
34from waeup.kofa.utils.logger import Logger
35
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and one or more digits
#: (for instance ``app.log.1``).
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
38
39class DataCenter(grok.Container, Logger):
40    """A data center contains CSV files.
41    """
42    grok.implements(IDataCenter)
43
44    logger_name = 'waeup.kofa.${sitename}.datacenter'
45    logger_filename = 'datacenter.log'
46
47    def __init__(self, *args, **kw):
48        super(DataCenter, self).__init__(*args, **kw)
49        self.storage = getUtility(IDataCenterConfig)['path']
50        self._createSubDirs()
51
52    def _createSubDirs(self):
53        """Create standard subdirs.
54        """
55        for name in ['finished', 'unfinished', 'logs', 'deleted']:
56            path = os.path.join(self.storage, name)
57            if os.path.exists(path):
58                continue
59            os.mkdir(path)
60        return
61
62    @property
63    def deleted_path(self):
64        """Get the path for deleted object data.
65        """
66        return os.path.join(self.storage, 'deleted')
67
68    def getPendingFiles(self, sort='name'):
69        """Get a list of files stored in `storage`.
70
71        Files are sorted by basename.
72        """
73        result = []
74        if not os.path.exists(self.storage):
75            return result
76        for filename in sorted(os.listdir(self.storage)):
77            fullpath = os.path.join(self.storage, filename)
78            if not os.path.isfile(fullpath):
79                continue
80            if not filename.endswith('.csv'):
81                continue
82            result.append(DataCenterFile(fullpath))
83        if sort == 'date':
84            # sort results in newest-first order...
85            result = sorted(result, key=lambda x: x.getTimeStamp(),
86                            reverse=True)
87        return result
88
89    def getFinishedFiles(self):
90        """Get a list of files stored in `finished` subfolder of `storage`.
91
92        Files are unsorted.
93        """
94        result = []
95        finished_dir = os.path.join(self.storage, 'finished')
96        if not os.path.exists(finished_dir):
97            return result
98        for filename in sorted(os.listdir(finished_dir)):
99            fullpath = os.path.join(finished_dir, filename)
100            if not os.path.isfile(fullpath):
101                continue
102            if not filename.endswith('.csv'):
103                continue
104            result.append(DataCenterFile(fullpath, 'finished'))
105        return result
106
107    def getLogFiles(self, exclude_backups=True):
108        """Get the files from logs/ subdir. Files are sorted by name.
109
110        By default backup logs ('app.log.1', etc.) are excluded.
111        """
112        result = []
113        logdir = os.path.join(self.storage, 'logs')
114        if not os.path.exists(logdir):
115            os.mkdir(logdir)
116        for name in sorted(os.listdir(logdir)):
117            if not os.path.isfile(os.path.join(logdir, name)):
118                continue
119            if exclude_backups == True and RE_LOGFILE_BACKUP_NAME.match(name):
120                continue
121            result.append(
122                LogFile(os.path.join(self.storage, 'logs', name)))
123        return result
124
125    def setStoragePath(self, path, move=False, overwrite=False):
126        """Set the path where to store files.
127        """
128        path = os.path.abspath(path)
129        not_copied = []
130        if not os.path.exists(path):
131            raise ValueError('The path given does not exist: %s' % path)
132        if move is True:
133            not_copied = copy_filesystem_tree(self.storage, path,
134                                            overwrite=overwrite)
135        self.storage = path
136        self._createSubDirs()
137        grok.notify(DataCenterStorageMovedEvent(self))
138        return not_copied
139
140    def _moveFile(self, source, dest):
141        """Move file source to dest preserving ctime, mtime, etc.
142        """
143        if not os.path.exists(source):
144            self.logger.warn('No such source path: %s' % source)
145            return
146        if source == dest:
147            return
148        shutil.copyfile(source, dest)
149        shutil.copystat(source, dest)
150        os.unlink(source)
151
152    def _appendCSVFile(self, source, dest):
153        """Append data from CSV file `source` to data from CSV file `dest`.
154
155        The `source` file is deleted afterwards.
156        """
157        if not os.path.exists(dest):
158            return self._moveFile(source, dest)
159        if not os.path.exists(source):
160            self.logger.warn('No such source path: %s' % source)
161            return
162        if source == dest:
163            return
164        result_path = merge_csv_files(dest, source)
165        os.chmod(result_path, 0664)
166        self._moveFile(result_path, dest)
167        os.unlink(source)
168
169    def distProcessedFiles(self, successful, source_path, finished_file,
170                           pending_file, mode='create', move_orig=True):
171        """Put processed files into final locations.
172
173        ``successful`` is a boolean that tells, whether processing was
174        successful.
175
176        ``source_path``: path to file that was processed.
177
178        ``finished_file``, ``pending_file``: paths to the respective
179        generated .pending and .finished file. The .pending file path
180        may be ``None``.
181
182        If finished file is placed in a location outside the local
183        storage dir, the complete directory is removed
184        afterwards. Regular processors should put their stuff in
185        dedicated temporary dirs.
186
187        See datacenter.txt for more info about how this works.
188        """
189        basename = os.path.basename(source_path)
190        pending_name = basename
191        pending = False
192        finished_dir = os.path.join(self.storage, 'finished')
193        unfinished_dir = os.path.join(self.storage, 'unfinished')
194
195        if basename.endswith('.pending.csv'):
196            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
197            maybe_src = os.path.join(unfinished_dir, maybe_basename)
198            if os.path.isfile(maybe_src):
199                basename = maybe_basename
200                pending = True
201
202        base, ext = os.path.splitext(basename)
203        finished_name = "%s.%s.finished%s" % (base, mode, ext)
204        if not pending:
205            pending_name = "%s.%s.pending%s" % (base, mode, ext)
206
207        # Put .pending and .finished file into respective places...
208        pending_dest = os.path.join(self.storage, pending_name)
209        finished_dest = os.path.join(finished_dir, finished_name)
210        self._appendCSVFile(finished_file, finished_dest)
211        if pending_file is not None:
212            self._moveFile(pending_file, pending_dest)
213
214        # Put source file into final location...
215        finished_dest = os.path.join(finished_dir, basename)
216        unfinished_dest = os.path.join(unfinished_dir, basename)
217        if successful and not pending:
218            self._moveFile(source_path, finished_dest)
219        elif successful and pending:
220            self._moveFile(unfinished_dest, finished_dest)
221            os.unlink(source_path)
222        elif not successful and not pending:
223            self._moveFile(source_path, unfinished_dest)
224
225        # If finished and pending-file were created in a location
226        # outside datacenter storage, we remove it.
227        maybe_temp_dir = os.path.dirname(finished_file)
228        if os.path.commonprefix(
229            [self.storage, maybe_temp_dir]) != self.storage:
230            shutil.rmtree(maybe_temp_dir)
231        return
232
233    def _logfiles(self, basename):
234        """Get sorted logfiles starting with `basename`.
235        """
236        def numerical_suffix(name):
237            # return numerical suffix in `name` as number or 0.
238            suffix = name.rsplit('.', 1)[-1]
239            try:
240                return int(suffix)
241            except ValueError:
242                return 0
243            pass
244        files = [basename,]
245        for name in os.listdir(os.path.join(self.storage, 'logs')):
246            if RE_LOGFILE_BACKUP_NAME.match(name):
247                if name.rsplit('.', 1)[0] == basename:
248                    files.append(name)
249        return sorted(files, key=numerical_suffix, reverse=True)
250
251    def queryLogfiles(self, basename, query=None, limit=0, start=0):
252        """Search `query` in all logfiles starting with `basename`.
253
254        Returns an iterator of those lines in logfiles starting with
255        `basename` that match `query`. If you want the result as a
256        list, simply list() the iterator.
257
258        All logfiles with name `basename` and maybe some numerical
259        extension ('.1', '.2', ...) are searched for the `query` term
260        in correct chronological order. So, if you ask for a basename 'app.log',
261        then any file named 'app2.log', 'app.log.1', 'app.log',
262        etc. will be searched in that order.
263
264        The `query` is expected to be a string containing a regular
265        expression.
266
267        If `limit` is set to some numerical value, at most this number
268        of lines is returned.
269
270        With `start` you can give the number of first matched line to
271        return. `start` is zero-based, i.e. the first match has number
272        0, the scond one 1, etc.
273
274        Together with `limit` this allows some basic
275        batching. Please keep in mind that batching might give
276        unpredictable results, when logfiles change between two
277        requests. This is not a problem when only one file is searched
278        and changes include only appending new log messages.
279
280        Matches are found per line only (no multiline matches).
281
282        This method raises ValueError if some basic condition is not
283        met, for instance if the given query string is not a valid
284        regular expression.
285
286        Please note, that this exception will happen not before you
287        really fetch a result line.
288        """
289        try:
290            re_query = re.compile(query)
291        except:
292            raise ValueError('Invalid query string: %s' % query)
293
294        basename = basename.replace('/../', '')
295        files = self._logfiles(basename)
296
297        # Search the log files
298        num = 0
299        for name in files:
300            path = os.path.join(self.storage, 'logs', name)
301            if not os.path.isfile(path):
302                continue
303            for line in open(path, 'rb'):
304                if not re_query.search(line):
305                    continue
306                num += 1
307                if (num - 1) < start:
308                    continue
309                yield line
310
311                if limit and (num - limit >= start):
312                    raise StopIteration
313        pass
314
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file. On creation the
    following attributes are computed once from the file:

    - ``name``: basename of the file
    - ``rel_path``: `folder_name` joined with ``name``
    - ``size``: human readable file size
    - ``uploaddate``: human readable timestamp
    - ``lines``: number of lines in the file
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is the result of `os.path.getctime` for the file,
        formatted as ``YYYY-MM-DD HH:MM:SS`` in local time.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the plain result of `os.path.getctime` for the file.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given as ``<n> bytes``, larger ones
        with two decimals in the largest fitting unit of kb, MB, GB
        (powers of 1024).
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # try the largest unit first
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # The context manager makes sure the file handle is closed
        # right after counting.
        with open(self.context, 'rb') as fobj:
            return sum(1 for _ in fobj)
359
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the `DataCenterFile` attributes, the file is
    scanned for ``name: text`` lines (markers). The markers ``user``,
    ``mode``, ``processed`` and ``source`` are exposed as attributes
    ``userid``, ``mode``, ``stats`` and ``source``.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every line containing a colon is split at the first colon into
        marker name and text. Names are stored lower-cased; if a name
        appears several times, the last occurrence wins.

        The file is parsed only once per instance.

        `maxline` is accepted for compatibility but not evaluated: all
        lines of the file are scanned.
        """
        if self._parsed:
            return
        # The context manager makes sure the file handle is closed.
        with open(self.context, 'rb') as fobj:
            for line in fobj:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed on first
        use.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
404
405
class DataCenterStorageMovedEvent(ObjectEvent):
    """Object event signalling that the storage of a datacenter moved.

    The event declares to implement `IDataCenterStorageMovedEvent`.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.