source: main/waeup.sirp/trunk/src/waeup/sirp/datacenter.py @ 7068

Last change on this file since 7068 was 6827, checked in by Henrik Bettermann, 13 years ago

Use SIRP datetime format.

File size: 8.7 KB
Line 
1"""WAeUP data center.
2
3The waeup data center cares for management of upload data and provides
4tools for importing/exporting CSV data.
5"""
6import logging
7import os
8import shutil
9import grok
10from datetime import datetime
11from zope.component.interfaces import ObjectEvent
12from waeup.sirp.interfaces import (IDataCenter, IDataCenterFile,
13                                   IDataCenterStorageMovedEvent)
14from waeup.sirp.utils.helpers import copyFileSystemTree
15from waeup.sirp.utils.logger import Logger
16
17class DataCenter(grok.Container, Logger):
18    """A data center contains CSV files.
19    """
20    grok.implements(IDataCenter)
21    storage = os.path.join(os.path.dirname(__file__), 'files')
22
23    logger_name = 'waeup.sirp.${sitename}.datacenter'
24    logger_filename = 'datacenter.log'
25
26    def __init__(self, *args, **kw):
27        super(DataCenter, self).__init__(*args, **kw)
28        self._createSubDirs()
29
30    def _createSubDirs(self):
31        """Create standard subdirs.
32        """
33        for name in ['finished', 'unfinished', 'logs']:
34            path = os.path.join(self.storage, name)
35            if os.path.exists(path):
36                continue
37            os.mkdir(path)
38        return
39
40    def getFiles(self, sort='name'):
41        """Get a list of files stored in `storage`.
42
43        Files are sorted by basename.
44        """
45        result = []
46        if not os.path.exists(self.storage):
47            return result
48        for filename in sorted(os.listdir(self.storage)):
49            fullpath = os.path.join(self.storage, filename)
50            if not os.path.isfile(fullpath):
51                continue
52            result.append(DataCenterFile(fullpath))
53        if sort == 'date':
54            # sort results in newest-first order...
55            result = sorted(result, key=lambda x: x.getTimeStamp(),
56                            reverse=True)
57        return result
58
59    def getLogFiles(self):
60        """Get the files from logs/ subdir. Files are sorted by name.
61        """
62        result = []
63        logdir = os.path.join(self.storage, 'logs')
64        if not os.path.exists(logdir):
65            os.mkdir(logdir)
66        for name in sorted(os.listdir(logdir)):
67            if not os.path.isfile(os.path.join(logdir, name)):
68                continue
69            result.append(
70                LogFile(os.path.join(self.storage, 'logs', name)))
71        return result
72
73    def setStoragePath(self, path, move=False, overwrite=False):
74        """Set the path where to store files.
75        """
76        path = os.path.abspath(path)
77        not_copied = []
78        if not os.path.exists(path):
79            raise ValueError('The path given does not exist: %s' % path)
80        if move is True:
81
82            not_copied = copyFileSystemTree(self.storage, path,
83                                            overwrite=overwrite)
84        self.storage = path
85        self._createSubDirs()
86        grok.notify(DataCenterStorageMovedEvent(self))
87        return not_copied
88
89    def _moveFile(self, source, dest):
90        """Move file source to dest preserving ctime, mtime, etc.
91        """
92        if not os.path.exists(source):
93            self.logger.warn('No such source path: %s' % source)
94            return
95        if source == dest:
96            return
97        shutil.copyfile(source, dest)
98        shutil.copystat(source, dest)
99        os.unlink(source)
100
101    def distProcessedFiles(self, successful, source_path, finished_file,
102                           pending_file, mode='create', move_orig=True):
103        """Put processed files into final locations.
104
105        ``successful`` is a boolean that tells, whether processing was
106        successful.
107
108        ``source_path``: path to file that was processed.
109
110        ``finished_file``, ``pending_file``: paths to the respective
111        generated .pending and .finished file. The .pending file path
112        may be ``None``.
113
114        If finished file is placed in a location outside the local
115        storage dir, the complete directory is removed
116        afterwards. Regular importers should put their stuff in
117        dedicated temporary dirs.
118
119        See datacenter.txt for more info about how this works.
120        """
121        basename = os.path.basename(source_path)
122        pending_name = basename
123        pending = False
124        finished_dir = os.path.join(self.storage, 'finished')
125        unfinished_dir = os.path.join(self.storage, 'unfinished')
126
127        if basename.endswith('.pending.csv'):
128            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
129            maybe_src = os.path.join(unfinished_dir, maybe_basename)
130            if os.path.isfile(maybe_src):
131                basename = maybe_basename
132                pending = True
133
134        base, ext = os.path.splitext(basename)
135        finished_name = "%s.%s.finished%s" % (base, mode, ext)
136        if not pending:
137            pending_name = "%s.%s.pending%s" % (base, mode, ext)
138
139        # Put .pending and .finished file into respective places...
140        pending_dest = os.path.join(self.storage, pending_name)
141        finished_dest = os.path.join(finished_dir, finished_name)
142        self._moveFile(finished_file, finished_dest)
143        if pending_file is not None:
144            self._moveFile(pending_file, pending_dest)
145
146        # Put source file into final location...
147        finished_dest = os.path.join(finished_dir, basename)
148        unfinished_dest = os.path.join(unfinished_dir, basename)
149        if successful and not pending:
150            self._moveFile(source_path, finished_dest)
151        elif successful and pending:
152            self._moveFile(unfinished_dest, finished_dest)
153            os.unlink(source_path)
154        elif not successful and not pending:
155            self._moveFile(source_path, unfinished_dest)
156
157        # If finished and pending-file were created in a location
158        # outside datacenter storage, we remove it.
159        maybe_temp_dir = os.path.dirname(finished_file)
160        if os.path.commonprefix(
161            [self.storage, maybe_temp_dir]) != self.storage:
162            shutil.rmtree(maybe_temp_dir)
163        return
164
165
166class DataCenterFile(object):
167    """A description of a file stored in data center.
168    """
169    grok.implements(IDataCenterFile)
170
171    def __init__(self, context):
172        self.context = context
173        self.name = os.path.basename(self.context)
174        self.size = self.getSize()
175        self.uploaddate = self.getDate()
176        self.lines = self.getLinesNumber()
177
178    def getDate(self):
179        """Get a human readable datetime representation.
180        """
181        date = datetime.fromtimestamp(os.path.getctime(self.context))
182        return date.strftime("%Y-%m-%d %H:%M:%S")
183
184    def getTimeStamp(self):
185        """Get a (machine readable) timestamp.
186        """
187        return os.path.getctime(self.context)
188
189    def getSize(self):
190        """Get a human readable file size.
191        """
192        bytesize = os.path.getsize(self.context)
193        size = "%s bytes" % bytesize
194        units = ['kb', 'MB', 'GB']
195        for power, unit in reversed(list(enumerate(units))):
196            power += 1
197            if bytesize >= 1024 ** power:
198                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
199                break
200        return size
201
202    def getLinesNumber(self):
203        """Get number of lines.
204        """
205        num = 0
206        for line in open(self.context, 'rb'):
207            num += 1
208        return num
209
210class LogFile(DataCenterFile):
211    """A description of a log file.
212    """
213    def __init__(self, context):
214        super(LogFile, self).__init__(context)
215        self._markers = dict()
216        self._parsed = False
217        self.userid = self.getUserId()
218        self.mode = self.getMode()
219        self.stats = self.getStats()
220        self.source = self.getSourcePath()
221
222    def _parseFile(self, maxline=10):
223        """Find markers in a file.
224        """
225        if self._parsed:
226            return
227        for line in open(self.context, 'rb'):
228            line = line.strip()
229            if not ':' in line:
230                continue
231            name, text = line.split(':', 1)
232            self._markers[name.lower()] = text
233        self._parsed = True
234        return
235
236    def _getMarker(self, marker):
237        marker = marker.lower()
238        if not self._parsed:
239            self._parseFile()
240        if marker in self._markers.keys():
241            return self._markers[marker]
242
243    def getUserId(self):
244        return self._getMarker('user') or '<UNKNOWN>'
245
246    def getMode(self):
247        return self._getMarker('mode') or '<NOT SET>'
248
249    def getStats(self):
250        return self._getMarker('processed') or '<Info not avail.>'
251
252    def getSourcePath(self):
253        return self._getMarker('source') or None
254
255
256class DataCenterStorageMovedEvent(ObjectEvent):
257    """An event fired, when datacenter storage moves.
258    """
259    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.