source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 15428

Last change on this file since 15428 was 15416, checked in by Henrik Bettermann, 6 years ago

Back up deleted graduated student data somewhere else to ease graduated student data migration.

  • Property svn:keywords set to Id
File size: 15.5 KB
## $Id: datacenter.py 15416 2019-05-21 08:02:51Z henrik $
##
## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
##
[7819]18"""Kofa data center.
[4146]19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
import codecs
import fnmatch
import grok
import os
import re
import shutil
from datetime import datetime
from zope.component import getUtility
from zope.component.interfaces import ObjectEvent
from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
                                   IDataCenterStorageMovedEvent,
                                   IDataCenterConfig)
from waeup.kofa.utils.batching import ExportJobContainer
from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
from waeup.kofa.utils.logger import Logger

#: Regular expression describing a logfile name with backup extension
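#: (sketch: matches e.g. 'main.log.1' or 'datacenter.log.12', i.e. a
#: purely numeric suffix after the final dot; plain 'main.log' does not
#: match)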
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')

class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    max_files = 100

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.
        """
        for name in ['finished', 'graduated', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted student data.
        """
        return os.path.join(self.storage, 'deleted')

    @property
    def graduated_path(self):
        """Get the path for deleted graduated student data.
        """
        return os.path.join(self.storage, 'graduated')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Files are sorted by basename; with ``sort='date'`` they are
        sorted by timestamp, newest first.
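
        Example (a sketch; ``datacenter`` stands for an assumed
        DataCenter instance with an uploaded ``students.csv``)::

          [f.name for f in datacenter.getPendingFiles()]
          # -> ['students.csv']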
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not (filename.endswith('.csv') or filename.endswith('.gz')):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Files are sorted by modification time, newest first; at most
        `max_files` entries are returned.
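
        Example (sketch; assumes a finished import left
        ``students.create.finished.csv`` behind)::

          [f.rel_path for f in datacenter.getFinishedFiles()]
          # -> ['finished/students.create.finished.csv']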
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result
        mtime = lambda f: os.stat(os.path.join(finished_dir, f)).st_mtime
        finished_files = [f for f in
            sorted(os.listdir(finished_dir), key=mtime, reverse=True)
            if fnmatch.fnmatch(f, '*.finished.csv')]
        for filename in finished_files[:self.max_files]:
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) and payments.log
        are excluded.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            if not os.path.isfile(os.path.join(logdir, name)):
                continue
            if name == 'payments.log':
                continue
            if exclude_backups and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(
                LogFile(os.path.join(self.storage, 'logs', name)))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.
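
        Example (a sketch; ``/tmp/kofa-storage`` stands for any
        existing directory of your choice)::

          not_copied = datacenter.setStoragePath(
              '/tmp/kofa-storage', move=True)

        With ``move=True`` the old storage contents are copied over;
        a list of paths that could not be copied is returned.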
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                              overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        The `source` file is deleted afterwards.
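
        Intended effect, as a sketch (the actual column merging is
        done by ``merge_csv_files`` from ``waeup.kofa.utils.helpers``)::

          # dest before:  name,age        source:  name,age
          #               Alice,30                 Bob,25
          # dest after:   name,age
          #               Alice,30
          #               Bob,25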
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        os.chmod(result_path, 0664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean telling whether processing was
        successful.

        ``source_path``: path to the file that was processed.

        ``finished_file``, ``pending_file``: paths to the generated
        .finished and .pending files, respectively. The .pending file
        path may be ``None``.

        If the finished file is placed in a location outside the local
        storage dir, the complete directory is removed afterwards.
        Regular processors should put their stuff in dedicated
        temporary dirs.

        See datacenter.txt for more info about how this works.
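
        Resulting locations, as a sketch for a successful, non-pending
        import of ``students.csv`` with ``mode='create'``::

          <storage>/students.create.pending.csv            (failed rows)
          <storage>/finished/students.create.finished.csv  (passed rows)
          <storage>/finished/students.csv                  (source file)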
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending file were created in a location
        # outside datacenter storage, we remove that directory.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.
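
        Example (sketch; given 'main.log', 'main.log.1' and
        'main.log.2' in the logs dir, oldest backup first)::

          ['main.log.2', 'main.log.1', 'main.log']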
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for the basename
        'app.log', then files named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order, oldest backups
        first.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of the first matched line
        to return. `start` is zero-based, i.e. the first match has
        number 0, the second one 1, etc.

        Together with `limit` this allows some basic batching. Please
        keep in mind that batching might give unpredictable results
        when logfiles change between two requests. This is not a
        problem when only one file is searched and changes only append
        new log messages.

        Matches are found per line only (no multiline matches).

        Result lines are returned as unicode instances decoded from
        UTF-8. This means that logfiles must be UTF-8 encoded if
        umlauts etc. are to be rendered properly. The returned unicode
        lines can be fed to page templates even if they contain
        non-ASCII characters.

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note that this exception is not raised before you
        actually fetch a result line.
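
        Example (a sketch; assumes 'ERROR' lines exist in 'main.log'
        or one of its backups)::

          lines = list(datacenter.queryLogfiles(
              'main.log', query='ERROR', limit=10, start=0))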
        """
        try:
            re_query = re.compile(query)
        except Exception:
            raise ValueError('Invalid query string: %s' % query)

        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            for line in codecs.open(path, 'rb', 'utf-8'):
                if not re_query.search(line):
                    continue
                num += 1
                if (num - 1) < start:
                    continue
                yield line

                if limit and (num - limit >= start):
                    return

class DataCenterFile(object):
    """A description of a file stored in the data center.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.
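
        Example (sketch): a 512 byte file yields '512 bytes', a
        2048 byte file '2.00 kb', a file of 5*1024**2 bytes '5.00 MB'.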
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        num = 0
        if not self.context.lower().endswith('csv'):
            return 1
        for line in open(self.context, 'rb'):
            num += 1
        return num

class LogFile(DataCenterFile):
    """A description of a log file.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.
        """
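        # Typical marker lines near the top of an import log, as a
        # sketch (only the part before the first colon is significant;
        # the exact wording of the values is an assumption here):
        #
        #   User: admin
        #   Mode: create
        #   Processed: 10 lines (10 successful / 0 failed)
        #   Source: /path/to/students.csv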
        if self._parsed:
            return
        for line in codecs.open(self.context, 'rb', 'utf-8'):
            line = line.strip()
            if ':' not in line:
                continue
            name, text = line.split(':', 1)
            self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        if marker in self._markers:
            return self._markers[marker]

    def getUserId(self):
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        return self._getMarker('source') or None


class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired when the datacenter storage moves.
    """
    grok.implements(IDataCenterStorageMovedEvent)