source: main/waeup.kofa/trunk/src/waeup/kofa/utils/helpers.py @ 10787

Last change on this file since 10787 was 10678, checked in by uli, 11 years ago

pyflakes.

  • Property svn:keywords set to Id
File size: 23.0 KB
Line 
1## $Id: helpers.py 10678 2013-10-31 17:58:50Z uli $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
18"""General helper functions for Kofa.
19"""
20import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
21import datetime
22import imghdr
23import logging
24import os
25import pytz
26import re
27import shutil
28import tempfile
29import grok
30from cStringIO import StringIO
31from docutils.core import publish_string
32from zope.component import getUtility
33from zope.component.interfaces import IFactory
34from zope.interface import implementedBy
35from zope.interface.interface import Method, Attribute
36from zope.schema import getFieldNames
37from zope.schema.fieldproperty import FieldProperty
38from zope.security.interfaces import NoInteraction
39from zope.security.management import getInteraction
40from zope.pluggableauth.interfaces import IAuthenticatorPlugin
41
42BUFSIZE = 8 * 1024
43
44
45def remove_file_or_directory(filepath):
46    """Remove a file or directory.
47
48    Different to :func:`shutil.rmtree` we also accept not existing
49    paths (returning silently) and if a dir turns out to be a regular
50    file, we remove that.
51    """
52    filepath = os.path.abspath(filepath)
53    if not os.path.exists(filepath):
54        return
55    if os.path.isdir(filepath):
56        shutil.rmtree(filepath)
57    else:
58        os.unlink(filepath)
59    return
60
61
62def copy_filesystem_tree(src, dst, overwrite=False, del_old=False):
63    """Copy contents of directory src to directory dst.
64
65    Both directories must exists.
66
67    If `overwrite` is true, any same named objects will be
68    overwritten. Otherwise these files will not be touched.
69
70    If `del_old` is true, copied files and directories will be removed
71    from the src directory.
72
73    This functions returns a list of non-copied files.
74
75    Unix hidden files and directories (starting with '.') are not
76    processed by this function.
77    """
78    if not os.path.exists(src):
79        raise ValueError('source path does not exist: %s' % src)
80    if not os.path.exists(dst):
81        raise ValueError('destination path does not exist: %s' % dst)
82    if not os.path.isdir(src):
83        raise ValueError('source path is not a directory: %s' % src)
84    if not os.path.isdir(dst):
85        raise ValueError('destination path is not a directory: %s' % dst)
86    not_copied = []
87    for item in os.listdir(src):
88        if item.startswith('.'):
89            continue  # We do not copy hidden stuff...
90        itemsrc = os.path.join(src, item)
91        itemdst = os.path.join(dst, item)
92
93        if os.path.exists(itemdst):
94            if overwrite is True:
95                remove_file_or_directory(itemdst)
96            else:
97                not_copied.append(item)
98                continue
99
100        if os.path.isdir(itemsrc):
101            shutil.copytree(itemsrc, itemdst)
102        else:
103            shutil.copy2(itemsrc, itemdst)
104        if del_old:
105            remove_file_or_directory(itemsrc)
106    return not_copied
107
108
109def get_inner_HTML_part(html_code):
110    """Return the 'inner' part of a complete HTML snippet.
111
112    If there is a form part, get this.
113
114    If there is no form part, try to return the body part contents.
115
116    If there is no body, return as-is.
117
118    Let's see how that works. If we deliver some doc with form, we
119    will get that form only:
120
121       >>> doc = '<html><form>My Form</form>Outside the form</html>'
122       >>> get_inner_HTML_part(doc)
123       '<form>My Form</form>'
124
125    No form? Then seek for a body part and get the contents:
126
127       >>> doc = '<html><body>My Body</body>Trailing Trash</html>'
128       >>> get_inner_HTML_part(doc)
129       'My Body'
130
131    If none of these is included, return what we got:
132
133       >>> doc = '<html>without body nor form</html>'
134       >>> get_inner_HTML_part(doc)
135       '<html>without body nor form</html>'
136
137    """
138
139    try:
140        result = re.match('^.+(<form[^\>]*>.*</form>).+$', html_code,
141                          re.DOTALL).groups()[0]
142        return result
143    except AttributeError:
144        # No <form> part included
145        try:
146            result = re.match('^.+<body[^\>]*>(.*)</body>.*$', html_code,
147                              re.DOTALL).groups()[0]
148            return result
149        except AttributeError:
150            # No <form> and no <body> tag...
151            pass
152    return html_code
153
154
155class FactoryBase(grok.GlobalUtility):
156    """A factory for things.
157
158    This is a baseclass for easier creation of factories. Factories
159    are utilities that are registered under a certain name and return
160    instances of certain classes when called.
161
162    In :mod:`waeup.kofa` we use factories extensively for
163    batching. While processing a batch some processors looks up a
164    factory to create real-world instances that then get filled with
165    data from imported CSV files.
166
167    To get rid of reimplementing the same stuff over and over again,
168    most notably the methods defined here, we offer this base class
169    (which will *not* be registered as a factory itself).
170
171    Real factories can then be created like this:
172
173       >>> import grok
174       >>> from waeup.kofa.utils.helpers import FactoryBase
175       >>> class MyObject(object):
176       ...   # Some class we want to get instances of.
177       ...   pass
178       >>> class MyObjectFactory(FactoryBase):
179       ...   # This is the factory for MyObject instances
180       ...   grok.name(u'waeup.kofa.factory.MyObject')
181       ...   factory = MyObject
182
183    That's it. It is essential to set the ``factory`` attribute, which
184    will determine the class of which instances should be created when
185    called. The given name must even be unique amongst all utilities
186    registered during runtime. While you can pick any name you like
187    you might want to prepend ``waeup.kofa.factory.`` to the name
188    string to make sure it does not clash with names of other
189    utilities one day.
190
191    Before all this works we have to grok the baseclass once and our
192    freshly defined factory. This executes all the component
193    registration stuff we don't want to do ourselves. In daily use
194    this is done automatically on startup of a :mod:`waeup.kofa`
195    system.
196
197       >>> grok.testing.grok('waeup.kofa.utils.helpers')
198       >>> grok.testing.grok_component(
199       ...    'MyObjectFactory', MyObjectFactory
200       ...  )
201       True
202
203    After grokking we (and processors) can create objects without
204    knowing about the location of the real class definition, just by
205    the factory name:
206
207       >>> from zope.component import createObject
208       >>> obj = createObject('waeup.kofa.factory.MyObject')
209       >>> isinstance(obj, MyObject)
210       True
211
212    We can also use the regular utility lookups to find our new
213    factory:
214
215       >>> from zope.component import getUtility
216       >>> from zope.component.interfaces import IFactory
217       >>> factory = getUtility(
218       ...   IFactory, name='waeup.kofa.factory.MyObject'
219       ...   )
220       >>> isinstance(factory, MyObjectFactory)
221       True
222
223    And this factory generates `MyObject` instances:
224
225       >>> obj = factory()
226       >>> isinstance(obj, MyObject)
227       True
228
229    """
230    grok.baseclass()  # Do not grok this class, do not register us.
231    grok.implements(IFactory)
232    # You can override any of the following attributes in derived
233    # classes. The `grok.name` setting *must* even be set to some
234    # unique value.
235    grok.name(u'waeup.Factory')
236    title = u"Create instances of ``factory``.",
237    description = u"This factory instantiates new applicant instances."
238    factory = None
239
240    def __call__(self, *args, **kw):
241        """The main factory function.
242
243        Returns an instance of the requested object.
244        """
245        return self.factory()
246
247    def getInterfaces(self):
248        # Required by IFactory
249        return implementedBy(self.factory)
250
251
252def ReST2HTML_w_warnings(source_string):
253    """Convert a reStructuredText string to HTML preserving warnings.
254
255    Returns a tuple ``(<HTML_CODE>, <WARNINGS>)``, both being
256    strings. Where ``<HTML_CODE>`` is the HTML code generated from the
257    source string (in unicode), ``<WARNINGS>`` is a string containing
258    any warning messages or ``None``.
259
260    Regular multi-line ReStructuredText strings will be returned as
261    HTML code:
262
263        >>> from waeup.kofa.utils.helpers import ReST2HTML
264        >>> source = '''
265        ... Headline
266        ... ========
267        ...
268        ... - A list item
269        ... - Another item
270        ...
271        ... Thanks for watching!
272        ... '''
273        >>> html, warnings = ReST2HTML_w_warnings(source)
274        >>> print html
275        <div class="document" id="headline">
276        <h1 class="title">Headline</h1>
277        <BLANKLINE>
278        <ul class="simple">
279        <li>A list item</li>
280        <li>Another item</li>
281        </ul>
282        <p>Thanks for watching!</p>
283        </div>
284
285    Here no warnings happened, so the `warnings` are ``None``:
286
287        >>> warnings is None
288        True
289
290    If warnings happen then they can be retrieved in the returned
291    ``warnings``. We try to render an erraneous document:
292
293        >>> source = '''
294        ... Headline
295        ... ======
296        ...
297        ... Thanks for watching!
298        ... '''
299        >>> html, warnings = ReST2HTML_w_warnings(source)
300        >>> print html
301        <div class="document" id="headline">
302        <h1 class="title">Headline</h1>
303        <BLANKLINE>
304        <p>Thanks for watching!</p>
305        </div>
306
307        >>> print warnings
308        <string>:3: (WARNING/2) Title underline too short.
309        <BLANKLINE>
310        Headline
311        ======
312        <BLANKLINE>
313
314    As you can see, the warnings are not displayed inline the document
315    but can be retrieved from the returned warnings, which is a string
316    or ``None``.
317    """
318    warnings = StringIO()
319    fulldoc = publish_string(
320        source_string, writer_name='html4css1',
321        settings_overrides={
322            'report_level': 0,
323            'warning_stream': warnings,
324            })
325    warnings.seek(0)
326    warning_msgs = warnings.read()
327    if warning_msgs:
328        # Render again, this time with no warnings inline...
329        fulldoc = publish_string(
330        source_string, writer_name='html4css1',
331        settings_overrides={
332            'report_level': 10000,
333            'halt_level': 10000,
334            'warning_stream': warnings,
335            })
336    if warning_msgs == '':
337        warning_msgs = None
338    result = get_inner_HTML_part(fulldoc).strip()
339    if not isinstance(result, unicode):
340        result = result.decode('utf-8')
341    return result, warning_msgs
342
343
344def ReST2HTML(source_string):
345    """Render a string containing ReStructuredText to HTML.
346
347    Any warnings about too short headings, etc. are silently
348    discarded. Use :func:`ReST2HTML_w_warnings` if you want to get any
349    warnings.
350
351    The returned string will be unicode.
352
353    A regular document will be rendered like this:
354
355        >>> source = '''
356        ... Headline
357        ... ========
358        ...
359        ... Thanks for watching!
360        ... '''
361        >>> html = ReST2HTML(source)
362        >>> print html
363        <div class="document" id="headline">
364        <h1 class="title">Headline</h1>
365        <BLANKLINE>
366        <p>Thanks for watching!</p>
367        </div>
368
369    A document with markup problems (here: the underline is too short)
370    will look similar:
371
372        >>> source = '''
373        ... Headline
374        ... ======
375        ...
376        ... Thanks for watching!
377        ... '''
378        >>> html = ReST2HTML(source)
379        >>> print html
380        <div class="document" id="headline">
381        <h1 class="title">Headline</h1>
382        <BLANKLINE>
383        <p>Thanks for watching!</p>
384        </div>
385
386    """
387    html, warnings = ReST2HTML_w_warnings(source_string)
388    return html
389
390
391def attrs_to_fields(cls, omit=[]):
392    """Turn the attributes of a class into FieldProperty instances.
393
394    With Python >= 2.6 we can even use this function as a class decorator.
395
396    `omit` is a list of field names that should _not_ be turned into
397    field properties. This is useful for properties and the like.
398    """
399    iface = list(implementedBy(cls))[0]
400    for field_name in getFieldNames(iface):
401        if field_name in omit:
402            continue
403        setattr(cls, field_name, FieldProperty(iface[field_name]))
404    return cls
405
406
407def get_current_principal():
408    """Get the 'current' principal.
409
410    This method works without a request. Examining a request is the
411    regular (and recommended) way to get a principal involved
412    'currently'.
413
414    Use this method only if you really have no access to the current
415    request.
416
417    Returns ``None`` when no principal is involved (for instance
418    during tests).
419    """
420    try:
421        principal = getInteraction().participations[0].principal
422    except NoInteraction:
423        return None
424    except IndexError:  # No participations present
425        return None
426    return principal
427
428
429def cmp_files(file_descr1, file_descr2):
430    """Compare two files by their file descriptors.
431
432    Returns ``True`` if both are equal, ``False`` otherwise.
433    """
434    file_descr1.seek(0)
435    file_descr2.seek(0)
436    while True:
437        b1 = file_descr1.read(BUFSIZE)
438        b2 = file_descr2.read(BUFSIZE)
439        if b1 != b2:
440            return False
441        if not b1:
442            return True
443
444
445def string_from_bytes(number):
446    """Turn a number into some textual representation.
447
448      Examples:
449
450        >>> string_from_bytes(1)
451        u'1 byte(s)'
452
453        >>> string_from_bytes(1025)
454        u'1 KB'
455
456        >>> string_from_bytes(1.5 * 1024*1024)
457        u'1.50 MB'
458
459        >>> string_from_bytes(673.286 * 1024**3)
460        u'673.29 GB'
461
462    """
463    if number < 1024:
464        return u'%s byte(s)' % (str(number),)
465    elif number < 1024 ** 2:
466        return u'%s KB' % (number / 1024,)
467    elif number < 1024 ** 3:
468        return u'%.2f MB' % (number / 1024 ** 2,)
469    return u'%.2f GB' % (number / 1024 ** 3,)
470
471
472def file_size(file_like_obj):
473    """Determine file size in most effective manner.
474
475    Returns the number of bytes in a file. This function works for
476    both, real files as well as file-like objects like cStringIO based
477    'files'.
478
479    Example:
480
481      >>> from cStringIO import StringIO
482      >>> file_size(StringIO('my file content'))
483      15
484
485    Please note that this function expects the file-like object passed
486    in to be at first reading position (it does no seek(0)) and that
487    when finished the file pointer might be at end of file.
488    """
489    if hasattr(file_like_obj, 'fileno'):
490        return os.fstat(file_like_obj.fileno())[6]
491    file_like_obj.seek(0, 2)  # seek to last position in file
492    return file_like_obj.tell()
493
494
495def get_user_account(request):
496    """Return local user account.
497    """
498    principal_id = request.principal.id
499    authenticator = getUtility(IAuthenticatorPlugin, name='users')
500    account = authenticator.getAccount(principal_id)
501    return account
502
503
504def iface_names(iface, omit=[], exclude_attribs=True, exclude_methods=True):
505    """Get all attribute names of an interface.
506
507    Searches also base interfaces.
508
509    Names of fields that are pure attributes
510    (i.e. zope.interface.Attribute) or methods are excluded by
511    default.
512
513    Names of typical fields derived from zope.schema are included.
514
515    The `omit` paramter can give a list of names to exclude.
516
517    Returns an unsorted list of strings.
518    """
519    ifaces = set((iface,))
520    # Collect all interfaces (also bases) recursively
521    while True:
522        ext_ifaces = set(ifaces)
523        for iface in ext_ifaces:
524            ext_ifaces = set.union(ext_ifaces, set(iface.getBases()))
525        if ext_ifaces == ifaces:
526            # No new interfaces found, list complete
527            break
528        ifaces = ext_ifaces
529    # Collect (filtered) names of collected interfaces
530    result = []
531    for iface in ifaces:
532        for name, descr in iface.namesAndDescriptions():
533            if name in omit:
534                continue
535            if exclude_attribs and descr.__class__ is Attribute:
536                continue
537            if exclude_methods and isinstance(descr, Method):
538                continue
539            if name in result:
540                continue
541            result.append(name)
542    return result
543
544
545def get_sorted_preferred(tuples_iterable, preferred_list):
546    """Get a list of tuples (<TITLE>,<TOKEN>) with values in
547    `preferred_list` put in front.
548
549    The rest of the tuples iterable is returned in orginal order. This
550    is useful for putting default entries on top of (already sorted)
551    lists of choice values, for instance when sorting countries and
552    their code.
553
554    Sample:
555
556    We have a list of tuples with uppercase 'titles' and lowercase
557    'tokens'. This list is already sorted but we want certain values
558    of this list to show up before other values. For instance we want
559    to see the 'C' entry to come first.
560
561      >>> get_sorted_preferred([('A','a'), ('B','b'), ('C','c')],
562      ...                       ['c'])
563      (('C', 'c'), ('A', 'a'), ('B', 'b'))
564
565    i.e. the entry with 'c' as second value moved to head of result.
566
567    We can also require multiple entries at head of list:
568
569      >>> get_sorted_preferred([('A','a'), ('B','b'), ('C','c')],
570      ...                       ['b', 'c'])
571      (('B', 'b'), ('C', 'c'), ('A', 'a'))
572
573    We required the 'b' entry to come before the 'c' entry and then
574    the rest of the input list. That's what we got.
575
576    The result is returned as a tuple of tuples to keep order of values.
577    """
578    result = [None for x in preferred_list]
579    for title, code in tuples_iterable:
580        if code in preferred_list:
581            index = preferred_list.index(code)
582            result[index] = (title, code)
583        else:
584            result.append((title, code))
585    return tuple(result)
586
587
588def now(tz=None):
589    """Get current datetime in timezone of `tz`.
590
591    If `tz`, a `tzinfo` instance, is None, UTC time is returned.
592
593    `tz` should be a timezone as defined in pytz.
594    """
595    return to_timezone(datetime.datetime.utcnow(), tz=tz)
596
597
598def to_timezone(dt, tz=None):
599    """Shift datetime into timezone `tz`.
600
601    If datetime `dt` contains no `tzinfo` (i.e. it is 'naive'), it is
602    assumed to be UTC.
603
604    If no `tz` is given, shift to UTC is performed.
605
606    If `dt` is not a datetime.datetime, the input value is returned
607    unchanged.
608    """
609    if not isinstance(dt, datetime.datetime):
610        return dt
611    if tz is None:
612        tz = pytz.utc
613    if dt.tzinfo is None:
614        dt = pytz.utc.localize(dt)
615    return tz.normalize(dt.tzinfo.normalize(dt).astimezone(tz))
616
617
618def get_fileformat(path, bytestream=None):
619    """Try to determine the file format of a given media file.
620
621    Although checks done here are not done very thoroughly, they make
622    no assumptions about the filetype by looking at its filename
623    extension or similar. Instead they check header data to comply
624    with common known rules (Magic Words).
625
626    If bytestream is not `None` the `path` is ignored.
627
628    Returns filetype as string (something like ``'jpg'``) if
629    file-format can be recognized, ``None`` else.
630
631    Tested recognized filetypes currently are `jpg`, `png`, and `pdf`.
632
633    More filetypes (though untested in waeup.kofa) are automatically
634    recognized because we deploy the stdlib `imghdr` library. See this
635    module's docs for a complete list of filetypes recognized.
636    """
637    if path is None and bytestream is None:
638        return None
639
640    img_type = None
641    if bytestream is not None:
642        img_type = imghdr.what(path, bytestream)
643    else:
644        img_type = imghdr.what(path)
645    for name, replacement in (('jpeg', 'jpg'), ('tiff', 'tif')):
646        if img_type == name:
647            img_type = replacement
648    return img_type
649
650
651def check_pdf(bytestream, file):
652    """Tell whether a file or bytestream is a PDF file.
653
654    Works as a test/plugin for the stdlib `imghdr` library.
655    """
656    if file is not None:
657        file.seek(0)
658        bytestream = file.read(4)
659        file.seek(0)
660
661    if bytestream.startswith('%PDF'):
662        return 'pdf'
663    return None
664
665# register check_pdf as header check function with `imghdr`
666if check_pdf not in imghdr.tests:
667    imghdr.tests.append(check_pdf)
668
669
670def merge_csv_files(path1, path2):
671    """Merge two CSV files into one (appending).
672
673    CSV data from `path2` will be merged into `path1` csv file. This
674    is a bit like 'appending' data from path2 to data from path1.
675
676    The path of the resulting temporary file will be returned.
677
678    In the result file data from `path2` will always come _after_ data
679    from `path1`.
680
681    **Caution**: It is the _callers_ responsibility to remove the
682    result file (which is created by tempfile.mkstemp) after usage.
683
684    This CSV file merging copes with different column orders in both
685    CSV files and even with different column sets in both files.
686
687    Also broken/empty CSV files can be handled.
688    """
689    # sniff the col names
690    try:
691        row10 = csv.DictReader(open(path1, 'rb')).next()
692    except StopIteration:
693        row10 = dict()
694    try:
695        row20 = csv.DictReader(open(path2, 'rb')).next()
696    except StopIteration:
697        row20 = dict()
698    fieldnames = sorted(list(set(row10.keys() + row20.keys())))
699    # now read/write the real data
700    reader1 = csv.DictReader(open(path1, 'rb'))
701    reader2 = csv.DictReader(open(path2, 'rb'))
702    wp, tmp_path = tempfile.mkstemp()
703    writer = csv.DictWriter(os.fdopen(wp, 'wb'), fieldnames)
704    writer.writerow(dict((x, x) for x in fieldnames))  # header
705    for row in reader1:
706        writer.writerow(row)
707    for row in reader2:
708        writer.writerow(row)
709    return tmp_path
710
711
712def product(sequence, start=1):
713    """Returns the product of a sequence of numbers (_not_ strings)
714    multiplied by the parameter `start` (defaults to 1). If the
715    sequence is empty, returns 0.
716    """
717    if not len(sequence):
718        return 0
719    result = start
720    for item in sequence:
721        result *= item
722    return result
723
724
725class NullHandler(logging.Handler):
726    """A logging NullHandler.
727
728    Does not log anything. Useful if you want to shut up a log.
729
730    Defined here for backwards compatibility with Python < 2.7.
731    """
732    def emit(self, record):
733        pass
734
735
736def check_csv_charset(iterable):
737    """Check contents of `iterable` regarding valid CSV encoding.
738
739    `iterable` is expected to be an iterable on _rows_ (not
740    chars). This is true for instance for
741    filehandlers. `zope.publisher.browser.FileUpload` instances are
742    _not_ iterable, unfortunately.
743
744    Returns line num of first illegal char or ``None``. Line nums
745    start counting with 1 (not zero).
746    """
747    linenum = 1
748    reader = csv.DictReader(iterable)
749    try:
750        for row in reader:
751            linenum += 1
752    except UnicodeDecodeError:
753        return linenum
754    except:
755        return linenum + 1
756    return None
Note: See TracBrowser for help on using the repository browser.