source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 15422

Last change on this file since 15422 was 15416, checked in by Henrik Bettermann, 5 years ago

Backup deleted graduated student data somewhere else to ease graduated student data migration.

  • Property svn:keywords set to Id
File size: 15.5 KB
Line 
1## $Id: datacenter.py 15416 2019-05-21 08:02:51Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
The waeup data center takes care of managing uploaded data and provides
tools for importing and exporting CSV data.
22"""
23import codecs
24import fnmatch
25import grok
26import os
27import re
28import shutil
29from datetime import datetime
30from zope.component import getUtility
31from zope.component.interfaces import ObjectEvent
32from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
33                                   IDataCenterStorageMovedEvent,
34                                   IDataCenterConfig)
35from waeup.kofa.utils.batching import ExportJobContainer
36from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
37from waeup.kofa.utils.logger import Logger
38
#: Regular expression describing a logfile name with backup extension,
#: i.e. any name ending in a dot followed by digits (``app.log.1``).
#: Written as a raw string so that ``\.`` and ``\d`` reach the regex
#: engine unchanged and do not trigger invalid-escape warnings.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
41
class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.

    All files live below the directory stored in `storage`. That
    directory holds pending files directly and has the subdirectories
    created by `_createSubDirs` (``finished``, ``graduated``,
    ``unfinished``, ``logs`` and ``deleted``).
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    #: Maximum number of files returned by `getFinishedFiles`.
    max_files = 100

    def __init__(self, *args, **kw):
        """Set up the container and make sure the storage subdirs exist.

        The storage path is read from the registered `IDataCenterConfig`
        utility (key ``'path'``).
        """
        super(DataCenter, self).__init__(*args, **kw)
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Already existing subdirectories are left untouched.
        """
        for name in ['finished', 'graduated', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted student data.
        """
        return os.path.join(self.storage, 'deleted')

    @property
    def graduated_path(self):
        """Get the path for deleted graduated student data.
        """
        return os.path.join(self.storage, 'graduated')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files whose names end in ``.csv`` or ``.gz`` are
        considered. Returns a list of `DataCenterFile` objects, or an
        empty list if the storage directory does not exist.

        Files are sorted by basename, unless `sort` is ``'date'``; in
        that case they are sorted by modification time, newest first.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith(('.csv', '.gz')):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Only names matching ``*.finished.csv`` are considered. Files
        are sorted by modification time, newest first, and at most
        `max_files` of them are returned as `DataCenterFile` objects.
        An empty list is returned if the subfolder does not exist.
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result
        mtime = lambda f: os.stat(os.path.join(finished_dir, f)).st_mtime
        # Filter first, so that only candidate files are stat()ed. The
        # sort is stable, hence the resulting order is the same as when
        # sorting the complete directory listing and filtering then.
        candidates = [f for f in os.listdir(finished_dir)
                      if fnmatch.fnmatch(f, '*.finished.csv')]
        finished_files = sorted(candidates, key=mtime, reverse=True)
        for filename in finished_files[:self.max_files]:
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            # fnmatch may match case-insensitively on some platforms;
            # this check is always case-sensitive.
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) and payments.log
        are excluded. ``payments.log`` is excluded regardless of
        `exclude_backups`.

        The logs/ subdir is created if it does not exist. Returns a
        list of `LogFile` objects.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            if not os.path.isfile(os.path.join(logdir, name)):
                continue
            if name == 'payments.log':
                continue
            # Only values equal to ``True`` enable the backup filter
            # (other truthy values do not).
            if exclude_backups == True and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(
                LogFile(os.path.join(self.storage, 'logs', name)))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` is made absolute and must exist, otherwise `ValueError`
        is raised. If `move` is ``True``, the current storage tree is
        copied to the new location with `copy_filesystem_tree`;
        `overwrite` is passed on to that helper.

        Afterwards the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is fired.

        Returns whatever `copy_filesystem_tree` reported as not copied
        (an empty list if nothing was moved).
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged and otherwise ignored. Nothing
        happens if `source` and `dest` are the same path.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        If `dest` does not exist yet, `source` is simply moved there.
        Otherwise both files are merged with `merge_csv_files` and the
        merge result replaces `dest`.

        The `source` file is deleted afterwards.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        # ``0o664`` is the octal notation accepted by Python 2.6+ and 3.
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated files
        (``<base>.<mode>.finished<ext>``, ``<base>.<mode>.pending<ext>``).

        ``move_orig`` is accepted but not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        # A ``*.pending.csv`` source counts as a re-run of a pending
        # file only if the related original exists in ``unfinished``.
        if basename.endswith('.pending.csv'):
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        # NOTE(review): os.path.commonprefix compares character-wise,
        # so a sibling dir like '<storage>2/tmp' is treated as being
        # inside storage and is not removed.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        Returns `basename` itself plus all names in the logs/ subdir of
        the form ``<basename>.<number>``, sorted by that number in
        descending order (`basename` itself counts as 0 and comes last).
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0
        files = [basename,]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        Result lines are returned as unicode instances decoded from
        UTF-8 encoding. This means that logfiles must provide UTF-8
        encoding for umlauts etc. if these should be rendered
        properly. The returned unicode lines can be fed to page
        templates even if they contain non-ASCII characters.

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            # covers invalid patterns as well as non-string queries
            # like the default ``None``.
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): this only strips the literal '/../' sequence;
        # a basename like '../x' still passes -- confirm that callers
        # never hand in untrusted path components.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            # The ``with`` block closes the file also when the generator
            # finishes early or is discarded.
            with codecs.open(path, 'rb', 'utf-8') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # A plain ``return`` ends a generator; raising
                        # StopIteration here is an error under PEP 479.
                        return
338
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file. Size,
    modification date and number of lines are determined once on
    creation and stored as attributes.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        """Collect file data for the file at path `context`.

        `folder_name` is prepended to the file name to build `rel_path`.
        """
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is the modification time of the file formatted as
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the modification time as returned by `os.path.getmtime`.
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with two
        decimals in the largest fitting unit out of kb, MB and GB.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the largest unit first and stop at the first one that fits.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.

        Lines are only counted for files whose path ends
        (case-insensitively) in ``csv``; for all other files 1 is
        returned.
        """
        if not self.context.lower().endswith('csv'):
            return 1
        # Use a context manager so the file handle is closed reliably.
        with open(self.context, 'rb') as fd:
            return sum(1 for _ in fd)
385
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the plain file data, ``name: text`` markers found
    in the file are evaluated to provide `userid`, `mode`, `stats` and
    `source`.
    """
    def __init__(self, context):
        """Collect file data and markers for the log file at `context`.
        """
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon into
        a marker name (stored lowercased) and its text. If a name
        occurs several times, the last occurrence wins.

        The file is parsed only once; later calls return immediately.

        NOTE: `maxline` is currently not evaluated, the complete file
        is scanned.
        """
        if self._parsed:
            return
        # Use a context manager so the file handle is closed reliably.
        with codecs.open(self.context, 'rb', 'utf-8') as logfile:
            for line in logfile:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        Marker names are looked up case-insensitively. The file is
        parsed on first use.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        """Get the text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get the text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get the text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get the text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
430
431
class DataCenterStorageMovedEvent(ObjectEvent):
    """Object event signalling that a datacenter storage was moved.

    `DataCenter.setStoragePath` notifies subscribers with an instance
    of this event after the storage path has been changed.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.