source: main/waeup.sirp/trunk/src/waeup/sirp/datacenter.py @ 7104

Last change on this file since 7104 was 6827, checked in by Henrik Bettermann, 13 years ago

Use SIRP datetime format.

File size: 8.7 KB
RevLine 
"""WAeUP data center.

The WAeUP data center takes care of managing upload data and provides
tools for importing/exporting CSV data.
"""
[4873]6import logging
[4146]7import os
[4896]8import shutil
[4146]9import grok
10from datetime import datetime
[4883]11from zope.component.interfaces import ObjectEvent
[4961]12from waeup.sirp.interfaces import (IDataCenter, IDataCenterFile,
13                                   IDataCenterStorageMovedEvent)
[4920]14from waeup.sirp.utils.helpers import copyFileSystemTree
[6578]15from waeup.sirp.utils.logger import Logger
[4146]16
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    Files are looked up directly in `storage`. The subdirectories
    ``finished``, ``unfinished`` and ``logs`` are created below
    `storage` on initialization and whenever the storage path changes.
    """
    grok.implements(IDataCenter)
    storage = os.path.join(os.path.dirname(__file__), 'files')

    logger_name = 'waeup.sirp.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Subdirectories that already exist are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    def _isInStorage(self, path):
        """Tell whether `path` is the storage dir or located below it.

        Both paths are made absolute and compared on directory
        boundaries. A plain string-prefix comparison would wrongly
        regard a sibling directory like ``<storage>_tmp`` as being part
        of the storage.
        """
        storage = os.path.abspath(self.storage)
        path = os.path.abspath(path)
        if path == storage:
            return True
        # rstrip() keeps the check correct if storage is the root dir.
        return path.startswith(storage.rstrip(os.sep) + os.sep)

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in `storage` are
        considered. Each of them is returned as a `DataCenterFile`.

        Files are sorted by basename. If `sort` is ``'date'``, files
        are sorted by timestamp instead, newest first.

        If `storage` does not exist, an empty list is returned.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir. Files are sorted by name.

        The logs/ subdir is created if it does not exist yet. Each
        regular file found is returned as a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            fullpath = os.path.join(logdir, name)
            if not os.path.isfile(fullpath):
                continue
            result.append(LogFile(fullpath))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` is made absolute and must exist, otherwise a
        `ValueError` is raised.

        If `move` is ``True``, the contents of the old storage are
        copied to the new location with `copyFileSystemTree`, passing
        `overwrite` on to it.

        After the switch the standard subdirs are created in the new
        location and a `DataCenterStorageMovedEvent` is sent.

        Returns whatever `copyFileSystemTree` returned (presumably the
        files that could not be copied) or an empty list if nothing
        was moved.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        # Only the real boolean ``True`` triggers copying.
        if move is True:
            not_copied = copyFileSystemTree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving ctime, mtime, etc.

        A missing `source` is logged as a warning and otherwise
        ignored. If `source` and `dest` are equal, nothing is done.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .finished and .pending file. The .pending file path
        may be ``None``.

        ``mode``: becomes part of the names of the stored .finished
        and .pending files.

        ``move_orig``: accepted, but not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular importers should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        if basename.endswith('.pending.csv'):
            # ``<name>.<mode>.pending.csv`` -> ``<name>.csv``. We only
            # deal with a pending file if the respective original file
            # is waiting in the unfinished/ dir.
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)
        # A failed run on a pending file leaves the source files where
        # they are.

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isInStorage(maybe_temp_dir):
            shutil.rmtree(maybe_temp_dir)
        return
164
[6286]165
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file. Name,
    size, date and number of lines are determined once, when the
    description is created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is taken from `os.path.getctime` and formatted like
        ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the unformatted value of `os.path.getctime`.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, larger ones with two
        decimal places in the biggest fitting unit of kb, MB or GB.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first: GB (1024**3), MB, kb.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.

        The file is closed again after counting.
        """
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
[6286]209
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular data center file, a log file
    provides user id, mode, stats and source path, which are read from
    ``<name>:<text>`` lines (markers) in the file.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon. The
        lowercased part before the colon becomes the marker name, the
        rest is stored unchanged as marker text. If a name appears
        several times, the last occurrence wins.

        The file is parsed only once. `maxline` is currently not
        evaluated: all lines of the file are scanned.
        """
        if self._parsed:
            return
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        Marker names are looked up case-insensitively. The file is
        parsed on first use.
        """
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker.lower())

    def getUserId(self):
        """Get text of ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get text of ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get text of ``processed`` marker or ``'<Info not avail.>'``.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get text of ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
[4883]254
[4961]255
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event sent after the storage path of a data center was changed.

    The event is created with the respective data center as object.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.