source: main/waeup.kofa/trunk/src/waeup/kofa/utils/helpers.py @ 8626

Last change on this file since 8626 was 8466, checked in by uli, 13 years ago

Add helper funcs to determine fileformat of media files.

  • Property svn:keywords set to Id
File size: 20.0 KB
RevLine 
[7196]1## $Id: helpers.py 8466 2012-05-17 16:30:02Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""General helper functions for Kofa.
[4188]19"""
[8185]20import datetime
[8466]21import imghdr
[4188]22import os
[8185]23import pytz
[4375]24import re
[4188]25import shutil
[5731]26import grok
[5848]27from cStringIO import StringIO
28from docutils.core import publish_string
[7943]29from zope.component import getUtility
[5731]30from zope.component.interfaces import IFactory
[5734]31from zope.interface import implementedBy
[7941]32from zope.interface.interface import Method, Attribute
[6071]33from zope.schema import getFieldNames
34from zope.schema.fieldproperty import FieldProperty
[6372]35from zope.security.interfaces import NoInteraction
36from zope.security.management import getInteraction
[7175]37from zope.pluggableauth.interfaces import IAuthenticatorPlugin
[4188]38
[6503]39BUFSIZE = 8 * 1024
[6372]40
[7186]41def remove_file_or_directory(filepath):
[4188]42    """Remove a file or directory.
[5738]43
44    Different to :func:`shutil.rmtree` we also accept not existing
45    paths (returning silently) and if a dir turns out to be a regular
46    file, we remove that.
[4188]47    """
48    filepath = os.path.abspath(filepath)
49    if not os.path.exists(filepath):
50        return
51    if os.path.isdir(filepath):
52        shutil.rmtree(filepath)
53    else:
54        os.unlink(filepath)
55    return
56
[7186]57def copy_filesystem_tree(src, dst, overwrite=False, del_old=False):
[4188]58    """Copy contents of directory src to directory dst.
59
60    Both directories must exists.
61
62    If `overwrite` is true, any same named objects will be
63    overwritten. Otherwise these files will not be touched.
64
65    If `del_old` is true, copied files and directories will be removed
66    from the src directory.
67
68    This functions returns a list of non-copied files.
69
70    Unix hidden files and directories (starting with '.') are not
71    processed by this function.
72    """
73    if not os.path.exists(src):
74        raise ValueError('source path does not exist: %s' % src)
75    if not os.path.exists(dst):
76        raise ValueError('destination path does not exist: %s' % dst)
77    if not os.path.isdir(src):
78        raise ValueError('source path is not a directory: %s' % src)
79    if not os.path.isdir(dst):
80        raise ValueError('destination path is not a directory: %s' % dst)
81    not_copied = []
82    for item in os.listdir(src):
83        if item.startswith('.'):
84            continue # We do not copy hidden stuff...
85        itemsrc = os.path.join(src, item)
86        itemdst = os.path.join(dst, item)
87
88        if os.path.exists(itemdst):
89            if overwrite is True:
[7186]90                remove_file_or_directory(itemdst)
[4188]91            else:
92                not_copied.append(item)
93                continue
[6113]94
[4188]95        if os.path.isdir(itemsrc):
96            shutil.copytree(itemsrc, itemdst)
97        else:
98            shutil.copy2(itemsrc, itemdst)
99        if del_old:
[7186]100            remove_file_or_directory(itemsrc)
[4188]101    return not_copied
[4375]102
103
[7186]104def get_inner_HTML_part(html_code):
[4375]105    """Return the 'inner' part of a complete HTML snippet.
106
107    If there is a form part, get this.
108
109    If there is no form part, try to return the body part contents.
110
111    If there is no body, return as-is.
[5738]112
113    Let's see how that works. If we deliver some doc with form, we
114    will get that form only:
115
116       >>> doc = '<html><form>My Form</form>Outside the form</html>'
[7186]117       >>> get_inner_HTML_part(doc)
[5738]118       '<form>My Form</form>'
119
120    No form? Then seek for a body part and get the contents:
121
122       >>> doc = '<html><body>My Body</body>Trailing Trash</html>'
[7186]123       >>> get_inner_HTML_part(doc)
[5738]124       'My Body'
125
126    If none of these is included, return what we got:
127
128       >>> doc = '<html>without body nor form</html>'
[7186]129       >>> get_inner_HTML_part(doc)
[5738]130       '<html>without body nor form</html>'
131
[4375]132    """
133
134    try:
[5738]135        result = re.match('^.+(<form[^\>]*>.*</form>).+$', html_code,
[4375]136                          re.DOTALL).groups()[0]
137        return result
138    except AttributeError:
139        # No <form> part included
140        try:
141            result = re.match('^.+<body[^\>]*>(.*)</body>.*$', html_code,
142                              re.DOTALL).groups()[0]
143            return result
144        except AttributeError:
145            # No <form> and no <body> tag...
146            pass
147    return html_code
148
[5731]149class FactoryBase(grok.GlobalUtility):
150    """A factory for things.
151
152    This is a baseclass for easier creation of factories. Factories
153    are utilities that are registered under a certain name and return
154    instances of certain classes when called.
155
[7811]156    In :mod:`waeup.kofa` we use factories extensively for
[7933]157    batching. While processing a batch some processors looks up a
[5731]158    factory to create real-world instances that then get filled with
159    data from imported CSV files.
160
161    To get rid of reimplementing the same stuff over and over again,
162    most notably the methods defined here, we offer this base class
163    (which will *not* be registered as a factory itself).
164
165    Real factories can then be created like this:
166
167       >>> import grok
[7811]168       >>> from waeup.kofa.utils.helpers import FactoryBase
[5731]169       >>> class MyObject(object):
170       ...   # Some class we want to get instances of.
171       ...   pass
172       >>> class MyObjectFactory(FactoryBase):
173       ...   # This is the factory for MyObject instances
[7811]174       ...   grok.name(u'waeup.kofa.factory.MyObject')
[5731]175       ...   factory = MyObject
176
177    That's it. It is essential to set the ``factory`` attribute, which
178    will determine the class of which instances should be created when
179    called. The given name must even be unique amongst all utilities
180    registered during runtime. While you can pick any name you like
[7811]181    you might want to prepend ``waeup.kofa.factory.`` to the name
[5731]182    string to make sure it does not clash with names of other
183    utilities one day.
184
185    Before all this works we have to grok the baseclass once and our
186    freshly defined factory. This executes all the component
187    registration stuff we don't want to do ourselves. In daily use
[7811]188    this is done automatically on startup of a :mod:`waeup.kofa`
[5731]189    system.
[6113]190
[7811]191       >>> grok.testing.grok('waeup.kofa.utils.helpers')
[5731]192       >>> grok.testing.grok_component(
193       ...    'MyObjectFactory', MyObjectFactory
194       ...  )
195       True
196
[7933]197    After grokking we (and processors) can create objects without
[5731]198    knowing about the location of the real class definition, just by
199    the factory name:
200
201       >>> from zope.component import createObject
[7811]202       >>> obj = createObject('waeup.kofa.factory.MyObject')
[5731]203       >>> isinstance(obj, MyObject)
204       True
205
206    We can also use the regular utility lookups to find our new
207    factory:
208
209       >>> from zope.component import getUtility
210       >>> from zope.component.interfaces import IFactory
211       >>> factory = getUtility(
[7811]212       ...   IFactory, name='waeup.kofa.factory.MyObject'
[5731]213       ...   )
214       >>> isinstance(factory, MyObjectFactory)
215       True
216
217    And this factory generates `MyObject` instances:
218
219       >>> obj = factory()
220       >>> isinstance(obj, MyObject)
221       True
222
223    """
224    grok.baseclass() # Do not grok this class, do not register us.
225    grok.implements(IFactory)
226    # You can override any of the following attributes in derived
227    # classes. The `grok.name` setting *must* even be set to some
228    # unique value.
229    grok.name(u'waeup.Factory')
230    title = u"Create instances of ``factory``.",
231    description = u"This factory instantiates new applicant instances."
232    factory = None
233
234    def __call__(self, *args, **kw):
235        """The main factory function.
236
237        Returns an instance of the requested object.
238        """
239        return self.factory()
240
241    def getInterfaces(self):
242        # Required by IFactory
243        return implementedBy(self.factory)
[5848]244
245def ReST2HTML_w_warnings(source_string):
246    """Convert a reStructuredText string to HTML preserving warnings.
247
248    Returns a tuple ``(<HTML_CODE>, <WARNINGS>)``, both being
249    strings. Where ``<HTML_CODE>`` is the HTML code generated from the
[5876]250    source string (in unicode), ``<WARNINGS>`` is a string containing
251    any warning messages or ``None``.
[6113]252
[5848]253    Regular multi-line ReStructuredText strings will be returned as
254    HTML code:
255
[7811]256        >>> from waeup.kofa.utils.helpers import ReST2HTML
[5848]257        >>> source = '''
258        ... Headline
259        ... ========
260        ...
261        ... - A list item
262        ... - Another item
263        ...
264        ... Thanks for watching!
265        ... '''
266        >>> html, warnings = ReST2HTML_w_warnings(source)
267        >>> print html
268        <div class="document" id="headline">
269        <h1 class="title">Headline</h1>
270        <BLANKLINE>
271        <ul class="simple">
272        <li>A list item</li>
273        <li>Another item</li>
274        </ul>
275        <p>Thanks for watching!</p>
276        </div>
277
278    Here no warnings happened, so the `warnings` are ``None``:
279
280        >>> warnings is None
281        True
[6113]282
[5848]283    If warnings happen then they can be retrieved in the returned
284    ``warnings``. We try to render an erraneous document:
285
286        >>> source = '''
287        ... Headline
288        ... ======
289        ...
290        ... Thanks for watching!
291        ... '''
292        >>> html, warnings = ReST2HTML_w_warnings(source)
293        >>> print html
294        <div class="document" id="headline">
295        <h1 class="title">Headline</h1>
296        <BLANKLINE>
297        <p>Thanks for watching!</p>
298        </div>
299
300        >>> print warnings
301        <string>:3: (WARNING/2) Title underline too short.
302        <BLANKLINE>
303        Headline
304        ======
305        <BLANKLINE>
306
307    As you can see, the warnings are not displayed inline the document
308    but can be retrieved from the returned warnings, which is a string
309    or ``None``.
310    """
311    warnings = StringIO()
312    fulldoc = publish_string(
313        source_string, writer_name='html4css1',
314        settings_overrides={
315            'report_level': 0,
316            'warning_stream': warnings,
317            })
318    warnings.seek(0)
319    warning_msgs = warnings.read()
320    if warning_msgs:
321        # Render again, this time with no warnings inline...
322        fulldoc =  publish_string(
323        source_string, writer_name='html4css1',
324        settings_overrides={
325            'report_level': 10000,
326            'halt_level': 10000,
327            'warning_stream': warnings,
328            })
329    if warning_msgs == '':
330        warning_msgs = None
[7186]331    result = get_inner_HTML_part(fulldoc).strip()
[5876]332    if not isinstance(result, unicode):
333        result = result.decode('utf-8')
334    return result, warning_msgs
[5848]335
336def ReST2HTML(source_string):
337    """Render a string containing ReStructuredText to HTML.
338
339    Any warnings about too short headings, etc. are silently
340    discarded. Use :func:`ReST2HTML_w_warnings` if you want to get any
341    warnings.
342
[5876]343    The returned string will be unicode.
[6113]344
[5848]345    A regular document will be rendered like this:
346
347        >>> source = '''
348        ... Headline
349        ... ========
350        ...
351        ... Thanks for watching!
352        ... '''
353        >>> html = ReST2HTML(source)
354        >>> print html
355        <div class="document" id="headline">
356        <h1 class="title">Headline</h1>
357        <BLANKLINE>
358        <p>Thanks for watching!</p>
359        </div>
360
361    A document with markup problems (here: the underline is too short)
362    will look similar:
363
364        >>> source = '''
365        ... Headline
366        ... ======
367        ...
368        ... Thanks for watching!
369        ... '''
370        >>> html = ReST2HTML(source)
371        >>> print html
372        <div class="document" id="headline">
373        <h1 class="title">Headline</h1>
374        <BLANKLINE>
375        <p>Thanks for watching!</p>
376        </div>
[6113]377
[5848]378    """
379    html, warnings = ReST2HTML_w_warnings(source_string)
380    return html
[6071]381
382def attrs_to_fields(cls):
383    """Turn the attributes of a class into FieldProperty instances.
[6113]384
385    With Python >= 2.6 we can even use this function as a class decorator.
[6071]386    """
387    iface = list(implementedBy(cls))[0]
388    for field_name in getFieldNames(iface):
389        setattr(cls, field_name, FieldProperty(iface[field_name]))
390    return cls
[6372]391
392def get_current_principal():
393    """Get the 'current' principal.
394
395    This method works without a request. Examining a request is the
396    regular (and recommended) way to get a principal involved
397    'currently'.
398
399    Use this method only if you really have no access to the current
400    request.
401
402    Returns ``None`` when no principal is involved (for instance
403    during tests).
404    """
405    try:
406        principal = getInteraction().participations[0].principal
407    except NoInteraction:
408        return None
409    except IndexError: # No participations present
410        return None
411    return principal
[6503]412
413def cmp_files(file_descr1, file_descr2):
414    """Compare two files by their file descriptors.
415
416    Returns ``True`` if both are equal, ``False`` otherwise.
417    """
[6531]418    file_descr1.seek(0)
419    file_descr2.seek(0)
[6503]420    while True:
421        b1 = file_descr1.read(BUFSIZE)
422        b2 = file_descr2.read(BUFSIZE)
423        if b1 != b2:
424            return False
425        if not b1:
426            return True
[7078]427
428def string_from_bytes(number):
429    """Turn a number into some textual representation.
430
431      Examples:
432
433        >>> string_from_bytes(1)
434        u'1 byte(s)'
435
436        >>> string_from_bytes(1025)
437        u'1 KB'
438
439        >>> string_from_bytes(1.5 * 1024*1024)
440        u'1.50 MB'
441
442        >>> string_from_bytes(673.286 * 1024**3)
443        u'673.29 GB'
444
445    """
446    if number < 1024:
447        return u'%s byte(s)' % (str(number),)
448    elif number < 1024**2:
449        return u'%s KB' % (number / 1024,)
450    elif number < 1024**3:
451        return u'%.2f MB' % (number / 1024**2,)
452    return u'%.2f GB' % (number / 1024**3,)
[7079]453
454def file_size(file_like_obj):
455    """Determine file size in most effective manner.
456
457    Returns the number of bytes in a file. This function works for
458    both, real files as well as file-like objects like cStringIO based
459    'files'.
460
461    Example:
462
463      >>> from cStringIO import StringIO
464      >>> file_size(StringIO('my file content'))
465      15
466
467    Please note that this function expects the file-like object passed
468    in to be at first reading position (it does no seek(0)) and that
469    when finished the file pointer might be at end of file.
470    """
471    if hasattr(file_like_obj, 'fileno'):
472        return os.fstat(file_like_obj.fileno())[6]
473    file_like_obj.seek(0, 2) # seek to last position in file
474    return file_like_obj.tell()
[7175]475
476def get_user_account(request):
477    """Return local user account.
478    """
479    principal_id = request.principal.id
[7234]480    authenticator = getUtility(IAuthenticatorPlugin, name='users')
481    account = authenticator.getAccount(principal_id)
[7175]482    return account
[7941]483
484def iface_names(iface, omit=[], exclude_attribs=True, exclude_methods=True):
485    """Get all attribute names of an interface.
486
487    Searches also base interfaces.
488
489    Names of fields that are pure attributes
490    (i.e. zope.interface.Attribute) or methods are excluded by
491    default.
492
493    Names of typical fields derived from zope.schema are included.
494
495    The `omit` paramter can give a list of names to exclude.
496
497    Returns an unsorted list of strings.
498    """
[8370]499    ifaces = set((iface,))
500    # Collect all interfaces (also bases) recursively
501    while True:
502        ext_ifaces = set(ifaces)
503        for iface in ext_ifaces:
504            ext_ifaces = set.union(ext_ifaces, set(iface.getBases()))
505        if ext_ifaces == ifaces:
506            # No new interfaces found, list complete
507            break
508        ifaces = ext_ifaces
509    # Collect (filtered) names of collected interfaces
[7941]510    result = []
[8370]511    for iface in ifaces:
512        for name, descr in iface.namesAndDescriptions():
513            if name in omit:
514                continue
515            if exclude_attribs and descr.__class__ is Attribute:
516                continue
517            if exclude_methods and isinstance(descr, Method):
518                continue
519            result.append(name)
[7941]520    return result
[7968]521
522def get_sorted_preferred(tuples_iterable, preferred_list):
523    """Get a list of tuples (<TITLE>,<TOKEN>) with values in
524    `preferred_list` put in front.
525
526    The rest of the tuples iterable is returned in orginal order. This
527    is useful for putting default entries on top of (already sorted)
528    lists of choice values, for instance when sorting countries and
529    their code.
530
531    Sample:
532
533    We have a list of tuples with uppercase 'titles' and lowercase
534    'tokens'. This list is already sorted but we want certain values
535    of this list to show up before other values. For instance we want
536    to see the 'C' entry to come first.
537
538      >>> get_sorted_preferred([('A','a'), ('B','b'), ('C','c')],
539      ...                       ['c'])
540      (('C', 'c'), ('A', 'a'), ('B', 'b'))
541
542    i.e. the entry with 'c' as second value moved to head of result.
543
544    We can also require multiple entries at head of list:
545
546      >>> get_sorted_preferred([('A','a'), ('B','b'), ('C','c')],
547      ...                       ['b', 'c'])
548      (('B', 'b'), ('C', 'c'), ('A', 'a'))
549
550    We required the 'b' entry to come before the 'c' entry and then
551    the rest of the input list. That's what we got.
552
553    The result is returned as a tuple of tuples to keep order of values.
554    """
555    result = [None for x in preferred_list]
556    for title, code in tuples_iterable:
557        if code in preferred_list:
558            index = preferred_list.index(code)
559            result[index] = (title, code)
560        else:
561            result.append((title, code))
562    return tuple(result)
[8185]563
564def now(tz=None):
565    """Get current datetime in timezone of `tz`.
566
567    If `tz`, a `tzinfo` instance, is None, UTC time is returned.
568
569    `tz` should be a timezone as defined in pytz.
570    """
571    return to_timezone(datetime.datetime.utcnow(), tz=tz)
572
573def to_timezone(dt, tz=None):
574    """Shift datetime into timezone `tz`.
575
576    If datetime `dt` contains no `tzinfo` (i.e. it is 'naive'), it is
577    assumed to be UTC.
578
579    If no `tz` is given, shift to UTC is performed.
[8192]580
581    If `dt` is not a datetime.datetime, the input value is returned
582    unchanged.
[8185]583    """
[8192]584    if not isinstance(dt, datetime.datetime):
585        return dt
[8185]586    if tz is None:
587        tz = pytz.utc
588    if dt.tzinfo is None:
589        dt = pytz.utc.localize(dt)
590    return tz.normalize(dt.tzinfo.normalize(dt).astimezone(tz))
[8466]591
592def get_fileformat(path, bytestream=None):
593    """Try to determine the file format of a given media file.
594
595    Although checks done here are not done very thoroughly, they make
596    no assumptions about the filetype by looking at its filename
597    extension or similar. Instead they check header data to comply
598    with common known rules (Magic Words).
599
600    If bytestream is not `None` the `path` is ignored.
601
602    Returns filetype as string (something like ``'jpg'``) if
603    file-format can be recognized, ``None`` else.
604
605    Tested recognized filetypes currently are `jpg`, `png`, and `pdf`.
606
607    More filetypes (though untested in waeup.kofa) are automatically
608    recognized because we deploy the stdlib `imghdr` library. See this
609    module's docs for a complete list of filetypes recognized.
610    """
611    if path is None and bytestream is None:
612        return None
613
614    img_type = None
615    if bytestream is not None:
616        img_type = imghdr.what(path, bytestream)
617    else:
618        img_type = imghdr.what(path)
619    for name, replacement in (('jpeg', 'jpg'), ('tiff', 'tif')):
620        if img_type == name:
621            img_type = replacement
622    return img_type
623
624def check_pdf(bytestream, file):
625    """Tell whether a file or bytestream is a PDF file.
626
627    Works as a test/plugin for the stdlib `imghdr` library.
628    """
629    if file is not None:
630        file.seek(0)
631        bytestream = file.read(4)
632        file.seek(0)
633
634    if bytestream.startswith('%PDF'):
635        return 'pdf'
636    return None
637
638# register check_pdf as header check function with `imghdr`
639if check_pdf not in imghdr.tests:
640    imghdr.tests.append(check_pdf)
Note: See TracBrowser for help on using the repository browser.