source: main/waeup.kofa/trunk/src/waeup/kofa/datacenter.py @ 8305

Last change on this file since 8305 was 7933, checked in by Henrik Bettermann, 13 years ago

Rename importers to processors.

  • Property svn:keywords set to Id
File size: 9.5 KB
Line 
1## $Id: datacenter.py 7933 2012-03-21 13:42:00Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""Kofa data center.
19
20The waeup data center cares for management of upload data and provides
21tools for importing/exporting CSV data.
22"""
23import logging
24import os
25import shutil
26import grok
27from datetime import datetime
28from zope.component import getUtility
29from zope.component.interfaces import ObjectEvent
30from waeup.kofa.interfaces import (IDataCenter, IDataCenterFile,
31                                   IDataCenterStorageMovedEvent,
32                                   IDataCenterConfig)
33from waeup.kofa.utils.helpers import copy_filesystem_tree
34from waeup.kofa.utils.logger import Logger
35
class DataCenter(grok.Container, Logger):
    """A data center contains CSV files.

    Files are kept in the directory ``storage``. The subdirectories
    ``finished``, ``unfinished`` and ``logs`` are created below it on
    instantiation and whenever the storage path is changed.
    """
    grok.implements(IDataCenter)

    logger_name = 'waeup.kofa.${sitename}.datacenter'
    logger_filename = 'datacenter.log'

    def __init__(self, *args, **kw):
        super(DataCenter, self).__init__(*args, **kw)
        # The initial storage path is read from the registered
        # IDataCenterConfig utility.
        self.storage = getUtility(IDataCenterConfig)['path']
        self._createSubDirs()

    def _createSubDirs(self):
        """Create standard subdirs.

        Already existing subdirs are left untouched.
        """
        for name in ['finished', 'unfinished', 'logs']:
            path = os.path.join(self.storage, name)
            if os.path.exists(path):
                continue
            os.mkdir(path)
        return

    def _isInsideStorage(self, path):
        """Tell whether `path` is the storage dir or located below it.

        Paths are compared component-wise after normalization. A
        plain string-prefix test would wrongly treat a sibling like
        ``<storage>_tmp`` as part of the storage and would miss a
        storage path written with a trailing separator.
        """
        storage = os.path.normpath(self.storage)
        path = os.path.normpath(path)
        if path == storage:
            return True
        # rstrip() keeps this correct if storage is the root dir.
        return path.startswith(storage.rstrip(os.sep) + os.sep)

    def getFiles(self, sort='name'):
        """Get a list of files stored in `storage`.

        Only regular files located directly in `storage` are
        considered; each one is returned as a `DataCenterFile`. If
        `storage` does not exist, an empty list is returned.

        Files are sorted by basename. If `sort` is ``'date'`` they
        are sorted by timestamp instead, newest first.
        """
        result = []
        if not os.path.exists(self.storage):
            return result
        for filename in sorted(os.listdir(self.storage)):
            fullpath = os.path.join(self.storage, filename)
            if not os.path.isfile(fullpath):
                continue
            result.append(DataCenterFile(fullpath))
        if sort == 'date':
            # sort results in newest-first order...
            result = sorted(result, key=lambda x: x.getTimeStamp(),
                            reverse=True)
        return result

    def getLogFiles(self):
        """Get the files from logs/ subdir. Files are sorted by name.

        The logs/ subdir is created if it does not exist yet. Each
        regular file found is returned as a `LogFile`.
        """
        result = []
        logdir = os.path.join(self.storage, 'logs')
        if not os.path.exists(logdir):
            os.mkdir(logdir)
        for name in sorted(os.listdir(logdir)):
            path = os.path.join(logdir, name)
            if not os.path.isfile(path):
                continue
            result.append(LogFile(path))
        return result

    def setStoragePath(self, path, move=False, overwrite=False):
        """Set the path where to store files.

        `path` must exist, otherwise a `ValueError` is raised.

        If `move` is ``True`` the old storage is passed to
        `copy_filesystem_tree` together with `overwrite` before the
        new path is activated.

        Returns whatever `copy_filesystem_tree` returned (presumably
        the entries that could not be copied -- verify against the
        helper) or an empty list if nothing was moved.

        A `DataCenterStorageMovedEvent` is sent afterwards.
        """
        path = os.path.abspath(path)
        not_copied = []
        if not os.path.exists(path):
            raise ValueError('The path given does not exist: %s' % path)
        # Identity check on purpose: only a real ``True`` triggers
        # the copy, other truthy values do not.
        if move is True:
            not_copied = copy_filesystem_tree(self.storage, path,
                                            overwrite=overwrite)
        self.storage = path
        self._createSubDirs()
        grok.notify(DataCenterStorageMovedEvent(self))
        return not_copied

    def _moveFile(self, source, dest):
        """Move file source to dest preserving mtime, permissions, etc.

        The file is copied, its stat info is transferred with
        `shutil.copystat` and the source is removed afterwards.

        A missing source is logged as a warning and otherwise
        ignored. Nothing happens if source and dest are equal.
        """
        if not os.path.exists(source):
            self.logger.warn('No such source path: %s' % source)
            return
        if source == dest:
            return
        shutil.copyfile(source, dest)
        shutil.copystat(source, dest)
        os.unlink(source)

    def distProcessedFiles(self, successful, source_path, finished_file,
                           pending_file, mode='create', move_orig=True):
        """Put processed files into final locations.

        ``successful`` is a boolean that tells, whether processing was
        successful.

        ``source_path``: path to file that was processed.

        ``finished_file``, ``pending_file``: paths to the respective
        generated .pending and .finished file. The .pending file path
        may be ``None``.

        ``mode`` becomes part of the names of the generated files.

        ``move_orig`` is accepted but not evaluated by this method.

        If finished file is placed in a location outside the local
        storage dir, the complete directory is removed
        afterwards. Regular processors should put their stuff in
        dedicated temporary dirs.

        See datacenter.txt for more info about how this works.
        """
        basename = os.path.basename(source_path)
        pending_name = basename
        pending = False
        finished_dir = os.path.join(self.storage, 'finished')
        unfinished_dir = os.path.join(self.storage, 'unfinished')

        # A source named ``<base>.<mode>.pending.csv`` is a re-run of
        # a former import, if the respective original file
        # ``<base>.csv`` still waits in the unfinished/ dir.
        if basename.endswith('.pending.csv'):
            maybe_basename = "%s.csv" % basename.rsplit('.', 3)[0]
            maybe_src = os.path.join(unfinished_dir, maybe_basename)
            if os.path.isfile(maybe_src):
                basename = maybe_basename
                pending = True

        base, ext = os.path.splitext(basename)
        finished_name = "%s.%s.finished%s" % (base, mode, ext)
        if not pending:
            pending_name = "%s.%s.pending%s" % (base, mode, ext)

        # Put .pending and .finished file into respective places...
        pending_dest = os.path.join(self.storage, pending_name)
        finished_dest = os.path.join(finished_dir, finished_name)
        self._moveFile(finished_file, finished_dest)
        if pending_file is not None:
            self._moveFile(pending_file, pending_dest)

        # Put source file into final location...
        finished_dest = os.path.join(finished_dir, basename)
        unfinished_dest = os.path.join(unfinished_dir, basename)
        if successful and not pending:
            self._moveFile(source_path, finished_dest)
        elif successful and pending:
            self._moveFile(unfinished_dest, finished_dest)
            os.unlink(source_path)
        elif not successful and not pending:
            self._moveFile(source_path, unfinished_dest)

        # If finished and pending-file were created in a location
        # outside datacenter storage, we remove it.
        maybe_temp_dir = os.path.dirname(finished_file)
        if not self._isInsideStorage(maybe_temp_dir):
            shutil.rmtree(maybe_temp_dir)
        return
183
184
class DataCenterFile(object):
    """A description of a file stored in data center.

    `context` is the filesystem path of the described file. Name,
    size, date and number of lines are determined once, when the
    description is created.
    """
    grok.implements(IDataCenterFile)

    def __init__(self, context):
        self.context = context
        self.name = os.path.basename(self.context)
        self.size = self.getSize()
        self.uploaddate = self.getDate()
        self.lines = self.getLinesNumber()

    def getDate(self):
        """Get a human readable datetime representation.

        The value is based on ``os.path.getctime`` and formatted
        like ``YYYY-MM-DD HH:MM:SS``.
        """
        date = datetime.fromtimestamp(os.path.getctime(self.context))
        return date.strftime("%Y-%m-%d %H:%M:%S")

    def getTimeStamp(self):
        """Get a (machine readable) timestamp.

        This is the plain ``os.path.getctime`` value of the file.
        """
        return os.path.getctime(self.context)

    def getSize(self):
        """Get a human readable file size.

        Sizes below 1024 bytes are given in bytes, bigger ones in
        the largest fitting unit with two decimal places.
        """
        bytesize = os.path.getsize(self.context)
        size = "%s bytes" % bytesize
        units = ['kb', 'MB', 'GB']
        # Try the biggest unit first and stop at the first one that
        # fits.
        for power, unit in reversed(list(enumerate(units))):
            power += 1
            if bytesize >= 1024 ** power:
                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
                break
        return size

    def getLinesNumber(self):
        """Get number of lines.
        """
        # The context manager makes sure the file handle is closed
        # right away and not left to the garbage collector.
        with open(self.context, 'rb') as fd:
            return sum(1 for line in fd)
228
class LogFile(DataCenterFile):
    """A description of a log file.

    In addition to the data of a regular data center file, the
    values of the markers ``user``, ``mode``, ``processed`` and
    ``source`` are extracted from the file content.
    """
    def __init__(self, context):
        super(LogFile, self).__init__(context)
        self._markers = dict()
        self._parsed = False
        self.userid = self.getUserId()
        self.mode = self.getMode()
        self.stats = self.getStats()
        self.source = self.getSourcePath()

    def _parseFile(self, maxline=10):
        """Find markers in a file.

        Each line containing a colon is split at the first colon
        into a marker name (stored lower-case) and the remaining
        text. If a name occurs several times, the last one wins.

        `maxline` is currently not evaluated: the whole file is
        scanned.

        The file is parsed only once per instance.
        """
        if self._parsed:
            return
        # NOTE(review): lines are read in binary mode and handled
        # with str operations, which only works with Python 2.
        with open(self.context, 'rb') as fd:
            for line in fd:
                line = line.strip()
                if ':' not in line:
                    continue
                name, text = line.split(':', 1)
                self._markers[name.lower()] = text
        self._parsed = True
        return

    def _getMarker(self, marker):
        """Get the text stored for `marker` or ``None``.

        The lookup is case-insensitive. The file is parsed on first
        use.
        """
        marker = marker.lower()
        if not self._parsed:
            self._parseFile()
        return self._markers.get(marker)

    def getUserId(self):
        """Get value of the ``user`` marker or ``'<UNKNOWN>'``.
        """
        return self._getMarker('user') or '<UNKNOWN>'

    def getMode(self):
        """Get value of the ``mode`` marker or ``'<NOT SET>'``.
        """
        return self._getMarker('mode') or '<NOT SET>'

    def getStats(self):
        """Get value of the ``processed`` marker or a placeholder.
        """
        return self._getMarker('processed') or '<Info not avail.>'

    def getSourcePath(self):
        """Get value of the ``source`` marker or ``None``.
        """
        return self._getMarker('source') or None
273
274
class DataCenterStorageMovedEvent(ObjectEvent):
    """Event signalling that the storage of a datacenter was moved.

    `DataCenter.setStoragePath` sends this event, with the
    datacenter as object, after the new path was set.
    """
    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.