source: main/waeup.kofa/branches/uli-async-update/src/waeup/kofa/utils/tests/test_helpers.py @ 9214

Last change on this file since 9214 was 8632, checked in by uli, 13 years ago

Make sure we can cope with 'broken' (empty) CSV file input when
merging CSV files.

  • Property svn:keywords set to Id
File size: 16.1 KB
Line 
1# -*- coding: utf-8 -*-
2
3## $Id: test_helpers.py 8632 2012-06-06 01:10:51Z uli $
4##
5## Copyright (C) 2011 Uli Fouquet & Henrik Bettermann
6## This program is free software; you can redistribute it and/or modify
7## it under the terms of the GNU General Public License as published by
8## the Free Software Foundation; either version 2 of the License, or
9## (at your option) any later version.
10##
11## This program is distributed in the hope that it will be useful,
12## but WITHOUT ANY WARRANTY; without even the implied warranty of
13## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14## GNU General Public License for more details.
15##
16## You should have received a copy of the GNU General Public License
17## along with this program; if not, write to the Free Software
18## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19##
20
21import datetime
22import os
23import pytz
24import shutil
25import tempfile
26import unittest
27import doctest
28from cStringIO import StringIO
29from zope import schema
30from zope.interface import Interface, Attribute, implements
31from zope.security.testing import Principal, Participation
32from zope.security.management import newInteraction, endInteraction
33from waeup.kofa.utils import helpers
34
class IFakeObject(Interface):
    """Marker interface without attributes or methods.

    `FakeObject` declares it, which gives the factory test an interface
    to look for.
    """
37
class FakeObject(object):
    """Minimal class whose only feature is declaring `IFakeObject`."""
    implements(IFakeObject)
40
41class RemoveFileOrDirectoryTestCase(unittest.TestCase):
42
43    def setUp(self):
44        self.dirpath = tempfile.mkdtemp()
45        self.filepath = os.path.join(self.dirpath, 'somefile')
46        self.non_file = os.path.join(self.dirpath, 'nonfile')
47        open(self.filepath, 'wb').write('Hi!')
48        return
49
50    def tearDown(self):
51        if os.path.exists(self.dirpath):
52            shutil.rmtree(self.dirpath)
53        return
54
55    def test_handle_not_existing_path(self):
56        result = helpers.remove_file_or_directory(self.non_file)
57        self.assertTrue(result is None)
58        return
59
60    def test_handle_dir(self):
61        helpers.remove_file_or_directory(self.dirpath)
62        self.assertFalse(
63            os.path.exists(self.dirpath)
64            )
65        return
66
67    def test_handle_file(self):
68        helpers.remove_file_or_directory(self.filepath)
69        self.assertFalse(
70            os.path.exists(self.filepath)
71            )
72        return
73
74class CopyFileSystemTreeTestCase(unittest.TestCase):
75    # Test edge cases of copy_filesystem_tree().
76    #
77    # This is a typical case of tests not written as doctest as it is
78    # normally not interesting for developers and we only want to make
79    # sure everything works as expected.
80    def setUp(self):
81        self.existing_src = tempfile.mkdtemp()
82        self.filepath = os.path.join(self.existing_src, 'somefile')
83        open(self.filepath, 'wb').write('Hi!')
84        self.existing_dst = tempfile.mkdtemp()
85        self.not_existing_dir = tempfile.mkdtemp()
86        shutil.rmtree(self.not_existing_dir)
87
88        pass
89
90    def tearDown(self):
91        shutil.rmtree(self.existing_src)
92        shutil.rmtree(self.existing_dst)
93        pass
94
95    def test_source_and_dst_existing(self):
96        helpers.copy_filesystem_tree(self.existing_src, self.existing_dst)
97        self.assertTrue(
98            os.path.exists(
99                os.path.join(self.existing_dst, 'somefile')
100                )
101            )
102        return
103
104    def test_source_not_existing(self):
105        self.assertRaises(
106            ValueError,
107            helpers.copy_filesystem_tree,
108            self.not_existing_dir,
109            self.existing_dst
110            )
111        return
112
113    def test_dest_not_existing(self):
114        self.assertRaises(
115            ValueError,
116            helpers.copy_filesystem_tree,
117            self.existing_src,
118            self.not_existing_dir
119            )
120        return
121
122    def test_src_not_a_dir(self):
123        self.assertRaises(
124            ValueError,
125            helpers.copy_filesystem_tree,
126            self.filepath,
127            self.existing_dst
128            )
129        return
130
131    def test_dst_not_a_dir(self):
132        self.assertRaises(
133            ValueError,
134            helpers.copy_filesystem_tree,
135            self.existing_src,
136            self.filepath
137            )
138        return
139
class ReST2HTMLTestCase(unittest.TestCase):
    """Check how ``helpers.ReST2HTML`` treats umlauts in its input."""

    def setUp(self):
        # Markup all rendering tests expect for the one-line sample.
        self.expected = (
            u'<div class="document">\n\n\n<p>Some '
            u'test with \xfcmlaut</p>\n</div>')
        return

    def test_ascii_umlauts(self):
        # A plain (non-unicode) string literal renders to the expected
        # unicode markup.
        text = 'Some test with ümlaut'
        html = helpers.ReST2HTML(text)
        self.assertEqual(html, self.expected)

    def test_unicode_umlauts(self):
        # A unicode literal renders to the very same markup.
        text = u'Some test with ümlaut'
        html = helpers.ReST2HTML(text)
        self.assertEqual(html, self.expected)

    def test_unicode_output_from_ascii(self):
        # The result type is unicode for plain string input...
        text = 'Some test with ümlaut'
        html = helpers.ReST2HTML(text)
        self.assertTrue(isinstance(html, unicode))

    def test_unicode_output_from_unicode(self):
        # ...and for unicode input as well.
        text = u'Some test with ümlaut'
        html = helpers.ReST2HTML(text)
        self.assertTrue(isinstance(html, unicode))
166
167
class FactoryBaseTestCase(unittest.TestCase):
    """Covers the interface lookup of ``helpers.FactoryBase``."""

    def test_ifaces(self):
        # All relevant parts are tested in the docstring; the
        # interfaces method is exercised here as well to please the
        # coverage report.
        fake_factory = helpers.FactoryBase()
        fake_factory.factory = FakeObject
        declared = fake_factory.getInterfaces()
        self.assertTrue(declared(IFakeObject))
        return
177
class CurrentPrincipalTestCase(unittest.TestCase):
    """Tests for ``helpers.get_current_principal``."""

    def tearDown(self):
        """End any interaction a test may have left open."""
        endInteraction() # Just in case, one is still lingering around

    def test_existing_principal(self):
        # We can get the current principal if one is involved
        principal = Principal('myprincipal')
        newInteraction(Participation(principal))
        result = helpers.get_current_principal()
        self.assertTrue(result is principal)

    def test_no_participation(self):
        # Interactions without participation are handled correctly
        newInteraction()
        result = helpers.get_current_principal()
        self.assertTrue(result is None)

    def test_not_existing_principal(self):
        # Missing interactions do not raise errors.
        result = helpers.get_current_principal()
        self.assertTrue(result is None)
200
class CmpFilesTestCase(unittest.TestCase):
    """Tests for ``helpers.cmp_files``."""

    def setUp(self):
        """Create a scratch directory for the sample files."""
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory."""
        shutil.rmtree(self.workdir)

    def _make_sample(self, name, contents):
        """Write `contents` to file `name` in the workdir, return its path."""
        path = os.path.join(self.workdir, name)
        with open(path, 'wb') as fd:
            fd.write(contents)
        return path

    def test_equal(self):
        # Files with identical contents compare as ``True``.
        p1 = self._make_sample('sample1', b'Hi!')
        p2 = self._make_sample('sample2', b'Hi!')
        # Nested ``with`` blocks close both handles before tearDown
        # removes the directory.
        with open(p1, 'r') as fd1:
            with open(p2, 'r') as fd2:
                result = helpers.cmp_files(fd1, fd2)
        # TestCase assertions survive ``python -O``; bare ``assert``
        # statements do not.
        self.assertTrue(result is True)

    def test_unequal(self):
        # Files with differing contents compare as ``False``.
        p1 = self._make_sample('sample1', b'Hi!')
        p2 = self._make_sample('sample2', b'Ho!')
        with open(p1, 'r') as fd1:
            with open(p2, 'r') as fd2:
                result = helpers.cmp_files(fd1, fd2)
        self.assertTrue(result is False)
222
class FileSizeTestCase(unittest.TestCase):
    """Tests for ``helpers.file_size``."""

    def setUp(self):
        """Create a scratch directory for the sample file."""
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory."""
        shutil.rmtree(self.workdir)

    def test_real_file(self):
        # we can get the size of real files
        path = os.path.join(self.workdir, 'sample.txt')
        # Both handles are closed explicitly instead of being leaked.
        with open(path, 'wb') as fd:
            fd.write(b'My content')
        with open(path, 'rb') as fd:
            size = int(helpers.file_size(fd))
        self.assertEqual(size, 10)
        return

    def test_stringio_file(self):
        # we can get the size of file-like objects
        self.assertEqual(
            helpers.file_size(StringIO('my sample content')), 17)
243
class IfaceNamesTestCase(unittest.TestCase):
    """Tests for ``helpers.iface_names``."""

    def test_iface_names(self):
        # Three-level interface hierarchy: I1 carries an attribute, a
        # method and a schema field, I2 adds a schema field, I3 adds
        # nothing.
        class I1(Interface):
            foo = Attribute("""Some Foo""")
            def bar(blah):
                pass
            i1_name = schema.TextLine(title=u'i1 name')
        class I2(I1):
            baz = schema.TextLine(title=u'some baz')
        class I3(I2):
            pass

        # All lookups are performed first, then compared in order.
        lookups = [
            (helpers.iface_names(I3),
             ['baz', 'i1_name']),
            (helpers.iface_names(I2),
             ['baz', 'i1_name']),
            (helpers.iface_names(I1),
             ['i1_name']),
            (helpers.iface_names(I3, exclude_attribs=False),
             ['baz', 'foo', 'i1_name']),
            (helpers.iface_names(I3, exclude_methods=False),
             ['bar', 'baz', 'i1_name']),
            (helpers.iface_names(I3, omit='i1_name'),
             ['baz']),
            ]
        for names, expected in lookups:
            self.assertEqual(sorted(names), expected)
        return
270
class DateTimeHelpersTestCase(unittest.TestCase):
    """Tests for ``helpers.now`` and ``helpers.to_timezone``."""

    def test_now(self):
        # Without argument the result carries UTC as tzinfo; with a
        # timezone passed in it carries some other, non-empty tzinfo.
        berlin = pytz.timezone('Europe/Berlin')
        default_now = helpers.now()
        berlin_now = helpers.now(berlin)
        self.assertEqual(default_now.tzinfo, pytz.utc)
        self.assertFalse(berlin_now.tzinfo == pytz.utc)
        self.assertFalse(berlin_now.tzinfo is None)
        return

    def test_to_timezone(self):
        # Each case holds the positional arguments handed over to
        # ``to_timezone`` and the expected formatted result.
        fmt = '%Y-%m-%d %H:%M:%S %Z%z'
        berlin = pytz.timezone('Europe/Berlin')
        lagos = pytz.timezone('Africa/Lagos')
        naive = datetime.datetime(2012, 1, 1, 0, 0)
        berlin_jan = datetime.datetime(2012, 1, 1, 0, 0, tzinfo=berlin)
        lagos_jun = datetime.datetime(2012, 6, 1, 0, 0, tzinfo=lagos)
        berlin_jun = datetime.datetime(2012, 6, 1, 0, 0, tzinfo=berlin)
        cases = [
            ((naive,), '2012-01-01 00:00:00 UTC+0000'),
            ((naive, pytz.utc), '2012-01-01 00:00:00 UTC+0000'),
            ((berlin_jan,), '2011-12-31 23:00:00 UTC+0000'),
            ((berlin_jan, lagos), '2012-01-01 00:00:00 WAT+0100'),
            ((lagos_jun, berlin), '2012-06-01 01:46:00 CEST+0200'),
            ((berlin_jun, lagos), '2012-06-01 00:00:00 WAT+0100'),
            ]
        # Convert everything first, compare afterwards.
        converted = [helpers.to_timezone(*args) for args, _ in cases]
        for result, (_, expected) in zip(converted, cases):
            self.assertEqual(result.strftime(fmt), expected)
        return

    def test_to_timezone_no_dt(self):
        # the to_timezone function copes with dates (!= datetimes):
        # they come back unchanged
        day = datetime.date(2012, 12, 1)
        outcomes = (
            helpers.to_timezone(day),
            helpers.to_timezone(day, pytz.utc),
            )
        for outcome in outcomes:
            self.assertEqual(outcome, day)
        return
318
class GetFileFormatTestCase(unittest.TestCase):
    """Tests for the ``helpers.get_fileformat`` helper."""

    def _load_sample(self, filename):
        """Return ``(path, contents)`` of a sample file next to this module.

        The file is read in binary mode and closed before returning.
        """
        path = os.path.join(os.path.dirname(__file__), filename)
        with open(path, 'rb') as fd:
            return path, fd.read()

    def setUp(self):
        """Load path and raw contents of the JPG, PNG and PDF samples."""
        self.valid_jpg_path, self.valid_jpg = self._load_sample(
            'sample_jpg_valid.jpg')
        self.valid_png_path, self.valid_png = self._load_sample(
            'sample_png_valid.png')
        self.valid_pdf_path, self.valid_pdf = self._load_sample(
            'sample_pdf_valid.pdf')
        return

    def test_none(self):
        # ``None`` is not a file and not a valid file format
        self.assertEqual(helpers.get_fileformat(None), None)
        return

    def test_path_and_bytestream(self):
        # get_fileformat accepts bytestreams and paths as arg.
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_jpg), 'jpg')
        self.assertEqual(
            helpers.get_fileformat(self.valid_jpg_path), 'jpg')
        # path is ignored when giving a bytestream
        self.assertEqual(
            helpers.get_fileformat('blah', self.valid_jpg), 'jpg')
        return

    def test_jpg(self):
        # we recognize jpeg images.
        self.assertEqual(
            helpers.get_fileformat(self.valid_jpg_path), 'jpg')
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_jpg), 'jpg')
        return

    def test_png(self):
        # we recognize png images.
        self.assertEqual(
            helpers.get_fileformat(self.valid_png_path), 'png')
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_png), 'png')
        return

    def test_pdf(self):
        # we recognize pdf documents.
        self.assertEqual(
            helpers.get_fileformat(self.valid_pdf_path), 'pdf')
        self.assertEqual(
            helpers.get_fileformat(None, self.valid_pdf), 'pdf')
        return
373
374class MergeCSVFileTestCase(unittest.TestCase):
375
376    def setUp(self):
377        self.workdir = tempfile.mkdtemp()
378        self.path1 = os.path.join(self.workdir, 'myfile1')
379        self.path2 = os.path.join(self.workdir, 'myfile2')
380        self.result_path = None
381        return
382
383    def tearDown(self):
384        shutil.rmtree(self.workdir)
385        if self.result_path is not None and os.path.exists(self.result_path):
386            os.unlink(self.result_path)
387        return
388
389    def test_basic(self):
390        # we can merge very basic CSV files
391        open(self.path1, 'wb').write('name,age\nManfred,32\n')
392        open(self.path2, 'wb').write('name,age\nBarney,28\n')
393        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
394        contents = open(self.result_path, 'r').read()
395        self.assertEqual(
396            contents,
397            'age,name\r\n'
398            '32,Manfred\r\n'
399            '28,Barney\r\n')
400        return
401
402    def test_different_col_order(self):
403        # if cols of both files have different order, that won't stop us
404        open(self.path1, 'wb').write('name,age\nManfred,32\n')
405        open(self.path2, 'wb').write('age,name\n28,Barney\n')
406        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
407        contents = open(self.result_path, 'r').read()
408        self.assertEqual(
409            contents,
410            'age,name\r\n'
411            '32,Manfred\r\n'
412            '28,Barney\r\n')
413        return
414
415    def test_different_cols_at_all(self):
416        # also cols available only in one file will work.
417        open(self.path1, 'wb').write('name,age\nManfred,32\n')
418        open(self.path2, 'wb').write('name,age,buddy\nBarney,28,Manfred\n')
419        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
420        contents = open(self.result_path, 'r').read()
421        self.assertEqual(
422            contents,
423            'age,buddy,name\r\n'
424            '32,,Manfred\r\n'
425            '28,Manfred,Barney\r\n')
426        return
427
428    def test_one_empty_input(self):
429        # we cope even with nearly empty input (one file with no data)
430        open(self.path1, 'wb').write('\n')
431        open(self.path2, 'wb').write('name,age\nManfred,32\n')
432        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
433        contents = open(self.result_path, 'r').read()
434        self.assertEqual(
435            contents,
436            'age,name\r\n'
437            '32,Manfred\r\n')
438        return
439
440    def test_two_empty_inputs(self):
441        # we cope even with empty input (two files with no data)
442        open(self.path1, 'wb').write('\n')
443        open(self.path2, 'wb').write('\n')
444        self.result_path = helpers.merge_csv_files(self.path1, self.path2)
445        contents = open(self.result_path, 'r').read()
446        self.assertEqual(
447            contents, '\r\n')
448        return
449
def test_suite():
    """Build the suite of all local test cases plus the helpers doctests."""
    loader = unittest.TestLoader()
    # Register local test cases...
    local_cases = (
        ReST2HTMLTestCase,
        FactoryBaseTestCase,
        CopyFileSystemTreeTestCase,
        RemoveFileOrDirectoryTestCase,
        CurrentPrincipalTestCase,
        CmpFilesTestCase,
        FileSizeTestCase,
        IfaceNamesTestCase,
        DateTimeHelpersTestCase,
        GetFileFormatTestCase,
        MergeCSVFileTestCase,
        )
    suite = unittest.TestSuite()
    for case in local_cases:
        suite.addTests(loader.loadTestsFromTestCase(case))
    # Add tests from docstrings in helpers.py...
    flags = doctest.ELLIPSIS | doctest.REPORT_NDIFF
    suite.addTests(doctest.DocTestSuite(helpers, optionflags=flags))
    return suite
477
# When run as a script, let unittest collect the tests through the
# module-level ``test_suite`` function.
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')
Note: See TracBrowser for help on using the repository browser.