source: main/waeup.kofa/branches/henrik-transcript-workflow/src/waeup/kofa/datacenter.py @ 15159

Last change on this file since 15159 was 14680, checked in by Henrik Bettermann, 8 years ago

Do not count lines if file isn't a csv file.

  • Property svn:keywords set to Id
File size: 15.3 KB
Line 
1## $Id: datacenter.py 14680 2017-04-24 13:21:51Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import codecs
24import fnmatch
25import grok
26import os
27import re
28import shutil
29from datetime import datetime
30from zope.component import getUtility
31from zope.component.interfaces import ObjectEvent
32from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
33                                   IDataCenterStorageMovedEvent,
34                                   IDataCenterConfig)
35from waeup.kofa.utils.batching import ExportJobContainer
36from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
37from waeup.kofa.utils.logger import Logger
38
#: Regular expression describing a logfile name with backup extension,
#: i.e. some non-empty name followed by a dot and a purely numerical
#: suffix ('app.log.1', 'app.log.12', ...). A raw string is used so
#: that ``\.`` and ``\d`` reach the regex engine unchanged.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
41
42class DataCenter(grok.Container, Logger, ExportJobContainer):
43    """A data center contains CSV files.
44    """
45    grok.implements(IDataCenter)
46
47    logger_name = 'waeup.kofa.${sitename}.datacenter'
48    logger_filename = 'datacenter.log'
49
50    max_files = 100
51
52    def __init__(self, *args, **kw):
53        super(DataCenter, self).__init__(*args, **kw)
54        self.storage = getUtility(IDataCenterConfig)['path']
55        self._createSubDirs()
56
57    def _createSubDirs(self):
58        """Create standard subdirs.
59        """
60        for name in ['finished', 'unfinished', 'logs', 'deleted']:
61            path = os.path.join(self.storage, name)
62            if os.path.exists(path):
63                continue
64            os.mkdir(path)
65        return
66
67    @property
68    def deleted_path(self):
69        """Get the path for deleted object data.
70        """
71        return os.path.join(self.storage, 'deleted')
72
73    def getPendingFiles(self, sort='name'):
74        """Get a list of files stored in `storage`.
75
76        Files are sorted by basename.
77        """
78        result = []
79        if not os.path.exists(self.storage):
80            return result
81        for filename in sorted(os.listdir(self.storage)):
82            fullpath = os.path.join(self.storage, filename)
83            if not os.path.isfile(fullpath):
84                continue
85            if not (filename.endswith('.csv') or filename.endswith('.gz')):
86                continue
87            result.append(DataCenterFile(fullpath))
88        if sort == 'date':
89            # sort results in newest-first order...
90            result = sorted(result, key=lambda x: x.getTimeStamp(),
91                            reverse=True)
92        return result
93
94    def getFinishedFiles(self):
95        """Get a list of files stored in `finished` subfolder of `storage`.
96
97        Files are unsorted.
98        """
99        result = []
100        finished_dir = os.path.join(self.storage, 'finished')
101        if not os.path.exists(finished_dir):
102            return result
103        mtime = lambda f: os.stat(os.path.join(finished_dir, f)).st_mtime
104        finished_files = [f for f in
105            sorted(os.listdir(finished_dir), key=mtime, reverse=True)
106            if fnmatch.fnmatch(f, '*.finished.csv')]
107        for filename in finished_files[:self.max_files]:
108            fullpath = os.path.join(finished_dir, filename)
109            if not os.path.isfile(fullpath):
110                continue
111            if not filename.endswith('.csv'):
112                continue
113            result.append(DataCenterFile(fullpath, 'finished'))
114        return result
115
116    def getLogFiles(self, exclude_backups=True):
117        """Get the files from logs/ subdir. Files are sorted by name.
118
119        By default backup logs ('app.log.1', etc.) and payments.log
120        are excluded.
121        """
122        result = []
123        logdir = os.path.join(self.storage, 'logs')
124        if not os.path.exists(logdir):
125            os.mkdir(logdir)
126        for name in sorted(os.listdir(logdir)):
127            if not os.path.isfile(os.path.join(logdir, name)):
128                continue
129            if name == 'payments.log':
130                continue
131            if exclude_backups == True and RE_LOGFILE_BACKUP_NAME.match(name):
132                continue
133            result.append(
134                LogFile(os.path.join(self.storage, 'logs', name)))
135        return result
136
137    def setStoragePath(self, path, move=False, overwrite=False):
138        """Set the path where to store files.
139        """
140        path = os.path.abspath(path)
141        not_copied = []
142        if not os.path.exists(path):
143            raise ValueError('The path given does not exist: %s' % path)
144        if move is True:
145            not_copied = copy_filesystem_tree(self.storage, path,
146                                            overwrite=overwrite)
147        self.storage = path
148        self._createSubDirs()
149        grok.notify(DataCenterStorageMovedEvent(self))
150        return not_copied
151
152    def _moveFile(self, source, dest):
153        """Move file source to dest preserving ctime, mtime, etc.
154        """
155        if not os.path.exists(source):
156            self.logger.warn('No such source path: %s' % source)
157            return
158        if source == dest:
159            return
160        shutil.copyfile(source, dest)
161        shutil.copystat(source, dest)
162        os.unlink(source)
163
164    def _appendCSVFile(self, source, dest):
165        """Append data from CSV file `source` to data from CSV file `dest`.
166
167        The `source` file is deleted afterwards.
168        """
169        if not os.path.exists(dest):
170            return self._moveFile(source, dest)
171        if not os.path.exists(source):
172            self.logger.warn('No such source path: %s' % source)
173            return
174        if source == dest:
175            return
176        result_path = merge_csv_files(dest, source)
177        os.chmod(result_path, 0664)
178        self._moveFile(result_path, dest)
179        os.unlink(source)
180
    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the .finished and
        .pending files placed in storage.

        ``move_orig`` is accepted but not evaluated in this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        # A source named like 'foo.create.pending.csv' is treated as a
        # re-run of 'foo.csv', but only if that original file exists
        # in the unfinished dir.
        if basename.endswith('.pending.csv'):
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            # for a re-run the pending file keeps the name of the source
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        # (`finished_dest` is reused here for the original source file)
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)
        # An unsuccessful re-run (not successful, pending) moves no
        # source file at all.

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        # NOTE(review): `commonprefix` compares character-wise, so a
        # sibling dir like '<storage>-tmp' counts as inside storage
        # and is not removed -- confirm whether this is intended.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return
244
245    def _logfiles(self, basename):
246        """Get sorted logfiles starting with `basename`.
247        """
248        def numerical_suffix(name):
249            # return numerical suffix in `name` as number or 0.
250            suffix = name.rsplit('.', 1)[-1]
251            try:
252                return int(suffix)
253            except ValueError:
254                return 0
255            pass
256        files = [basename,]
257        for name in os.listdir(os.path.join(self.storage, 'logs')):
258            if RE_LOGFILE_BACKUP_NAME.match(name):
259                if name.rsplit('.', 1)[0] == basename:
260                    files.append(name)
261        return sorted(files, key=numerical_suffix, reverse=True)
262
263    def queryLogfiles(self, basename, query=None, limit=0, start=0):
264        """Search `query` in all logfiles starting with `basename`.
265
266        Returns an iterator of those lines in logfiles starting with
267        `basename` that match `query`. If you want the result as a
268        list, simply list() the iterator.
269
270        All logfiles with name `basename` and maybe some numerical
271        extension ('.1', '.2', ...) are searched for the `query` term
272        in correct chronological order. So, if you ask for a basename 'app.log',
273        then any file named 'app2.log', 'app.log.1', 'app.log',
274        etc. will be searched in that order.
275
276        The `query` is expected to be a string containing a regular
277        expression.
278
279        If `limit` is set to some numerical value, at most this number
280        of lines is returned.
281
282        With `start` you can give the number of first matched line to
283        return. `start` is zero-based, i.e. the first match has number
284        0, the scond one 1, etc.
285
286        Together with `limit` this allows some basic
287        batching. Please keep in mind that batching might give
288        unpredictable results, when logfiles change between two
289        requests. This is not a problem when only one file is searched
290        and changes include only appending new log messages.
291
292        Matches are found per line only (no multiline matches).
293
294        Result lines are returned as unicode instances decoded from
295        UTF-8 encoding. This means that logfiles must provide UTF-8
296        encoding for umlauts etc. if these should be rendered
297        properly. The returned unicode lines can be fed to page
298        templates even if they contain non-ASCII characters.
299
300        This method raises ValueError if some basic condition is not
301        met, for instance if the given query string is not a valid
302        regular expression.
303
304        Please note, that this exception will happen not before you
305        really fetch a result line.
306        """
307        try:
308            re_query = re.compile(query)
309        except:
310            raise ValueError('Invalid query string: %s' % query)
311
312        basename = basename.replace('/../', '')
313        files = self._logfiles(basename)
314
315        # Search the log files
316        num = 0
317        for name in files:
318            path = os.path.join(self.storage, 'logs', name)
319            if not os.path.isfile(path):
320                continue
321            for line in codecs.open(path, 'rb', 'utf-8'):
322                if not re_query.search(line):
323                    continue
324                num += 1
325                if (num - 1) < start:
326                    continue
327                yield line
328
329                if limit and (num - limit >= start):
330                    raise StopIteration
331        pass
332
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file, `folder_name` the
    (optional) name of the folder used to build `rel_path`.

    Size, date and number of lines are determined once, when the
    description is created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        # plain file name without any directory part
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation of the
        modification time.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp (the modification time).
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with
        two decimals in the biggest fitting unit out of kb, MB, GB.
        """
        bytesize = os.path.getsize(self.context)
        # try the biggest unit first
        for power, unit in ((3, 'GB'), (2, 'MB'), (1, 'kb')):
            threshold = 1024 ** power
            if bytesize >= threshold:
                return "%.2f %s" % (bytesize / float(threshold), unit)
        return "%s bytes" % bytesize

    def getLinesNumber(self):
        """Get number of lines.

        Lines are counted only for files whose name ends with 'csv'
        (case-insensitive); for any other file 1 is returned.
        """
        if not self.context.lower().endswith('csv'):
            return 1
        # The `with` block makes sure the file is closed after counting.
        with open(self.context, 'rb') as csv_file:
            return sum(1 for _ in csv_file)
379
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the general file data, the values of the markers
    'user', 'mode', 'processed' and 'source' are read from the file.
    A marker is a line of form ``<name>:<text>``; marker names are
    case-insensitive.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon into
        marker name and text. If a name occurs several times, the
        last occurrence wins. The file is parsed only once.
        """
        # NOTE(review): `maxline` is not evaluated, the whole file is
        # read -- confirm whether parsing was meant to stop after
        # `maxline` lines.
        if self._parsed:
            return
        # The `with` block makes sure the file is closed after parsing.
        with codecs.open(self.context, 'rb', 'utf-8') as logfile:
            for line in logfile:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        Triggers parsing of the file if not done already.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of the 'user' marker or '<UNKNOWN>'.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the 'mode' marker or '<NOT SET>'.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the 'processed' marker or '<Info not avail.>'.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the 'source' marker or ``None``.
        """
        return self._getMarker('source') or None
424
425
class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired, when datacenter storage moves.

    `DataCenter.setStoragePath` notifies this event with the data
    center itself as event object, after the new path was set.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.