source: main/waeup.kofa/trunk/src/waeup/kofa/utils/helpers.py @ 15694

Last change on this file since 15694 was 15627, checked in by Henrik Bettermann, 5 years ago

Update doctests.

  • Property svn:keywords set to Id
File size: 28.1 KB
RevLine 
[7196]1## $Id: helpers.py 15627 2019-09-30 21:39:12Z henrik $
2##
3## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
4## This program is free software; you can redistribute it and/or modify
5## it under the terms of the GNU General Public License as published by
6## the Free Software Foundation; either version 2 of the License, or
7## (at your option) any later version.
8##
9## This program is distributed in the hope that it will be useful,
10## but WITHOUT ANY WARRANTY; without even the implied warranty of
11## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12## GNU General Public License for more details.
13##
14## You should have received a copy of the GNU General Public License
15## along with this program; if not, write to the Free Software
16## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17##
[7819]18"""General helper functions for Kofa.
[4188]19"""
[10677]20import unicodecsv as csv  # XXX: csv ops should move to dedicated module.
[8185]21import datetime
[8466]22import imghdr
[9593]23import logging
[4188]24import os
[8185]25import pytz
[4375]26import re
[4188]27import shutil
[8631]28import tempfile
[5731]29import grok
[5848]30from cStringIO import StringIO
31from docutils.core import publish_string
[15595]32from HTMLParser import HTMLParser
[7943]33from zope.component import getUtility
[5731]34from zope.component.interfaces import IFactory
[5734]35from zope.interface import implementedBy
[7941]36from zope.interface.interface import Method, Attribute
[6071]37from zope.schema import getFieldNames
38from zope.schema.fieldproperty import FieldProperty
[6372]39from zope.security.interfaces import NoInteraction
40from zope.security.management import getInteraction
[7175]41from zope.pluggableauth.interfaces import IAuthenticatorPlugin
[12231]42from zope.formlib.widget import renderElement
[4188]43
[6503]44BUFSIZE = 8 * 1024
[6372]45
[10677]46
[7186]47def remove_file_or_directory(filepath):
[4188]48    """Remove a file or directory.
[5738]49
50    Different to :func:`shutil.rmtree` we also accept not existing
51    paths (returning silently) and if a dir turns out to be a regular
52    file, we remove that.
[4188]53    """
54    filepath = os.path.abspath(filepath)
55    if not os.path.exists(filepath):
56        return
57    if os.path.isdir(filepath):
58        shutil.rmtree(filepath)
59    else:
60        os.unlink(filepath)
61    return
62
[10677]63
[7186]64def copy_filesystem_tree(src, dst, overwrite=False, del_old=False):
[4188]65    """Copy contents of directory src to directory dst.
66
67    Both directories must exists.
68
69    If `overwrite` is true, any same named objects will be
70    overwritten. Otherwise these files will not be touched.
71
72    If `del_old` is true, copied files and directories will be removed
73    from the src directory.
74
75    This functions returns a list of non-copied files.
76
77    Unix hidden files and directories (starting with '.') are not
78    processed by this function.
79    """
80    if not os.path.exists(src):
81        raise ValueError('source path does not exist: %s' % src)
82    if not os.path.exists(dst):
83        raise ValueError('destination path does not exist: %s' % dst)
84    if not os.path.isdir(src):
85        raise ValueError('source path is not a directory: %s' % src)
86    if not os.path.isdir(dst):
87        raise ValueError('destination path is not a directory: %s' % dst)
88    not_copied = []
89    for item in os.listdir(src):
90        if item.startswith('.'):
[10677]91            continue  # We do not copy hidden stuff...
[4188]92        itemsrc = os.path.join(src, item)
93        itemdst = os.path.join(dst, item)
94
95        if os.path.exists(itemdst):
96            if overwrite is True:
[7186]97                remove_file_or_directory(itemdst)
[4188]98            else:
99                not_copied.append(item)
100                continue
[6113]101
[4188]102        if os.path.isdir(itemsrc):
103            shutil.copytree(itemsrc, itemdst)
104        else:
105            shutil.copy2(itemsrc, itemdst)
106        if del_old:
[7186]107            remove_file_or_directory(itemsrc)
[4188]108    return not_copied
[4375]109
110
[7186]111def get_inner_HTML_part(html_code):
[4375]112    """Return the 'inner' part of a complete HTML snippet.
113
114    If there is a form part, get this.
115
116    If there is no form part, try to return the body part contents.
117
118    If there is no body, return as-is.
[5738]119
120    Let's see how that works. If we deliver some doc with form, we
121    will get that form only:
122
123       >>> doc = '<html><form>My Form</form>Outside the form</html>'
[7186]124       >>> get_inner_HTML_part(doc)
[5738]125       '<form>My Form</form>'
126
127    No form? Then seek for a body part and get the contents:
128
129       >>> doc = '<html><body>My Body</body>Trailing Trash</html>'
[7186]130       >>> get_inner_HTML_part(doc)
[5738]131       'My Body'
132
133    If none of these is included, return what we got:
134
135       >>> doc = '<html>without body nor form</html>'
[7186]136       >>> get_inner_HTML_part(doc)
[5738]137       '<html>without body nor form</html>'
138
[4375]139    """
140
141    try:
[5738]142        result = re.match('^.+(<form[^\>]*>.*</form>).+$', html_code,
[4375]143                          re.DOTALL).groups()[0]
144        return result
145    except AttributeError:
146        # No <form> part included
147        try:
148            result = re.match('^.+<body[^\>]*>(.*)</body>.*$', html_code,
149                              re.DOTALL).groups()[0]
150            return result
151        except AttributeError:
152            # No <form> and no <body> tag...
153            pass
154    return html_code
155
[10677]156
[5731]157class FactoryBase(grok.GlobalUtility):
158    """A factory for things.
159
160    This is a baseclass for easier creation of factories. Factories
161    are utilities that are registered under a certain name and return
162    instances of certain classes when called.
163
[7811]164    In :mod:`waeup.kofa` we use factories extensively for
[7933]165    batching. While processing a batch some processors looks up a
[5731]166    factory to create real-world instances that then get filled with
167    data from imported CSV files.
168
169    To get rid of reimplementing the same stuff over and over again,
170    most notably the methods defined here, we offer this base class
171    (which will *not* be registered as a factory itself).
172
173    Real factories can then be created like this:
174
175       >>> import grok
[7811]176       >>> from waeup.kofa.utils.helpers import FactoryBase
[5731]177       >>> class MyObject(object):
178       ...   # Some class we want to get instances of.
179       ...   pass
180       >>> class MyObjectFactory(FactoryBase):
181       ...   # This is the factory for MyObject instances
[7811]182       ...   grok.name(u'waeup.kofa.factory.MyObject')
[5731]183       ...   factory = MyObject
184
185    That's it. It is essential to set the ``factory`` attribute, which
186    will determine the class of which instances should be created when
187    called. The given name must even be unique amongst all utilities
188    registered during runtime. While you can pick any name you like
[7811]189    you might want to prepend ``waeup.kofa.factory.`` to the name
[5731]190    string to make sure it does not clash with names of other
191    utilities one day.
192
193    Before all this works we have to grok the baseclass once and our
194    freshly defined factory. This executes all the component
195    registration stuff we don't want to do ourselves. In daily use
[7811]196    this is done automatically on startup of a :mod:`waeup.kofa`
[5731]197    system.
[6113]198
[7811]199       >>> grok.testing.grok('waeup.kofa.utils.helpers')
[5731]200       >>> grok.testing.grok_component(
201       ...    'MyObjectFactory', MyObjectFactory
202       ...  )
203       True
204
[7933]205    After grokking we (and processors) can create objects without
[5731]206    knowing about the location of the real class definition, just by
207    the factory name:
208
209       >>> from zope.component import createObject
[7811]210       >>> obj = createObject('waeup.kofa.factory.MyObject')
[5731]211       >>> isinstance(obj, MyObject)
212       True
213
214    We can also use the regular utility lookups to find our new
215    factory:
216
217       >>> from zope.component import getUtility
218       >>> from zope.component.interfaces import IFactory
219       >>> factory = getUtility(
[7811]220       ...   IFactory, name='waeup.kofa.factory.MyObject'
[5731]221       ...   )
222       >>> isinstance(factory, MyObjectFactory)
223       True
224
225    And this factory generates `MyObject` instances:
226
227       >>> obj = factory()
228       >>> isinstance(obj, MyObject)
229       True
230
231    """
[10677]232    grok.baseclass()  # Do not grok this class, do not register us.
[5731]233    grok.implements(IFactory)
234    # You can override any of the following attributes in derived
235    # classes. The `grok.name` setting *must* even be set to some
236    # unique value.
237    grok.name(u'waeup.Factory')
238    title = u"Create instances of ``factory``.",
239    description = u"This factory instantiates new applicant instances."
240    factory = None
241
242    def __call__(self, *args, **kw):
243        """The main factory function.
244
245        Returns an instance of the requested object.
246        """
247        return self.factory()
248
249    def getInterfaces(self):
250        # Required by IFactory
251        return implementedBy(self.factory)
[5848]252
[10677]253
[5848]254def ReST2HTML_w_warnings(source_string):
255    """Convert a reStructuredText string to HTML preserving warnings.
256
257    Returns a tuple ``(<HTML_CODE>, <WARNINGS>)``, both being
258    strings. Where ``<HTML_CODE>`` is the HTML code generated from the
[5876]259    source string (in unicode), ``<WARNINGS>`` is a string containing
260    any warning messages or ``None``.
[6113]261
[5848]262    Regular multi-line ReStructuredText strings will be returned as
263    HTML code:
264
[7811]265        >>> from waeup.kofa.utils.helpers import ReST2HTML
[5848]266        >>> source = '''
267        ... Headline
268        ... ========
269        ...
270        ... - A list item
271        ... - Another item
272        ...
273        ... Thanks for watching!
274        ... '''
275        >>> html, warnings = ReST2HTML_w_warnings(source)
276        >>> print html
277        <div class="document" id="headline">
278        <h1 class="title">Headline</h1>
279        <BLANKLINE>
280        <ul class="simple">
281        <li>A list item</li>
282        <li>Another item</li>
283        </ul>
284        <p>Thanks for watching!</p>
285        </div>
286
287    Here no warnings happened, so the `warnings` are ``None``:
288
289        >>> warnings is None
290        True
[6113]291
[5848]292    If warnings happen then they can be retrieved in the returned
293    ``warnings``. We try to render an erraneous document:
294
295        >>> source = '''
296        ... Headline
297        ... ======
298        ...
299        ... Thanks for watching!
300        ... '''
301        >>> html, warnings = ReST2HTML_w_warnings(source)
302        >>> print html
303        <div class="document" id="headline">
304        <h1 class="title">Headline</h1>
305        <BLANKLINE>
306        <p>Thanks for watching!</p>
307        </div>
308
309        >>> print warnings
310        <string>:3: (WARNING/2) Title underline too short.
311        <BLANKLINE>
312        Headline
313        ======
314        <BLANKLINE>
315
316    As you can see, the warnings are not displayed inline the document
317    but can be retrieved from the returned warnings, which is a string
318    or ``None``.
319    """
320    warnings = StringIO()
321    fulldoc = publish_string(
322        source_string, writer_name='html4css1',
323        settings_overrides={
324            'report_level': 0,
325            'warning_stream': warnings,
326            })
327    warnings.seek(0)
328    warning_msgs = warnings.read()
329    if warning_msgs:
330        # Render again, this time with no warnings inline...
[10677]331        fulldoc = publish_string(
[5848]332        source_string, writer_name='html4css1',
333        settings_overrides={
334            'report_level': 10000,
335            'halt_level': 10000,
336            'warning_stream': warnings,
337            })
338    if warning_msgs == '':
339        warning_msgs = None
[7186]340    result = get_inner_HTML_part(fulldoc).strip()
[5876]341    if not isinstance(result, unicode):
342        result = result.decode('utf-8')
343    return result, warning_msgs
[5848]344
[10677]345
[5848]346def ReST2HTML(source_string):
347    """Render a string containing ReStructuredText to HTML.
348
349    Any warnings about too short headings, etc. are silently
350    discarded. Use :func:`ReST2HTML_w_warnings` if you want to get any
351    warnings.
352
[5876]353    The returned string will be unicode.
[6113]354
[5848]355    A regular document will be rendered like this:
356
357        >>> source = '''
358        ... Headline
359        ... ========
360        ...
361        ... Thanks for watching!
362        ... '''
363        >>> html = ReST2HTML(source)
364        >>> print html
365        <div class="document" id="headline">
366        <h1 class="title">Headline</h1>
367        <BLANKLINE>
368        <p>Thanks for watching!</p>
369        </div>
370
371    A document with markup problems (here: the underline is too short)
372    will look similar:
373
374        >>> source = '''
375        ... Headline
376        ... ======
377        ...
378        ... Thanks for watching!
379        ... '''
380        >>> html = ReST2HTML(source)
381        >>> print html
382        <div class="document" id="headline">
383        <h1 class="title">Headline</h1>
384        <BLANKLINE>
385        <p>Thanks for watching!</p>
386        </div>
[6113]387
[5848]388    """
389    html, warnings = ReST2HTML_w_warnings(source_string)
390    return html
[6071]391
[10677]392
[9689]393def attrs_to_fields(cls, omit=[]):
[12974]394    """Set class attributes and bind them to the data definitions
395    specified in the interface by turning the attributes into FieldProperty
396    instances.
[6113]397
398    With Python >= 2.6 we can even use this function as a class decorator.
[9689]399
400    `omit` is a list of field names that should _not_ be turned into
401    field properties. This is useful for properties and the like.
[6071]402    """
403    iface = list(implementedBy(cls))[0]
404    for field_name in getFieldNames(iface):
[9689]405        if field_name in omit:
406            continue
[11476]407        field_property = FieldProperty(iface[field_name])
408        # Set proper docstring for the API docs.
409        field_property.__doc__ = iface[field_name].title + ' (computed attribute)'
410        setattr(cls, field_name, field_property)
[6071]411    return cls
[6372]412
[10677]413
[6372]414def get_current_principal():
415    """Get the 'current' principal.
416
417    This method works without a request. Examining a request is the
418    regular (and recommended) way to get a principal involved
419    'currently'.
420
421    Use this method only if you really have no access to the current
422    request.
423
424    Returns ``None`` when no principal is involved (for instance
425    during tests).
426    """
427    try:
428        principal = getInteraction().participations[0].principal
429    except NoInteraction:
430        return None
[10677]431    except IndexError:  # No participations present
[6372]432        return None
433    return principal
[6503]434
[10677]435
[6503]436def cmp_files(file_descr1, file_descr2):
437    """Compare two files by their file descriptors.
438
439    Returns ``True`` if both are equal, ``False`` otherwise.
440    """
[6531]441    file_descr1.seek(0)
442    file_descr2.seek(0)
[6503]443    while True:
444        b1 = file_descr1.read(BUFSIZE)
445        b2 = file_descr2.read(BUFSIZE)
446        if b1 != b2:
447            return False
448        if not b1:
449            return True
[7078]450
[10677]451
[7078]452def string_from_bytes(number):
453    """Turn a number into some textual representation.
454
455      Examples:
456
457        >>> string_from_bytes(1)
458        u'1 byte(s)'
459
460        >>> string_from_bytes(1025)
461        u'1 KB'
462
463        >>> string_from_bytes(1.5 * 1024*1024)
464        u'1.50 MB'
465
466        >>> string_from_bytes(673.286 * 1024**3)
467        u'673.29 GB'
468
469    """
470    if number < 1024:
471        return u'%s byte(s)' % (str(number),)
[10677]472    elif number < 1024 ** 2:
[7078]473        return u'%s KB' % (number / 1024,)
[10677]474    elif number < 1024 ** 3:
475        return u'%.2f MB' % (number / 1024 ** 2,)
476    return u'%.2f GB' % (number / 1024 ** 3,)
[7079]477
[10677]478
[7079]479def file_size(file_like_obj):
480    """Determine file size in most effective manner.
481
482    Returns the number of bytes in a file. This function works for
483    both, real files as well as file-like objects like cStringIO based
484    'files'.
485
486    Example:
487
488      >>> from cStringIO import StringIO
489      >>> file_size(StringIO('my file content'))
490      15
491
492    Please note that this function expects the file-like object passed
493    in to be at first reading position (it does no seek(0)) and that
494    when finished the file pointer might be at end of file.
495    """
496    if hasattr(file_like_obj, 'fileno'):
497        return os.fstat(file_like_obj.fileno())[6]
[10677]498    file_like_obj.seek(0, 2)  # seek to last position in file
[7079]499    return file_like_obj.tell()
[7175]500
[10677]501
[7175]502def get_user_account(request):
503    """Return local user account.
504    """
505    principal_id = request.principal.id
[7234]506    authenticator = getUtility(IAuthenticatorPlugin, name='users')
507    account = authenticator.getAccount(principal_id)
[7175]508    return account
[7941]509
[10677]510
[7941]511def iface_names(iface, omit=[], exclude_attribs=True, exclude_methods=True):
512    """Get all attribute names of an interface.
513
514    Searches also base interfaces.
515
516    Names of fields that are pure attributes
517    (i.e. zope.interface.Attribute) or methods are excluded by
518    default.
519
520    Names of typical fields derived from zope.schema are included.
521
522    The `omit` paramter can give a list of names to exclude.
523
524    Returns an unsorted list of strings.
525    """
[8370]526    ifaces = set((iface,))
527    # Collect all interfaces (also bases) recursively
528    while True:
529        ext_ifaces = set(ifaces)
530        for iface in ext_ifaces:
531            ext_ifaces = set.union(ext_ifaces, set(iface.getBases()))
532        if ext_ifaces == ifaces:
533            # No new interfaces found, list complete
534            break
535        ifaces = ext_ifaces
536    # Collect (filtered) names of collected interfaces
[7941]537    result = []
[8370]538    for iface in ifaces:
539        for name, descr in iface.namesAndDescriptions():
540            if name in omit:
541                continue
542            if exclude_attribs and descr.__class__ is Attribute:
543                continue
544            if exclude_methods and isinstance(descr, Method):
545                continue
[9043]546            if name in result:
547                continue
[8370]548            result.append(name)
[7941]549    return result
[7968]550
[10677]551
[7968]552def get_sorted_preferred(tuples_iterable, preferred_list):
553    """Get a list of tuples (<TITLE>,<TOKEN>) with values in
554    `preferred_list` put in front.
555
556    The rest of the tuples iterable is returned in orginal order. This
557    is useful for putting default entries on top of (already sorted)
558    lists of choice values, for instance when sorting countries and
559    their code.
560
561    Sample:
562
563    We have a list of tuples with uppercase 'titles' and lowercase
564    'tokens'. This list is already sorted but we want certain values
565    of this list to show up before other values. For instance we want
566    to see the 'C' entry to come first.
567
568      >>> get_sorted_preferred([('A','a'), ('B','b'), ('C','c')],
569      ...                       ['c'])
570      (('C', 'c'), ('A', 'a'), ('B', 'b'))
571
572    i.e. the entry with 'c' as second value moved to head of result.
573
574    We can also require multiple entries at head of list:
575
576      >>> get_sorted_preferred([('A','a'), ('B','b'), ('C','c')],
577      ...                       ['b', 'c'])
578      (('B', 'b'), ('C', 'c'), ('A', 'a'))
579
580    We required the 'b' entry to come before the 'c' entry and then
581    the rest of the input list. That's what we got.
582
583    The result is returned as a tuple of tuples to keep order of values.
584    """
585    result = [None for x in preferred_list]
586    for title, code in tuples_iterable:
587        if code in preferred_list:
588            index = preferred_list.index(code)
589            result[index] = (title, code)
590        else:
591            result.append((title, code))
592    return tuple(result)
[8185]593
[10677]594
[8185]595def now(tz=None):
596    """Get current datetime in timezone of `tz`.
597
598    If `tz`, a `tzinfo` instance, is None, UTC time is returned.
599
600    `tz` should be a timezone as defined in pytz.
601    """
602    return to_timezone(datetime.datetime.utcnow(), tz=tz)
603
[10677]604
[8185]605def to_timezone(dt, tz=None):
606    """Shift datetime into timezone `tz`.
607
608    If datetime `dt` contains no `tzinfo` (i.e. it is 'naive'), it is
609    assumed to be UTC.
610
611    If no `tz` is given, shift to UTC is performed.
[8192]612
613    If `dt` is not a datetime.datetime, the input value is returned
614    unchanged.
[8185]615    """
[8192]616    if not isinstance(dt, datetime.datetime):
617        return dt
[8185]618    if tz is None:
619        tz = pytz.utc
620    if dt.tzinfo is None:
621        dt = pytz.utc.localize(dt)
622    return tz.normalize(dt.tzinfo.normalize(dt).astimezone(tz))
[8466]623
[10677]624
[11660]625def imghdr_test_fpm(h, f):
626    """FPM fileformat test.
627
628    The `fpm` fileformat is the binary fingerprint data as created by
629    `libfprint`.
630    """
631    if len(h) >= 3 and h[:3] == 'FP1':
632        return 'fpm'
633
634
635#: Add test function in stdlib's imghdr tests.
636imghdr.tests.append(imghdr_test_fpm)
637
638
[8466]639def get_fileformat(path, bytestream=None):
640    """Try to determine the file format of a given media file.
641
642    Although checks done here are not done very thoroughly, they make
643    no assumptions about the filetype by looking at its filename
644    extension or similar. Instead they check header data to comply
645    with common known rules (Magic Words).
646
647    If bytestream is not `None` the `path` is ignored.
648
649    Returns filetype as string (something like ``'jpg'``) if
650    file-format can be recognized, ``None`` else.
651
[11660]652    Tested recognized filetypes currently are `jpg`, `png`, `fpm`, and
653    `pdf`.
[8466]654
655    More filetypes (though untested in waeup.kofa) are automatically
656    recognized because we deploy the stdlib `imghdr` library. See this
657    module's docs for a complete list of filetypes recognized.
658    """
659    if path is None and bytestream is None:
660        return None
661
662    img_type = None
663    if bytestream is not None:
664        img_type = imghdr.what(path, bytestream)
665    else:
666        img_type = imghdr.what(path)
667    for name, replacement in (('jpeg', 'jpg'), ('tiff', 'tif')):
668        if img_type == name:
669            img_type = replacement
670    return img_type
671
[10677]672
[8466]673def check_pdf(bytestream, file):
674    """Tell whether a file or bytestream is a PDF file.
675
676    Works as a test/plugin for the stdlib `imghdr` library.
677    """
678    if file is not None:
679        file.seek(0)
680        bytestream = file.read(4)
681        file.seek(0)
682
683    if bytestream.startswith('%PDF'):
684        return 'pdf'
685    return None
686
687# register check_pdf as header check function with `imghdr`
688if check_pdf not in imghdr.tests:
689    imghdr.tests.append(check_pdf)
[8631]690
[10677]691
[8631]692def merge_csv_files(path1, path2):
693    """Merge two CSV files into one (appending).
694
695    CSV data from `path2` will be merged into `path1` csv file. This
696    is a bit like 'appending' data from path2 to data from path1.
697
698    The path of the resulting temporary file will be returned.
699
700    In the result file data from `path2` will always come _after_ data
701    from `path1`.
702
703    **Caution**: It is the _callers_ responsibility to remove the
704    result file (which is created by tempfile.mkstemp) after usage.
705
706    This CSV file merging copes with different column orders in both
707    CSV files and even with different column sets in both files.
[8633]708
709    Also broken/empty CSV files can be handled.
[8631]710    """
711    # sniff the col names
[8633]712    try:
713        row10 = csv.DictReader(open(path1, 'rb')).next()
714    except StopIteration:
715        row10 = dict()
716    try:
717        row20 = csv.DictReader(open(path2, 'rb')).next()
718    except StopIteration:
719        row20 = dict()
[8631]720    fieldnames = sorted(list(set(row10.keys() + row20.keys())))
721    # now read/write the real data
722    reader1 = csv.DictReader(open(path1, 'rb'))
723    reader2 = csv.DictReader(open(path2, 'rb'))
724    wp, tmp_path = tempfile.mkstemp()
725    writer = csv.DictWriter(os.fdopen(wp, 'wb'), fieldnames)
[10677]726    writer.writerow(dict((x, x) for x in fieldnames))  # header
[8631]727    for row in reader1:
728        writer.writerow(row)
729    for row in reader2:
730        writer.writerow(row)
731    return tmp_path
[9372]732
[10677]733
[9372]734def product(sequence, start=1):
735    """Returns the product of a sequence of numbers (_not_ strings)
736    multiplied by the parameter `start` (defaults to 1). If the
737    sequence is empty, returns 0.
738    """
739    if not len(sequence):
740        return 0
741    result = start
742    for item in sequence:
743        result *= item
744    return result
[9593]745
[10677]746
[9593]747class NullHandler(logging.Handler):
748    """A logging NullHandler.
749
750    Does not log anything. Useful if you want to shut up a log.
751
752    Defined here for backwards compatibility with Python < 2.7.
753    """
754    def emit(self, record):
755        pass
[10676]756
757
758def check_csv_charset(iterable):
[14939]759    """Check contents of `iterable` regarding valid CSV encoding and
760    trailing whitespaces in data.
[10676]761
762    `iterable` is expected to be an iterable on _rows_ (not
763    chars). This is true for instance for
764    filehandlers. `zope.publisher.browser.FileUpload` instances are
765    _not_ iterable, unfortunately.
766
767    Returns line num of first illegal char or ``None``. Line nums
[14939]768    start counting with 1 (not zero). Returns -1 if data contain
769    trailing whitespaces.
[10676]770    """
771    linenum = 1
772    try:
[13537]773        reader = csv.DictReader(iterable)
[10676]774        for row in reader:
775            linenum += 1
[14939]776            for value in row.values():
777                if value.endswith(' '):
778                    return -1
[10676]779    except UnicodeDecodeError:
780        return linenum
781    except:
782        return linenum + 1
783    return None
[11824]784
785
786class MemInfo(dict):
787    """A dict with access to its items like if they are attributes.
788    """
789    __getattr__ = dict.__getitem__
790    __setattr__ = dict.__setitem__
791    __delattr__ = dict.__delitem__
792
793
794def get_meminfo(src="/proc/meminfo"):
795    """Get local memory info as provided in /proc/meminfo.
796
797    Entries in /proc/meminfo are available as MemInfo attributes.
798
799    By default we lookup a file /proc/meminfo. Another path can be
800    lines = open(src, 'r').read()passed in as `src` parameter. In this
801    case `src` must be a regular file and contain meminfo-style data.
802
803    If the given `src` (or `/proc/meminfo`) are not available, `None`
804    lines = open(src, 'r').read()is returned.
805    """
806    if not os.path.isfile(src):
807        return None
808    lines = open(src, 'r').read().splitlines()
809    result = MemInfo()
810    for line in lines:
811        key, value = line.split(':', 1)
812        value = int(value.split(' kB', 1)[0])
813        result[key] = value
814    return result
[12231]815
816def html2dict(value=None,portal_language='en'):
817    """Transforms a localized HTML text string into a dictionary.
818
[13077]819    Different languages must be separated by ``>>xy<<`` whereas
[12231]820    xy is the language code. Text parts without correct leading
821    language separator - usually the first part has no language
822    descriptor - are interpreted as texts in the portal's language.
823    """
824    try:
825        parts = value.split('>>')
826    except:
827        return {}
828    elements = {}
829    lang = portal_language
830    for part in parts:
831        if part[2:4] == u'<<':
[12393]832            lang = str(part[0:2].lower())
[12231]833            text = part[4:]
834            elements[lang] = renderElement(u'div id="html"',
835                contents=text)
836        else:
837            text = part
838            elements[lang] = renderElement(u'div id="html"',
839                contents=text)
[12433]840    return elements
841
842def rest2dict(value=None,portal_language='en'):
843    """Transforms a localized REST text string into a dictionary.
844
[13077]845    Different languages must be separated by ``>>xy<<``` whereas
[12433]846    xy is the language code. Text parts without correct leading
847    language separator - usually the first part has no language
848    descriptor - are interpreted as texts in the portal's language.
849    """
850    try:
851        parts = value.split('>>')
852    except:
853        return {}
854    elements = {}
855    lang = portal_language
856    for part in parts:
857        if part[2:4] == u'<<':
858            lang = str(part[0:2].lower())
859            text = part[4:]
860            elements[lang] = renderElement(u'div id="rest"',
861                contents=ReST2HTML(text))
862        else:
863            text = part
864            elements[lang] = renderElement(u'div id="rest"',
865                contents=ReST2HTML(text))
[15595]866    return elements
867
868
869
870class FormVarParser(HTMLParser):
871    """An HTML form parser that extracts keys and values.
872
873       Fed with an HTML document, we parse all starttags and check for each,
874       whether it provides a `name` and a `value` attribute. If so, the
875       values of the respective attributes are stored in instance var
876       `form_vars` as a dict entry.
877    """
878
879    def __init__(self):
880        HTMLParser.__init__(self)  # old-style class - no super()
881        self.form_vars = {}
882
883    def handle_starttag(self, tag, attrs):
884        tag_attrs = {}
885        for key, val in attrs:
886            tag_attrs[key] = val
887        if 'name' in tag_attrs and 'value' in tag_attrs:
[15597]888            self.form_vars[tag_attrs['name']] = unicode(tag_attrs['value'])
[15595]889
890
891def extract_formvars(html_code):
892    """Extract keys and values from an HTML form as dict.
893
894       No text, no values::
895
896         >>> extract_formvars("")
897         {}
898
899       Simple input tags normally provide name and value::
900
901         >>> extract_formvars("<input type='text' name='foo' value='bar'>")
[15627]902         {'foo': u'bar'}
[15595]903
904       The sample doc we stored in tests is a bit more difficult::
905
906         >>> html_path = os.path.join(os.path.dirname(__file__),
907         ...                          'tests', 'sample_response.html')
908         >>> html_code = open(html_path, 'r').read()
909         >>> import pprint
910         >>> pprint.pprint(extract_formvars(html_code))
[15627]911         {'AMOUNT': u'100',
[15595]912         ...
[15627]913          'TRANS_NUM': u'01ESA20190916134824YA3YJ8'}
[15595]914
915    """
916    result = {}
917    parser = FormVarParser()
918    parser.feed(html_code)
919    return parser.form_vars
Note: See TracBrowser for help on using the repository browser.