source: main/waeup.kofa/trunk/src/waeup/kofa/utils/tests/test_helpers.py @ 11078

Last change on this file since 11078 was 10676, checked in by uli, 11 years ago

Check uploaded CSV files more thoroughly before processing.

  • Property svn:keywords set to Id
File size: 18.5 KB
Line 
1# -*- coding: utf-8 -*-
2
3## $Id: test_helpers.py 10676 2013-10-31 17:51:25Z uli $
4##
5## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
6## This program is free software; you can redistribute it and/or modify
7## it under the terms of the GNU General Public License as published by
8## the Free Software Foundation; either version 2 of the License, or
9## (at your option) any later version.
10##
11## This program is distributed in the hope that it will be useful,
12## but WITHOUT ANY WARRANTY; without even the implied warranty of
13## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14## GNU General Public License for more details.
15##
16## You should have received a copy of the GNU General Public License
17## along with this program; if not, write to the Free Software
18## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19##
20
21import datetime
22import os
23import pytz
24import shutil
25import tempfile
26import unittest
27import doctest
28from cStringIO import StringIO
29from zope import schema
30from zope.interface import Interface, Attribute, implements, implementer
31from zope.security.testing import Principal, Participation
32from zope.security.management import newInteraction, endInteraction
33from waeup.kofa.utils import helpers
34
class IFakeObject(Interface):
    """Marker interface declaring neither attributes nor methods.

    Used by the tests in this module as the interface that
    `FakeObject` is declared to implement.
    """
37
@implementer(IFakeObject)
class FakeObject(object):
    """Minimal class declared to implement `IFakeObject`.

    The declaration uses the ``implementer`` class decorator, like the
    other classes in this module, instead of the ``implements`` class
    advice, which zope.interface does not support under Python 3.
    """
40
class SimpleHelpersTestCase(unittest.TestCase):
    """Tests for simple functions in `helpers`."""

    def test_product(self):
        # without any factors the product is zero, regardless of `start`
        empty = helpers.product([])
        plain = helpers.product([1,2,3])
        empty_with_start = helpers.product([], start=5)
        plain_with_start = helpers.product([1,2,3], start=5)
        outcomes = (
            (empty, 0),
            (plain, 6),
            (empty_with_start, 0),
            (plain_with_start, 30),
            )
        for computed, wanted in outcomes:
            self.assertEqual(computed, wanted)

    def test_attrs_to_fields_properties(self):
        # fields listed in `omit` are skipped, so a property of the
        # same name defined on the class stays in effect
        class IMyInterface(Interface):
            attr1 = schema.Int(
                title=u'Attribute 1',
                readonly=True,
                default=110,
                )

        @helpers.attrs_to_fields
        @implementer(IMyInterface)
        class MyClass1(object):
            @property
            def attr1(self):
                return 42

        @implementer(IMyInterface)
        class MyClass2(object):
            @property
            def attr1(self):
                return 42
        MyClass2 = helpers.attrs_to_fields(MyClass2, omit=['attr1'])

        with_field = MyClass1()
        with_property = MyClass2()

        # MyClass1: the schema default took the place of the property
        self.assertEqual(with_field.attr1, 110)
        # MyClass2: `attr1` was omitted, the property is still there
        self.assertEqual(with_property.attr1, 42)
82
83
class RemoveFileOrDirectoryTestCase(unittest.TestCase):
    """Tests for `helpers.remove_file_or_directory`.

    Each test gets a fresh temporary directory containing one file
    (``somefile``) and the path of a file that does not exist
    (``nonfile``).
    """

    def setUp(self):
        self.dirpath = tempfile.mkdtemp()
        self.filepath = os.path.join(self.dirpath, 'somefile')
        self.non_file = os.path.join(self.dirpath, 'nonfile')
        # Close the handle right away instead of leaving it to the
        # garbage collector; an open handle could get in the way of
        # removing the file later on.
        with open(self.filepath, 'wb') as fd:
            fd.write('Hi!')
        return

    def tearDown(self):
        # The directory may already have been removed by the test itself.
        if os.path.exists(self.dirpath):
            shutil.rmtree(self.dirpath)
        return

    def test_handle_not_existing_path(self):
        # a path that does not exist yields `None`
        result = helpers.remove_file_or_directory(self.non_file)
        self.assertTrue(result is None)
        return

    def test_handle_dir(self):
        # directories are removed
        helpers.remove_file_or_directory(self.dirpath)
        self.assertFalse(
            os.path.exists(self.dirpath)
            )
        return

    def test_handle_file(self):
        # regular files are removed
        helpers.remove_file_or_directory(self.filepath)
        self.assertFalse(
            os.path.exists(self.filepath)
            )
        return
116
class CopyFileSystemTreeTestCase(unittest.TestCase):
    """Test edge cases of copy_filesystem_tree().

    This is a typical case of tests not written as doctest as it is
    normally not interesting for developers and we only want to make
    sure everything works as expected.
    """

    def setUp(self):
        self.existing_src = tempfile.mkdtemp()
        self.filepath = os.path.join(self.existing_src, 'somefile')
        # Write via a context manager so the handle is closed for sure
        # before the tree is copied or removed.
        with open(self.filepath, 'wb') as fd:
            fd.write('Hi!')
        self.existing_dst = tempfile.mkdtemp()
        # Create and remove a directory to obtain a path that is
        # known not to exist.
        self.not_existing_dir = tempfile.mkdtemp()
        shutil.rmtree(self.not_existing_dir)

    def tearDown(self):
        shutil.rmtree(self.existing_src)
        shutil.rmtree(self.existing_dst)

    def test_source_and_dst_existing(self):
        # files of the source dir show up in the destination dir
        helpers.copy_filesystem_tree(self.existing_src, self.existing_dst)
        self.assertTrue(
            os.path.exists(
                os.path.join(self.existing_dst, 'somefile')
                )
            )
        return

    def test_source_not_existing(self):
        # a missing source dir is rejected
        self.assertRaises(
            ValueError,
            helpers.copy_filesystem_tree,
            self.not_existing_dir,
            self.existing_dst
            )
        return

    def test_dest_not_existing(self):
        # a missing destination dir is rejected
        self.assertRaises(
            ValueError,
            helpers.copy_filesystem_tree,
            self.existing_src,
            self.not_existing_dir
            )
        return

    def test_src_not_a_dir(self):
        # a regular file as source is rejected
        self.assertRaises(
            ValueError,
            helpers.copy_filesystem_tree,
            self.filepath,
            self.existing_dst
            )
        return

    def test_dst_not_a_dir(self):
        # a regular file as destination is rejected
        self.assertRaises(
            ValueError,
            helpers.copy_filesystem_tree,
            self.existing_src,
            self.filepath
            )
        return
182
class ReST2HTMLTestCase(unittest.TestCase):
    """Tests for `helpers.ReST2HTML` with non-ASCII input."""

    def setUp(self):
        # HTML expected for the one-line sample used by all tests below
        self.expected = (
            u'<div class="document">\n\n\n<p>Some '
            u'test with \xfcmlaut</p>\n</div>'
            )

    def test_ascii_umlauts(self):
        # An umlaut in a plain (non-unicode) string literal is
        # converted correctly to unicode.
        html = helpers.ReST2HTML('Some test with ümlaut')
        self.assertEqual(html, self.expected)

    def test_unicode_umlauts(self):
        # An umlaut in a unicode string is converted correctly, too.
        html = helpers.ReST2HTML(u'Some test with ümlaut')
        self.assertEqual(html, self.expected)

    def test_unicode_output_from_ascii(self):
        # plain string in, unicode out
        html = helpers.ReST2HTML('Some test with ümlaut')
        self.assertTrue(isinstance(html, unicode))

    def test_unicode_output_from_unicode(self):
        # unicode in, unicode out
        html = helpers.ReST2HTML(u'Some test with ümlaut')
        self.assertTrue(isinstance(html, unicode))
209
210
class FactoryBaseTestCase(unittest.TestCase):
    """Covers `FactoryBase.getInterfaces`."""

    def test_ifaces(self):
        # All relevant parts are tested in the docstring already; the
        # interfaces method is called here as well to please the
        # coverage report.
        fake_factory = helpers.FactoryBase()
        fake_factory.factory = FakeObject
        declared = fake_factory.getInterfaces()
        # NOTE(review): calling the returned object with an interface
        # presumably checks that the interface is part of the
        # declaration -- confirm against zope.interface.
        self.assertTrue(declared(IFakeObject))
220
class CurrentPrincipalTestCase(unittest.TestCase):
    """Tests for `helpers.get_current_principal`."""

    def tearDown(self):
        endInteraction() # Just in case, one is still lingering around

    def test_existing_principal(self):
        # We can get the current principal if one is involved
        principal = Principal('myprincipal')
        newInteraction(Participation(principal))
        result = helpers.get_current_principal()
        self.assertTrue(result is principal)

    def test_no_participation(self):
        # Interactions without participation are handled correctly
        newInteraction()
        result = helpers.get_current_principal()
        self.assertTrue(result is None)

    def test_not_existing_principal(self):
        # Missing interactions do not raise errors.
        result = helpers.get_current_principal()
        self.assertTrue(result is None)
243
class CmpFilesTestCase(unittest.TestCase):
    """Tests for `helpers.cmp_files`."""

    def setUp(self):
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def _cmp_contents(self, content1, content2):
        # Write both contents to files in the workdir and return what
        # `cmp_files` tells about the two (re-opened) files. All file
        # handles are closed before returning.
        path1 = os.path.join(self.workdir, 'sample1')
        path2 = os.path.join(self.workdir, 'sample2')
        for path, content in ((path1, content1), (path2, content2)):
            with open(path, 'wb') as fd:
                fd.write(content)
        with open(path1, 'r') as fd1:
            with open(path2, 'r') as fd2:
                return helpers.cmp_files(fd1, fd2)

    def test_equal(self):
        # files with same content are reported as equal
        # (no bare `assert` here: it would vanish when running with -O)
        self.assertTrue(self._cmp_contents('Hi!', 'Hi!') is True)

    def test_unequal(self):
        # files with different content are reported as unequal
        self.assertTrue(self._cmp_contents('Hi!', 'Ho!') is False)
265
class FileSizeTestCase(unittest.TestCase):
    """Tests for `helpers.file_size`."""

    def setUp(self):
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.workdir)

    def test_real_file(self):
        # we can get the size of real files
        path = os.path.join(self.workdir, 'sample.txt')
        # Both handles are closed explicitly; none is left dangling
        # when the workdir gets removed in `tearDown`.
        with open(path, 'wb') as fd:
            fd.write('My content')
        with open(path, 'rb') as fd:
            size = helpers.file_size(fd)
        self.assertEqual(int(size), 10)
        return

    def test_stringio_file(self):
        # we can get the size of file-like objects
        self.assertEqual(
            helpers.file_size(StringIO('my sample content')), 17)
286
class IfaceNamesTestCase(unittest.TestCase):
    """Tests for `helpers.iface_names`."""

    def test_iface_names(self):
        # A small hierarchy: I1 carries a plain attribute, a method
        # and a schema field; I2 adds a schema field; I3 adds nothing.
        class I1(Interface):
            foo = Attribute("""Some Foo""")
            def bar(blah):
                pass
            i1_name = schema.TextLine(title=u'i1 name')
        class I2(I1):
            baz = schema.TextLine(title=u'some baz')
        class I3(I2):
            pass

        names_i3 = helpers.iface_names(I3)
        names_i2 = helpers.iface_names(I2)
        names_i1 = helpers.iface_names(I1)
        names_with_attribs = helpers.iface_names(I3, exclude_attribs=False)
        names_with_methods = helpers.iface_names(I3, exclude_methods=False)
        names_omitting = helpers.iface_names(I3, omit='i1_name')

        outcomes = (
            # by default only schema fields are listed, inherited
            # ones included
            (names_i3, ['baz', 'i1_name']),
            (names_i2, ['baz', 'i1_name']),
            (names_i1, ['i1_name']),
            # plain attributes and methods are listed on request
            (names_with_attribs, ['baz', 'foo', 'i1_name']),
            (names_with_methods, ['bar', 'baz', 'i1_name']),
            # omitted names are left out
            (names_omitting, ['baz']),
            )
        for names, wanted in outcomes:
            self.assertEqual(sorted(names), wanted)
313
class DateTimeHelpersTestCase(unittest.TestCase):
    """Tests for the date/time related functions in `helpers`."""

    def test_now(self):
        # `now` delivers UTC by default and the given zone otherwise
        berlin = pytz.timezone('Europe/Berlin')
        default_now = helpers.now()
        berlin_now = helpers.now(berlin)
        self.assertEqual(default_now.tzinfo, pytz.utc)
        self.assertFalse(berlin_now.tzinfo == pytz.utc)
        self.assertTrue(berlin_now.tzinfo is not None)

    def test_to_timezone(self):
        # datetimes, naive or zone-aware, are converted to the wanted
        # timezone, which is UTC if none is given
        berlin = pytz.timezone('Europe/Berlin')
        lagos = pytz.timezone('Africa/Lagos')
        naive = datetime.datetime(2012, 1, 1, 0, 0)
        berlin_winter = datetime.datetime(2012, 1, 1, 0, 0, tzinfo=berlin)
        lagos_summer = datetime.datetime(2012, 6, 1, 0, 0, tzinfo=lagos)
        berlin_summer = datetime.datetime(2012, 6, 1, 0, 0, tzinfo=berlin)

        converted = [
            helpers.to_timezone(naive),
            helpers.to_timezone(naive, pytz.utc),
            helpers.to_timezone(berlin_winter),
            helpers.to_timezone(berlin_winter, lagos),
            helpers.to_timezone(lagos_summer, berlin),
            helpers.to_timezone(berlin_summer, lagos),
            ]
        # NOTE(review): the odd minutes in the fifth value presumably
        # stem from the offset pytz uses when a zone is passed via
        # `tzinfo=` -- confirm.
        wanted = [
            '2012-01-01 00:00:00 UTC+0000',
            '2012-01-01 00:00:00 UTC+0000',
            '2011-12-31 23:00:00 UTC+0000',
            '2012-01-01 00:00:00 WAT+0100',
            '2012-06-01 01:46:00 CEST+0200',
            '2012-06-01 00:00:00 WAT+0100',
            ]
        for moment, text in zip(converted, wanted):
            self.assertEqual(moment.strftime('%Y-%m-%d %H:%M:%S %Z%z'), text)

    def test_to_timezone_no_dt(self):
        # the to_timezone function copes with dates (!= datetimes)
        day = datetime.date(2012, 12, 1)
        default_result = helpers.to_timezone(day)
        utc_result = helpers.to_timezone(day, pytz.utc)
        for outcome in (default_result, utc_result):
            self.assertEqual(outcome, day)
361
class GetFileFormatTestCase(unittest.TestCase):
    """Tests for the get_fileformat helper.

    The sample files are expected in the directory of this module.
    """

    def _read_sample(self, filename):
        # Return ``(path, contents)`` of the sample file `filename`.
        # The file is closed again before returning.
        path = os.path.join(os.path.dirname(__file__), filename)
        with open(path, 'rb') as fd:
            return path, fd.read()

    def setUp(self):
        self.valid_jpg_path, self.valid_jpg = self._read_sample(
            'sample_jpg_valid.jpg')
        self.valid_png_path, self.valid_png = self._read_sample(
            'sample_png_valid.png')
        self.valid_pdf_path, self.valid_pdf = self._read_sample(
            'sample_pdf_valid.pdf')
        return

    def test_none(self):
        # ``None`` is not a file and not a valid file format
        self.assertEqual(helpers.get_fileformat(None), None)
        return

    def test_path_and_bytestream(self):
        # get_fileformat accepts bytestreams and paths as arg.
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_jpg), 'jpg')
        self.assertEqual(
            helpers.get_fileformat(self.valid_jpg_path), 'jpg')
        # path is ignored when giving a bytestream
        self.assertEqual(
            helpers.get_fileformat('blah', self.valid_jpg), 'jpg')
        return

    def test_jpg(self):
        # we recognize jpeg images.
        self.assertEqual(
            helpers.get_fileformat(self.valid_jpg_path), 'jpg')
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_jpg), 'jpg')
        return

    def test_png(self):
        # we recognize png images.
        self.assertEqual(
            helpers.get_fileformat(self.valid_png_path), 'png')
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_png), 'png')
        return

    def test_pdf(self):
        # we recognize pdf documents.
        self.assertEqual(
            helpers.get_fileformat(self.valid_pdf_path), 'pdf')
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_pdf), 'pdf')
        return
416
class MergeCSVFileTestCase(unittest.TestCase):
    """Tests for `helpers.merge_csv_files`."""

    def setUp(self):
        self.workdir = tempfile.mkdtemp()
        self.path1 = os.path.join(self.workdir, 'myfile1')
        self.path2 = os.path.join(self.workdir, 'myfile2')
        self.result_path = None
        return

    def tearDown(self):
        shutil.rmtree(self.workdir)
        # The merge result is not necessarily located in the workdir.
        if self.result_path is not None and os.path.exists(self.result_path):
            os.unlink(self.result_path)
        return

    def _merge(self, content1, content2):
        # Store both contents as input files, merge them and return
        # the contents of the result file. The result path is kept in
        # `self.result_path` for removal in `tearDown`. All files are
        # closed again before returning.
        for path, content in ((self.path1, content1), (self.path2, content2)):
            with open(path, 'wb') as fd:
                fd.write(content)
        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
        with open(self.result_path, 'r') as fd:
            return fd.read()

    def test_basic(self):
        # we can merge very basic CSV files
        contents = self._merge(
            'name,age\nManfred,32\n',
            'name,age\nBarney,28\n')
        self.assertEqual(
            contents,
            'age,name\r\n'
            '32,Manfred\r\n'
            '28,Barney\r\n')
        return

    def test_different_col_order(self):
        # if cols of both files have different order, that won't stop us
        contents = self._merge(
            'name,age\nManfred,32\n',
            'age,name\n28,Barney\n')
        self.assertEqual(
            contents,
            'age,name\r\n'
            '32,Manfred\r\n'
            '28,Barney\r\n')
        return

    def test_different_cols_at_all(self):
        # also cols available only in one file will work.
        contents = self._merge(
            'name,age\nManfred,32\n',
            'name,age,buddy\nBarney,28,Manfred\n')
        self.assertEqual(
            contents,
            'age,buddy,name\r\n'
            '32,,Manfred\r\n'
            '28,Manfred,Barney\r\n')
        return

    def test_one_empty_input(self):
        # we cope even with nearly empty input (one file with no data)
        contents = self._merge(
            '\n',
            'name,age\nManfred,32\n')
        self.assertEqual(
            contents,
            'age,name\r\n'
            '32,Manfred\r\n')
        return

    def test_two_empty_inputs(self):
        # we cope even with empty input (two files with no data)
        contents = self._merge('\n', '\n')
        self.assertEqual(
            contents, '\r\n')
        return
492
493
class CheckCSVCharsetTestCase(unittest.TestCase):
    """Tests for `helpers.check_csv_charset`.

    The function is fed with lists of lines. As asserted below, it
    gives `None` for acceptable input and a number otherwise.
    """

    def test_valid_data1(self):
        # pure ASCII data is accepted
        text = (
            'col1,col2,col3\n'
            'val1,val2,val3\n'
            )
        lines = text.splitlines()
        self.assertEqual(helpers.check_csv_charset(lines), None)

    def test_valid_data2(self):
        # umlauts, written as in this source file, are accepted
        text = (
            "code,title,title_prefix\n"
            "FAC1,Faculty 1,faculty\n"
            "FAC2,Faculty 2,institute\n"
            "FAC3,Fäcülty 3,school\n"
            )
        lines = text.splitlines()
        self.assertEqual(helpers.check_csv_charset(lines), None)

    def test_invalid_data1(self):
        # a 0x92 char at the start of the first data line gives 1
        # NOTE(review): presumably the number of the offending line,
        # not counting the header -- confirm in helpers.
        text = ''.join([
            'col1,col2,col3\n',
            chr(0x92),
            'val1,val2,val3\n',
            ])
        lines = text.splitlines()
        self.assertEqual(helpers.check_csv_charset(lines), 1)

    def test_invalid_data2(self):
        # input that is not CSV at all gives 2 here
        text = ''.join([
            'some text that \n',
            '\n',      # this empty line will break
            'is not a csv file \n',
            chr(0x92),
            '\n',
            ])
        lines = text.splitlines()
        self.assertEqual(helpers.check_csv_charset(lines), 2)
526
def test_suite():
    """Return a suite of all test cases of this module plus the
    doctests found in the docstrings of `helpers`.
    """
    # Local test cases, in the order they are added to the suite...
    local_cases = (
        ReST2HTMLTestCase,
        FactoryBaseTestCase,
        CopyFileSystemTreeTestCase,
        RemoveFileOrDirectoryTestCase,
        CurrentPrincipalTestCase,
        CmpFilesTestCase,
        FileSizeTestCase,
        IfaceNamesTestCase,
        DateTimeHelpersTestCase,
        GetFileFormatTestCase,
        MergeCSVFileTestCase,
        SimpleHelpersTestCase,
        CheckCSVCharsetTestCase,
        )
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case in local_cases:
        suite.addTests(loader.loadTestsFromTestCase(case))
    # ...followed by the tests from docstrings in helpers.py.
    docstring_tests = doctest.DocTestSuite(
        helpers,
        optionflags=doctest.ELLIPSIS | doctest.REPORT_NDIFF,
        )
    suite.addTests(docstring_tests)
    return suite
556
# When executed as a script, run everything that `test_suite`
# collects: the local test cases and the doctests of `helpers`.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.