source: main/waeup.sirp/trunk/src/waeup/sirp/datacenter.py @ 6367

Last change on this file since 6367 was 6286, checked in by uli, 13 years ago

Remove trailing whitespace.

File size: 9.9 KB
RevLine 
"""WAeUP data center.

The WAeUP data center manages uploaded data files and provides
tools for importing and exporting CSV data.
"""
import logging
import logging.handlers
import os
import shutil
from datetime import datetime

import grok
from zope.component.interfaces import ObjectEvent

from waeup.sirp.interfaces import (IDataCenter, IDataCenterFile,
                                   IDataCenterStorageMovedEvent)
from waeup.sirp.utils.helpers import copyFileSystemTree
[4146]15
class DataCenter(grok.Container):
    """A data center contains CSV files.

    All files live below the directory named by ``storage``. Processed
    files are distributed into its ``finished`` and ``unfinished``
    subdirectories, datacenter actions are logged to
    ``logs/datacenter.log``.
    """
    grok.implements(IDataCenter)
    storage = os.path.join(os.path.dirname(__file__), 'files')

    @property
    def logger(self):
        """Get a logger for datacenter actions.

        The logger name contains the name of the current site (``app``
        if the site provides no ``__name__``). A logger that has no
        handlers yet is set up by `_setupLogger` before it is returned.
        """
        # We need a different logger for every site...
        site = grok.getSite()
        sitename = getattr(site, '__name__', 'app')
        loggername = 'waeup.sirp.%s.datacenter' % sitename
        logger = logging.getLogger(loggername)
        if not logger.handlers:
            logger = self._setupLogger(logger)
        return logger

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self._createSubDirs()

    def _setupLogger(self, logger):
        """Setup datacenter logger.

        Attaches a rotating file handler writing to
        ``<storage>/logs/datacenter.log`` to `logger` (the ``logs``
        directory is created if missing) and returns `logger`.
        """
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        filename = os.path.join(logdir, 'datacenter.log')

        # Create a rotating file handler logger for datacenter.
        # NOTE(review): 5*1024**1 is only 5 KiB per log file -- confirm
        # this is intended (1024**2 would give 5 MiB).
        handler = logging.handlers.RotatingFileHandler(
            filename, maxBytes=5*1024**1, backupCount=5)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)

        # Here we decide, whether our messages will _also_ go to
        # application log.
        logger.propagate = False
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        return logger

    def _createSubDirs(self):
        """Create standard subdirs.

        Creates ``finished`` and ``unfinished`` below ``storage``
        unless they already exist.
        """
        for name in ['finished', 'unfinished']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in `storage` are
        considered; each one is returned as a `DataCenterFile`. An
        empty list is returned if `storage` does not exist.

        Files are sorted by basename. If `sort` is ``'date'`` they are
        sorted by timestamp instead, newest first.
        """
        if not os.path.exists(self.storage):
            return []
        paths = [os.path.join(self.storage, name)
                 for name in sorted(os.listdir(self.storage))]
        result = [DataCenterFile(path) for path in paths
                  if os.path.isfile(path)]
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir. Files are sorted by name.

        The ``logs`` directory is created if it does not exist yet.
        Each regular file found is returned as a `LogFile`.
        """
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        paths = [os.path.join(logdir, name)
                 for name in sorted(os.listdir(logdir))]
        return [LogFile(path) for path in paths if os.path.isfile(path)]

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must name an existing location, otherwise a
        `ValueError` is raised.

        If `move` is ``True``, the contents of the old storage are
        transferred with `copyFileSystemTree`, which gets `overwrite`
        passed through. Its result is returned (presumably the entries
        that could not be copied -- verify against the helper); without
        `move` an empty list is returned.

        Afterwards the standard subdirs are created in the new
        location, the logger is redirected to it and a
        `DataCenterStorageMovedEvent` is sent.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        if move is True:
            not_copied = copyFileSystemTree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        # Adjust logger...
        logger = self.logger
        # Iterate over a copy: removeHandler() modifies logger.handlers
        # in place, so looping over the list itself skips handlers.
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            # Release the log file kept open by the old handler.
            handler.close()
        self._setupLogger(logger)
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. Nothing happens if `source` and `dest` are equal.
        """
        if not os.path.exists(source):
            self.logger.warning('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def _isInStorage(self, path):
        """Tell whether `path` is the storage dir or located inside it.

        Paths are compared component-wise, so a sibling directory that
        merely shares a name prefix with the storage dir (``files_tmp``
        vs. ``files``) is not considered to be inside.
        """
        storage = os.path.abspath(self.storage)
        path = os.path.abspath(path)
        if path == storage:
            return True
        return path.startswith(storage.rstrip(os.sep) + os.sep)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the stored .finished and
        .pending files (``<base>.<mode>.finished<ext>``).

        ``move_orig`` is accepted but currently not evaluated.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular importers should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # The source might be a pending file of an earlier run. It
            # is treated as such only if the file it was derived from
            # is still waiting in the unfinished dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        source_finished_dest = os.path.join(finished_dir, basename)
        source_unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, source_finished_dest)
        elif successful and pending:
            self._moveFile(source_unfinished_dest, source_finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, source_unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it. A finished file
        # given without any directory part has no dedicated dir we
        # could remove, so nothing is deleted in that case.
        maybe_temp_dir = os.path.dirname(finished_file)
        if maybe_temp_dir and not self._isInStorage(maybe_temp_dir):
            shutil.rmtree(maybe_temp_dir)
        return
201
[6286]202
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the path of the described file. Name, size, upload
    date and number of lines are determined once, when the description
    is created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is derived from the ctime of the file.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime('%c')

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the ctime of the file as reported by `os.path`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with
        two decimal places in the biggest fitting unit.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first; unit number i stands for 1024**(i+1).
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # Close the file explicitly: a description is created for every
        # file listed, so dangling handles would pile up quickly.
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
[6286]246
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular file description, the values
    of the ``user``, ``mode``, ``processed`` and ``source`` markers
    found in the file are provided.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Every line containing a colon is split at the first colon. The
        part before it (lowercased) is the marker name, the rest the
        marker text. If a name appears several times, the last
        occurrence wins. The file is read only once.

        `maxline` is accepted but currently not evaluated: the whole
        file is scanned.
        """
        if self._parsed:
            return
        # Make sure the file is closed again, even if parsing fails.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        Marker names are looked up case-insensitively. The file is
        parsed on first use.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]291
[4961]292
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event sent out, when the storage of a datacenter was relocated.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.