source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 16310

Last change on this file since 16310 was 15430, checked in by Henrik Bettermann, 6 years ago

Make maximum number of files listed in finished subfolder customizable.

  • Property svn:keywords set to Id
File size: 15.6 KB
RevLine 
[7193]1## $Id: datacenter.py 15430 2019-05-28 06:14:44Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
[10045]23import codecs
24import fnmatch
25import grok
[4146]26import os
[8512]27import re
[4896]28import shutil
[4146]29from datetime import datetime
[7568]30from zope.component import getUtility
[4883]31from zope.component.interfaces import ObjectEvent
[7811]32from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
[7568]33                                   IDataCenterStorageMovedEvent,
[15430]34                                   IDataCenterConfig,
35                                   IKofaUtils)
[9217]36from waeup.kofa.utils.batching import ExportJobContainer
[8634]37from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
[7811]38from waeup.kofa.utils.logger import Logger
[4146]39
[15430]40
#: Regular expression describing a logfile name with backup extension,
#: i.e. any name ending in a dot followed by digits ('app.log.1').
#: A raw string is used so that the backslash escapes reach the regex
#: engine unchanged.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
[8512]43
class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.

    All files are kept below the directory ``self.storage``. The
    subdirectories ``finished``, ``graduated``, ``unfinished``, ``logs``
    and ``deleted`` are created on demand.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    @property
    def max_files(self):
        """Maximum number of files returned by `getFinishedFiles`.

        The value is looked up from the registered `IKofaUtils` utility
        on every access.
        """
        return getUtility(IKofaUtils).MAX_FILES

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The initial storage location comes from the registered
        # datacenter configuration utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs of `storage` that do not exist yet.
        """
        for name in ['finished', 'graduated', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path of the `deleted` subfolder of `storage`.
        """
        return os.path.join(self.storage, 'deleted')

    @property
    def graduated_path(self):
        """Get the path of the `graduated` subfolder of `storage`.
        """
        return os.path.join(self.storage, 'graduated')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files ending in ``.csv`` or ``.gz`` are considered.

        Files are sorted by basename, unless `sort` is ``'date'``: then
        they are returned in newest-first order.

        Returns a list of `DataCenterFile` objects (empty if `storage`
        does not exist).
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith(('.csv', '.gz')):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Only names matching ``*.finished.csv`` are considered. Files are
        sorted by modification time, newest first, and at most
        `max_files` of them are returned.

        Returns a list of `DataCenterFile` objects (empty if the
        `finished` subfolder does not exist).
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result

        def mtime(name):
            return os.stat(os.path.join(finished_dir, name)).st_mtime

        # Filter first, so that only matching entries are stat'ed when
        # sorting. The resulting order is the same as sorting everything
        # and filtering afterwards.
        finished_files = [name for name in os.listdir(finished_dir)
                          if fnmatch.fnmatch(name, '*.finished.csv')]
        finished_files.sort(key=mtime, reverse=True)
        for filename in finished_files[:self.max_files]:
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            # fnmatch may ignore case on some platforms, this check
            # does not.
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        The logs/ subdir is created if it does not exist.

        ``payments.log`` is always excluded. By default (`exclude_backups`
        is true) backup logs ('app.log.1', etc.) are excluded as well.

        Returns a list of `LogFile` objects.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            if name == 'payments.log':
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must be an existing directory, otherwise `ValueError` is
        raised.

        If `move` is ``True``, the contents of the old storage are
        copied to the new location with `copy_filesystem_tree`;
        `overwrite` is passed on to that helper.

        Afterwards the standard subdirs are created in the new location
        and a `DataCenterStorageMovedEvent` is fired.

        Returns what `copy_filesystem_tree` returned (or an empty list
        if nothing was moved).
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        If `dest` does not exist yet, `source` is simply moved there.

        The `source` file is deleted afterwards.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        # rw-rw-r--; `0o664` is valid octal syntax in Python 2.6+ and 3.
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated files
        (``<base>.<mode>.finished<ext>``, ``<base>.<mode>.pending<ext>``).

        ``move_orig`` is accepted but not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # A re-processed pending file: look for the original source
            # file in the `unfinished` folder.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        # NOTE(review): os.path.commonprefix compares character-wise,
        # not per path component, so a sibling dir like '<storage>2/tmp'
        # is treated as inside storage and not removed -- confirm
        # whether this is intended before changing it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself plus all names in the
        logs/ subdir of the form ``<basename>.<number>``. Names are
        sorted by their numerical suffix in descending order, with
        `basename` (counted as 0) last.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0

        files = [basename]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if not RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            if name.rsplit('.', 1)[0] == basename:
                files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        Result lines are returned as unicode instances decoded from
        UTF-8 encoding. This means that logfiles must provide UTF-8
        encoding for umlauts etc. if these should be rendered
        properly. The returned unicode lines can be fed to page
        templates even if they contain non-ASCII characters.

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # Invalid pattern or a non-string query (like the default
            # ``None``).
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): this strips only embedded '/../' sequences; a
        # leading '../' passes through unchanged -- confirm that callers
        # never hand in untrusted basenames.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            # The context manager closes the file also if the consumer
            # stops iterating early.
            with codecs.open(path, 'rb', 'utf-8') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # A plain return ends a generator; raising
                        # StopIteration here is an error under PEP 479.
                        return
342
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file, `folder_name` the
    name of the folder used to build `rel_path`.

    Size, modification date and number of lines are determined once
    on creation and stored in `size`, `uploaddate` and `lines`.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The file's modification time, formatted as
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the file's modification time as returned by
        `os.path.getmtime`.
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with
        two decimals in the largest fitting unit of kb, MB or GB.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the largest unit first.
        for power, unit in reversed(list(enumerate(units, 1))):
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.

        Lines are only counted for files whose path ends
        (case-insensitively) in ``csv``. For all other files 1 is
        returned.
        """
        if not self.context.lower().endswith('csv'):
            return 1
        # The context manager makes sure the file handle is closed.
        with open(self.context, 'rb') as csv_file:
            return sum(1 for _ in csv_file)
[6286]389
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the attributes of `DataCenterFile` the markers
    ``user``, ``mode``, ``processed`` and ``source`` are read from the
    file and stored in `userid`, `mode`, `stats` and `source`.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every line containing a colon is split at the first colon: the
        lowercased part before it becomes the marker name, the rest the
        marker text. Later lines override earlier ones with the same
        name. The file is read as UTF-8 and parsed only once.

        `maxline` is accepted but not evaluated: all lines of the file
        are parsed.
        """
        if self._parsed:
            return
        # The context manager makes sure the file handle is closed.
        with codecs.open(self.context, 'rb', 'utf-8') as logfile:
            for line in logfile:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` (case-insensitive).

        Parses the file if this did not happen yet. Returns ``None``
        for unknown markers.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get the text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get the text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get the text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get the text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]434
[4961]435
class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired, when datacenter storage moves.

    `DataCenter.setStoragePath` sends it via `grok.notify` with the
    data center as event object, after the new storage path was set.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.