source: main/waeup.sirp/trunk/src/waeup/sirp/datacenter.py @ 4951

Last change on this file since 4951 was 4920, checked in by uli, 15 years ago

Make unit tests run again with the new package layout.

File size: 15.0 KB
Line 
1"""WAeUP data center.
2
3The waeup data center cares for management of upload data and provides
4tools for importing/exporting CSV data.
5"""
6import logging
7import os
8import shutil
9import struct
10import grok
11from datetime import datetime
12from zope.component import getMultiAdapter
13from zope.component.interfaces import ObjectEvent
14from waeup.sirp.csvfile import getCSVFile
15from waeup.sirp.interfaces import (IDataCenter, IWAeUPCSVImporter,
16                              ICSVDataReceivers, IDataCenterFile,
17                              IDataCenterStorageMovedEvent)
18from waeup.sirp.utils.helpers import copyFileSystemTree
19
20class DataCenter(grok.Container):
21    """A data center contains CSV files.
22    """
23    grok.implements(IDataCenter)
24    storage = os.path.join(os.path.dirname(__file__), 'files')
25
26    @property
27    def logger(self):
28        """Get a logger for datacenter actions.
29        """
30        # We need a different logger for every site...
31        site = grok.getSite()
32        sitename = getattr(site, '__name__', 'app')
33        loggername = 'waeup.sirp.%s.datacenter' % sitename
34        logger = logging.getLogger(loggername)
35        if not logger.handlers:
36            logger = self._setupLogger(logger)
37        return logger
38
39    def __init__(self, *args, **kw):
40        super(DataCenter, self).__init__(*args, **kw)
41        self._createSubDirs()
42       
43    def _setupLogger(self, logger):
44        """Setup datacenter logger.
45        """
46        logdir = os.path.join(self.storage, 'logs')
47        if not os.path.exists(logdir):
48            os.mkdir(logdir)
49        filename = os.path.join(logdir, 'datacenter.log')
50
51        # Create a rotating file handler logger for datacenter.
52        handler = logging.handlers.RotatingFileHandler(
53            filename, maxBytes=5*1024**1, backupCount=5)
54        formatter = logging.Formatter(
55            '%(asctime)s - %(levelname)s - %(message)s')
56        handler.setFormatter(formatter)
57       
58        # Here we decide, whether our messages will _also_ go to
59        # application log.
60        logger.propagate = False
61        logger.setLevel(logging.DEBUG)
62        logger.addHandler(handler)
63        return logger
64
65    def _createSubDirs(self):
66        """Create standard subdirs.
67        """
68        for name in ['finished', 'unfinished']:
69            path = os.path.join(self.storage, name)
70            if os.path.exists(path):
71                continue
72            os.mkdir(path)
73        return
74   
75    def getReceivers(self):
76        receivers = []
77        curr_obj = getattr(self, '__parent__', None)
78        while curr_obj is not None:
79            if ICSVDataReceivers.providedBy(curr_obj):
80                receivers = self.getCSVDataReceivers(curr_obj)
81                break
82            curr_obj = getattr(curr_obj, '__parent__', None)
83        return receivers
84
85    def getReceiverIds(self):
86        """Get a dict of available receivers.
87
88        The keys of the result are the receiver ids.
89        """
90        receivers = self.getReceivers()
91        return dict([(self.getReceiverId(x), x) for x in receivers])
92   
93    def getImporters(self):
94        """Get a list of all importers available.
95
96        The search for available importers is done in two steps:
97
98        1) Look for a utility providing ICSVDataReceiver,
99
100        2) For every item of that utility: try to get an adapter
101           providing IWAeUPCSVImporter.
102        """
103        result = []
104        receivers = self.getReceivers()
105        files = self.getFiles()
106        for receiver in receivers:
107            for file in files:
108                wrapped_file = getCSVFile(file.context)
109                try:
110                    importer = getMultiAdapter((wrapped_file, receiver),
111                                           IWAeUPCSVImporter)
112                    result.append(importer)
113                except:
114                    # No multi-adapter for this combination available...
115                    pass
116        return result
117   
118    def getFiles(self, sort='name'):
119        """Get a list of files stored in `storage`.
120
121        Files are sorted by basename.
122        """
123        result = []
124        if not os.path.exists(self.storage):
125            return result
126        for filename in sorted(os.listdir(self.storage)):
127            fullpath = os.path.join(self.storage, filename)
128            if not os.path.isfile(fullpath):
129                continue
130            result.append(DataCenterFile(fullpath))
131        if sort == 'date':
132            # sort results in newest-first order...
133            result = sorted(result, key=lambda x: x.getTimeStamp(),
134                            reverse=True)
135        return result
136
137    def getLogFiles(self):
138        """Get the files from logs/ subdir. Files are sorted by name.
139        """
140        result = []
141        logdir = os.path.join(self.storage, 'logs')
142        if not os.path.exists(logdir):
143            os.mkdir(logdir)
144        for name in sorted(os.listdir(logdir)):
145            if not os.path.isfile(os.path.join(logdir, name)):
146                continue
147            result.append(
148                LogFile(os.path.join(self.storage, 'logs', name)))
149        return result
150           
151    def setStoragePath(self, path, move=False, overwrite=False):
152        """Set the path where to store files.
153        """
154        path = os.path.abspath(path)
155        not_copied = []
156        if not os.path.exists(path):
157            raise ValueError('The path given does not exist: %s' % path)
158        if move is True:
159
160            not_copied = copyFileSystemTree(self.storage, path,
161                                            overwrite=overwrite)
162        self.storage = path
163        self._createSubDirs()
164        # Adjust logger...
165        logger = self.logger
166        handlers = logger.handlers
167        for handler in handlers:
168            logger.removeHandler(handler)
169        self._setupLogger(logger)
170        grok.notify(DataCenterStorageMovedEvent(self))
171        return not_copied
172
173    def getCSVDataReceivers(self, obj):
174        """Get a list of attributes, that can receive CSV data.
175
176        We also get a list of values, if obj is a container that
177        contains value items.
178        """
179        result = []
180        for attr_name in dir(obj):
181            if attr_name.startswith('_'):
182                continue
183            try:
184                attr = getattr(obj, attr_name)
185                # This might fail...
186                #IWAeUPCSVImporter(attr)
187                result.append(attr)
188            except:
189                pass
190        if hasattr(obj, 'values'):
191            try:
192                result.extend(obj.values())
193            except:
194                pass
195        return result
196
197    def getPossibleImports(self):
198        """Get list of possible imports.
199
200        A single import is defined as a tuple
201       
202          ( <file-descr>, <importers> )
203       
204        where ``<file-descr>`` is an `IDataCenterFile` object and
205        ``<importers>`` is a list of `IWAeUPCSVImporter` objects.
206        """
207        result = []
208        importers = self.getImporters()
209        for filedescr in self.getFiles():
210            possible_importers = []
211            for importer in importers:
212                if importer.csvfile.path != filedescr.context:
213                    continue
214                importer_context = (
215                    importer, self.getReceiverId(importer.receiver))
216                possible_importers.append(importer_context)
217            if len(possible_importers) == 0:
218                continue
219            result.append((filedescr, possible_importers))
220        return result
221
222    def getReceiverId(self, obj):
223        """Get a unique id for an object.
224
225        If the object is stored in ZODB it should contain a `_p_oid`
226        attribute, which is guaranteed to be unique over the ZODB.
227
228        If the object has no such attribute, then it will be held in
229        memory only and will vanish as soon as the request is over. In
230        this case we can use the memory address of it.
231
232        Both, the id() result and the ZODB oid, are actually integers
233        which we return as strings. To make the difference chrystal
234        clear, we prepend ``z`` to ids taken from ZODB oids.
235        """
236        if not hasattr(obj, '_p_oid'):
237            return str(id(obj))
238        oid = getattr(obj, '_p_oid')
239        if oid is None:
240            # The persistent object is not stored in the ZODB
241            return str(id(obj))
242        return 'z%s' % struct.unpack('>Q', oid)[0]
243
244    def doImport(self, path, receiverid, clear=None):
245        receivers = self.getReceiverIds()
246        if receiverid not in receivers.keys():
247            raise ValueError('The requested data receiver cannot be found.')
248        receiver = receivers[receiverid]
249        filewrapper = getCSVFile(path)
250        if filewrapper is None:
251            raise ValueError('Format of CSV file not supported.')
252        importer = getMultiAdapter((filewrapper, receiver), IWAeUPCSVImporter)
253        if clear is not None:
254            importer.doImport(clear_old_data=clear)
255        else:
256            importer.doImport()
257        return
258
259    def _moveFile(self, source, dest):
260        """Move file source to dest preserving ctime, mtime, etc.
261        """
262        if not os.path.exists(source):
263            self.logger.warn('No such source path: %s' % source)
264            return
265        if source == dest:
266            return
267        shutil.copyfile(source, dest)
268        shutil.copystat(source, dest)
269        os.unlink(source)
270   
271    def distProcessedFiles(self, successful, source_path, finished_file,
272                           pending_file, move_orig=True):
273        """Put processed files into final locations.
274
275        ``successful`` is a boolean that tells, whether processing was
276        successful.
277
278        ``source_path``: path to file that was processed.
279
280        ``finished_file``, ``pending_file``: paths to the respective
281        generated .pending and .finished file. The .pending file path
282        may be ``None``.
283
284        If finished file is placed in a location outside the local
285        storage dir, the complete directory is removed
286        afterwards. Regular importers should put their stuff in
287        dedicated temporary dirs.
288       
289        See datacenter.txt for more info about how this works.
290        """
291        basename = os.path.basename(source_path)
292        pending_name = basename
293        pending = False
294        finished_dir = os.path.join(self.storage, 'finished')
295        unfinished_dir = os.path.join(self.storage, 'unfinished')
296
297        if basename.endswith('.pending.csv'):
298            maybe_basename = "%s.csv" % basename.split('.', 2)[0]
299            maybe_src = os.path.join(unfinished_dir, maybe_basename)
300            if os.path.isfile(maybe_src):
301                basename = maybe_basename
302                pending = True
303               
304        base, ext = os.path.splitext(basename)
305        finished_name = "%s.finished%s" % (base, ext)
306        if not pending:
307            pending_name = "%s.pending%s" % (base, ext)
308
309        # Put .pending and .finished file into respective places...
310        pending_dest = os.path.join(self.storage, pending_name)
311        finished_dest = os.path.join(finished_dir, finished_name)
312        self._moveFile(finished_file, finished_dest)
313        if pending_file is not None:
314            self._moveFile(pending_file, pending_dest)
315
316        # Put source file into final location...
317        finished_dest = os.path.join(finished_dir, basename)
318        unfinished_dest = os.path.join(unfinished_dir, basename)
319        if successful and not pending:
320            self._moveFile(source_path, finished_dest)
321        elif successful and pending:
322            self._moveFile(unfinished_dest, finished_dest)
323            os.unlink(source_path)
324        elif not successful and not pending:
325            self._moveFile(source_path, unfinished_dest)
326
327        # If finished and pending-file were created in a location
328        # outside datacenter storage, we remove it.
329        maybe_temp_dir = os.path.dirname(finished_file)
330        if os.path.commonprefix(
331            [self.storage, maybe_temp_dir]) != self.storage:
332            shutil.rmtree(maybe_temp_dir)
333        return
334
335   
336class DataCenterFile(object):
337    """A description of a file stored in data center.
338    """
339    grok.implements(IDataCenterFile)
340   
341    def __init__(self, context):
342        self.context = context
343        self.name = os.path.basename(self.context)
344        self.size = self.getSize()
345        self.uploaddate = self.getDate()
346        self.lines = self.getLinesNumber()
347
348    def getDate(self):
349        """Get a human readable datetime representation.
350        """
351        date = datetime.fromtimestamp(os.path.getctime(self.context))
352        return date.strftime('%c')
353
354    def getTimeStamp(self):
355        """Get a (machine readable) timestamp.
356        """
357        return os.path.getctime(self.context)
358   
359    def getSize(self):
360        """Get a human readable file size.
361        """
362        bytesize = os.path.getsize(self.context)
363        size = "%s bytes" % bytesize
364        units = ['kb', 'MB', 'GB']
365        for power, unit in reversed(list(enumerate(units))):
366            power += 1
367            if bytesize >= 1024 ** power:
368                size = "%.2f %s" % (bytesize/(1024.0**power), unit)
369                break
370        return size
371
372    def getLinesNumber(self):
373        """Get number of lines.
374        """
375        num = 0
376        for line in open(self.context, 'rb'):
377            num += 1
378        return num
379   
380class LogFile(DataCenterFile):
381    """A description of a log file.
382    """
383    def __init__(self, context):
384        super(LogFile, self).__init__(context)
385        self._markers = dict()
386        self._parsed = False
387        self.userid = self.getUserId()
388        self.mode = self.getMode()
389        self.stats = self.getStats()
390        self.source = self.getSourcePath()
391
392    def _parseFile(self, maxline=10):
393        """Find markers in a file.
394        """
395        if self._parsed:
396            return
397        for line in open(self.context, 'rb'):
398            line = line.strip()
399            if not ':' in line:
400                continue
401            name, text = line.split(':', 1)
402            self._markers[name.lower()] = text
403        self._parsed = True
404        return
405
406    def _getMarker(self, marker):
407        marker = marker.lower()
408        if not self._parsed:
409            self._parseFile()
410        if marker in self._markers.keys():
411            return self._markers[marker]
412   
413    def getUserId(self):
414        return self._getMarker('user') or '<UNKNOWN>'
415
416    def getMode(self):
417        return self._getMarker('mode') or '<NOT SET>'
418
419    def getStats(self):
420        return self._getMarker('processed') or '<Info not avail.>'
421
422    def getSourcePath(self):
423        return self._getMarker('source') or None
424   
425class Import(object):
426    """Helper class to aggregate imports and their data.
427    """
428    def __init__(self, filedescr, importers):
429        self.file = filedescr
430        self.importers = []
431        for importer, receiverid in importers:
432            importer.receiverid = receiverid
433            self.importers.append(importer)
434
435class DataCenterStorageMovedEvent(ObjectEvent):
436    """An event fired, when datacenter storage moves.
437    """
438    grok.implements(IDataCenterStorageMovedEvent)
Note: See TracBrowser for help on using the repository browser.