source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 17985

Last change on this file since 17985 was 15430, checked in by Henrik Bettermann, 6 years ago

Make maximum number of files listed in finished subfolder customizable.

  • Property svn:keywords set to Id
File size: 15.6 KB
Line 
1## $Id: datacenter.py 15430 2019-05-28 06:14:44Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import codecs
24import fnmatch
25import grok
26import os
27import re
28import shutil
29from datetime import datetime
30from zope.component import getUtility
31from zope.component.interfaces import ObjectEvent
32from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
33                                   IDataCenterStorageMovedEvent,
34                                   IDataCenterConfig,
35                                   IKofaUtils)
36from waeup.kofa.utils.batching import ExportJobContainer
37from waeup.kofa.utils.helpers import copy_filesystem_tree, merge_csv_files
38from waeup.kofa.utils.logger import Logger
39
40
#: Regular expression describing a logfile name with backup extension,
#: i.e. any non-empty name followed by a dot and a purely numerical
#: suffix ('app.log.1', 'app.log.12', ...). A raw string is used so that
#: the regex escapes ``\.`` and ``\d`` are not read as string escapes.
RE_LOGFILE_BACKUP_NAME = re.compile(r'^.+\.\d+$')
43
class DataCenter(grok.Container, Logger, ExportJobContainer):
    """A data center contains CSV files.

    The data center works on a storage directory in the filesystem
    (``self.storage``) and makes sure the standard subdirectories
    ``finished``, ``graduated``, ``unfinished``, ``logs`` and
    ``deleted`` exist in there.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    @property
    def max_files(self):
        """Maximum number of files listed by `getFinishedFiles`.

        The value is looked up from the registered `IKofaUtils`
        utility on each access.
        """
        return getUtility(IKofaUtils).MAX_FILES

    def __init__(self, *args, **kw):
        """Set the storage path from config and create the subdirs.

        The storage path is the ``'path'`` entry of the registered
        `IDataCenterConfig` utility.
        """
        super(DataCenter, self).__init__(*args, **kw)
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Subdirs that already exist are left untouched.
        """
        for name in ['finished', 'graduated', 'unfinished', 'logs', 'deleted']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    @property
    def deleted_path(self):
        """Get the path for deleted student data.
        """
        return os.path.join(self.storage, 'deleted')

    @property
    def graduated_path(self):
        """Get the path for graduated student data.
        """
        return os.path.join(self.storage, 'graduated')

    def getPendingFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files ending in ``.csv`` or ``.gz`` that live
        directly in the storage dir are considered. Each one is
        returned as `DataCenterFile`.

        Files are sorted by basename, unless `sort` is ``'date'``. In
        that case the newest files (by timestamp) come first.

        An empty list is returned if the storage dir does not exist.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            if not filename.endswith(('.csv', '.gz')):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getFinishedFiles(self):
        """Get a list of files stored in `finished` subfolder of `storage`.

        Only entries matching ``*.finished.csv`` are considered. They
        are sorted by modification time, newest first, and at most
        `max_files` of them are looked at.

        An empty list is returned if the subfolder does not exist.
        """
        result = []
        finished_dir = os.path.join(self.storage, 'finished')
        if not os.path.exists(finished_dir):
            return result

        def mtime(name):
            # modification time of an entry in the finished dir
            return os.stat(os.path.join(finished_dir, name)).st_mtime

        finished_files = [
            name for name in
            sorted(os.listdir(finished_dir), key=mtime, reverse=True)
            if fnmatch.fnmatch(name, '*.finished.csv')]
        # The limit is applied before non-files are skipped, so the
        # result may contain less than `max_files` entries.
        for filename in finished_files[:self.max_files]:
            fullpath = os.path.join(finished_dir, filename)
            if not os.path.isfile(fullpath):
                continue
            # fnmatch normalizes case on some platforms; this check
            # is always case-sensitive.
            if not filename.endswith('.csv'):
                continue
            result.append(DataCenterFile(fullpath, 'finished'))
        return result

    def getLogFiles(self, exclude_backups=True):
        """Get the files from logs/ subdir. Files are sorted by name.

        By default backup logs ('app.log.1', etc.) are excluded.
        ``payments.log`` is always excluded.

        The logs/ subdir is created if it does not exist yet.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            if not os.path.isfile(os.path.join(logdir, name)):
                continue
            if name == 'payments.log':
                continue
            if exclude_backups == True and RE_LOGFILE_BACKUP_NAME.match(name):
                continue
            result.append(
                LogFile(os.path.join(self.storage, 'logs', name)))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` is made absolute and must exist, otherwise a
        `ValueError` is raised.

        If `move` is ``True``, the contents of the old storage are
        copied over with `copy_filesystem_tree` (`overwrite` is passed
        on) before the storage is switched.

        Afterwards the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is sent.

        Returns the result of `copy_filesystem_tree` or an empty list
        if nothing was copied.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _appendCSVFile(self, source, dest):
        """Append data from CSV file `source` to data from CSV file `dest`.

        If `dest` does not exist yet, `source` is simply moved there.
        Otherwise both files are merged with `merge_csv_files` and the
        result replaces `dest`.

        The `source` file is deleted afterwards.

        A missing `source` is logged as warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(dest):
            return self._moveFile(source, dest)
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        result_path = merge_csv_files(dest, source)
        # rw-rw-r--; `0o` octal notation works with Python 2.6+ and 3.x
        os.chmod(result_path, 0o664)
        self._moveFile(result_path, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated files
        (``<base>.<mode>.finished<ext>``, ``<base>.<mode>.pending<ext>``).

        ``move_orig`` is not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # The source might be a pending file of an earlier run. It
            # is treated as such only if a matching original source
            # file is waiting in the unfinished dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._appendCSVFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        #
        # NOTE(review): os.path.commonprefix compares character-wise,
        # so a sibling dir whose path merely starts with the storage
        # path (like storage + '2') counts as inside storage and is
        # kept. Confirm whether that is acceptable.
        maybe_temp_dir = os.path.dirname(finished_file)
        if os.path.commonprefix(
            [self.storage, maybe_temp_dir]) != self.storage:
            shutil.rmtree(maybe_temp_dir)
        return

    def _logfiles(self, basename):
        """Get sorted logfiles starting with `basename`.

        The result contains `basename` itself and the names of all
        backups of it (`basename` plus numerical extension) found in
        the logs/ subdir. Names are sorted by numerical extension,
        highest number first. Names without numerical extension count
        as zero.
        """
        def numerical_suffix(name):
            # return numerical suffix in `name` as number or 0.
            suffix = name.rsplit('.', 1)[-1]
            try:
                return int(suffix)
            except ValueError:
                return 0

        files = [basename]
        for name in os.listdir(os.path.join(self.storage, 'logs')):
            if RE_LOGFILE_BACKUP_NAME.match(name):
                if name.rsplit('.', 1)[0] == basename:
                    files.append(name)
        return sorted(files, key=numerical_suffix, reverse=True)

    def queryLogfiles(self, basename, query=None, limit=0, start=0):
        """Search `query` in all logfiles starting with `basename`.

        Returns an iterator of those lines in logfiles starting with
        `basename` that match `query`. If you want the result as a
        list, simply list() the iterator.

        All logfiles with name `basename` and maybe some numerical
        extension ('.1', '.2', ...) are searched for the `query` term
        in correct chronological order. So, if you ask for a basename
        'app.log', then any file named 'app.log.2', 'app.log.1',
        'app.log', etc. will be searched in that order.

        The `query` is expected to be a string containing a regular
        expression.

        If `limit` is set to some numerical value, at most this number
        of lines is returned.

        With `start` you can give the number of first matched line to
        return. `start` is zero-based, i.e. the first match has number
        0, the second one 1, etc.

        Together with `limit` this allows some basic
        batching. Please keep in mind that batching might give
        unpredictable results, when logfiles change between two
        requests. This is not a problem when only one file is searched
        and changes include only appending new log messages.

        Matches are found per line only (no multiline matches).

        Result lines are returned as unicode instances decoded from
        UTF-8 encoding. This means that logfiles must provide UTF-8
        encoding for umlauts etc. if these should be rendered
        properly. The returned unicode lines can be fed to page
        templates even if they contain non-ASCII characters.

        This method raises ValueError if some basic condition is not
        met, for instance if the given query string is not a valid
        regular expression.

        Please note, that this exception will happen not before you
        really fetch a result line.
        """
        try:
            re_query = re.compile(query)
        except Exception:
            raise ValueError('Invalid query string: %s' % query)

        # NOTE(review): this only removes '/../' sequences. A name
        # like '../x' passes unchanged. Confirm that callers never
        # hand in untrusted names or sanitize more strictly.
        basename = basename.replace('/../', '')
        files = self._logfiles(basename)

        # Search the log files
        num = 0
        for name in files:
            path = os.path.join(self.storage, 'logs', name)
            if not os.path.isfile(path):
                continue
            # The `with` block closes the logfile also when the
            # generator ends early or is discarded.
            with codecs.open(path, 'rb', 'utf-8') as logfile:
                for line in logfile:
                    if not re_query.search(line):
                        continue
                    num += 1
                    if (num - 1) < start:
                        continue
                    yield line

                    if limit and (num - limit >= start):
                        # A plain `return` ends a generator. Raising
                        # StopIteration here would turn into a
                        # RuntimeError with newer Python (PEP 479).
                        return
342
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file. Name, relative
    path, size, modification date and number of lines are computed
    once on creation and stored as attributes.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context, folder_name=''):
        """Collect the file infos.

        `context` is the path to the file, `folder_name` the name of
        the folder prepended to the file name in `rel_path`.
        """
        self.context = context
        self.name = os.path.basename(self.context)
        self.rel_path = os.path.join(folder_name, self.name)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The date is the modification time of the file, formatted as
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getmtime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the modification time of the file as number.
        """
        return os.path.getmtime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, bigger ones with
        two decimal places in the biggest fitting unit of kb, MB or
        GB (powers of 1024).
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # try the biggest unit first
        for power, unit in reversed(list(enumerate(units, 1))):
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.

        Lines are counted only for files whose path ends with 'csv'
        (case-insensitive). For all other files 1 is returned.
        """
        if not self.context.lower().endswith('csv'):
            return 1
        # make sure the file is closed after counting
        with open(self.context, 'rb') as csv_file:
            return sum(1 for line in csv_file)
389
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the infos of a regular `DataCenterFile`, markers
    are read from the file content: each line containing a colon is
    split at the first colon into marker name and marker text. The
    markers ``user``, ``mode``, ``processed`` and ``source`` are made
    available as attributes ``userid``, ``mode``, ``stats`` and
    ``source``.
    """
    def __init__(self, context):
        """Collect the file infos of log file at path `context`.
        """
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        The file is read as UTF-8 and is parsed only once. Marker
        names are stored lower-cased. If a marker name appears several
        times, the last appearance wins.

        NOTE(review): `maxline` is accepted but not evaluated, the
        whole file is scanned. Confirm whether a limit was intended.
        """
        if self._parsed:
            return
        # make sure the file is closed after parsing
        with codecs.open(self.context, 'rb', 'utf-8') as logfile:
            for line in logfile:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed first, if
        that did not happen yet.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
434
435
class DataCenterStorageMovedEvent(ObjectEvent):
    """An event fired, when datacenter storage moves.

    `DataCenter.setStoragePath` sends this event with the data center
    as object, after the new storage path was set.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.