source: main/waeup.sirp/trunk/src/waeup/sirp/tests/test_imagestorage.py @ 7010

Last change on this file since 7010 was 6529, checked in by uli, 13 years ago

Add more tests, most of them covering the brain-dead problems we had with
Blobs, to ensure these problems are gone now.

File size: 11.0 KB
Line 
1import os
2import sys
3import tempfile
4import shutil
5import unittest
6from StringIO import StringIO
7from hurry.file.interfaces import IFileRetrieval
8from zope.component.hooks import setSite
9from zope.interface.verify import verifyClass, verifyObject
10from waeup.sirp.app import University
11from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase
12from waeup.sirp.image import createWAeUPImageFile
13from waeup.sirp.imagestorage import (
14    md5digest, Basket, ImageStorage, ImageStorageFileRetrieval,)
15
class HelperFuncsTests(unittest.TestCase):
    """Tests for the module-level helper functions of `imagestorage`."""

    def setUp(self):
        # Each test works in its own scratch directory.
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        # Remove the scratch directory and everything created in it.
        shutil.rmtree(self.workdir)

    def test_md5digest(self):
        # md5digest() is handed an open file and must return the hex
        # MD5 digest of the file contents.
        samplefile = os.path.join(self.workdir, 'sample')
        # Close the writer explicitly so the data is guaranteed to be
        # flushed to disk before we read it back.
        with open(samplefile, 'wb') as fp:
            fp.write('blah')
        # Close the reader as well instead of leaking the handle.
        with open(samplefile, 'r') as fp:
            digest = md5digest(fp)
        self.assertEqual(digest, '6f1ed002ab5595859014ebf0951522d9')
30
class BasketTests(FunctionalTestCase):
    """Tests for `Basket`, run inside the functional test layer."""

    layer = FunctionalLayer

    def setUp(self):
        super(BasketTests, self).setUp()
        self.workdir = tempfile.mkdtemp()
        self.samplefile = os.path.join(self.workdir, 'sample')
        self.otherfile = os.path.join(self.workdir, 'other')
        # Write the fixtures through context managers so the data is
        # flushed and the write handles are closed before reading.
        with open(self.samplefile, 'wb') as fp:
            fp.write('Hi there!')
        with open(self.otherfile, 'wb') as fp:
            fp.write('Hi from other!')
        self.basket = Basket()
        # Read handles used by the tests; closed again in tearDown().
        self.fd = open(self.samplefile, 'r')
        self.fd2 = open(self.otherfile, 'r')
        # Buffer that some tests temporarily install as sys.stderr.
        self.stderr = StringIO()
        self.old_stderr = sys.stderr

    def tearDown(self):
        # Make sure stderr is restored even if a test failed while it
        # was redirected.
        sys.stderr = self.old_stderr
        super(BasketTests, self).tearDown()
        self.fd.close()
        # Also close the second fixture file; it was opened in setUp().
        self.fd2.close()
        shutil.rmtree(self.workdir)
        self.basket._del() # Remove subobjects explicitly
        del self.basket
        return

    def test_ifaces(self):
        # Placeholder: no interface checks implemented yet.
        pass

    def test_curr_id_empty(self):
        # An empty basket reports '1' as its current id.
        curr_id = self.basket.curr_id
        self.assertEqual(curr_id, '1')

    def test_getInternalId_empty(self):
        # An empty basket knows no internal id for any file.
        basket_id = self.basket.getInternalId(self.fd)
        self.assertTrue(basket_id is None)

    def test_storeFile_single(self):
        # The first stored file gets id '1' and is readable under it.
        basket_id = self.basket.storeFile(self.fd, 'sample')
        self.assertEqual(basket_id, '1')
        contents = self.basket['1'].open('r').read()
        self.assertEqual(contents, 'Hi there!')

    def test_storeFile_double(self):
        # Storing the same file twice yields the same id both times.
        basket_id1 = self.basket.storeFile(self.fd, 'sample')
        basket_id2 = self.basket.storeFile(self.fd, 'sample')
        self.assertTrue(basket_id1 == basket_id2 == '1')
        contents = self.basket['1'].open('r').read()
        self.assertEqual(contents, 'Hi there!')

    def test_storeFile_multiple(self):
        # Files with different contents get consecutive ids, even when
        # stored under the same filename.
        basket_id1 = self.basket.storeFile(self.fd, 'sample')
        basket_id2 = self.basket.storeFile(self.fd2, 'sample')
        self.assertEqual(basket_id1, '1')
        self.assertEqual(basket_id2, '2')
        contents1 = self.basket['1'].open('r').read()
        contents2 = self.basket['2'].open('r').read()
        self.assertEqual(contents1, 'Hi there!')
        self.assertEqual(contents2, 'Hi from other!')

    def test_retrieveFile(self):
        # A stored file can be fetched again by the id storeFile() gave.
        basket_id = self.basket.storeFile(self.fd, 'sample')
        fd = self.basket.retrieveFile(basket_id)
        result = fd.read()
        self.assertEqual(result, 'Hi there!')

    def test_retrieveFile_not_existent(self):
        # Unknown ids give None instead of raising.
        result = self.basket.retrieveFile('not-a-valid-basket-id')
        self.assertTrue(result is None)

    def test_detect_zero_length_blobs(self):
        # Ensure we get a warning when an empty Blob is found
        self.basket.storeFile(self.fd, 'sample')
        # Truncate the stored blob to simulate the empty-Blob problem.
        self.basket['1'].open('w').write('')
        self.fd.seek(0)
        sys.stderr = self.stderr         # Redirect stderr
        self.basket.storeFile(self.fd, 'sample')
        sys.stderr = self.old_stderr     # Restore stderr
        self.stderr.seek(0)
        self.assertTrue(
            "EMPTY BLOB DETECTED" in self.stderr.read())

    def test_refill_zero_length_blobs(self):
        # When we detect an empty Blob, it will be reused
        self.basket.storeFile(self.fd, 'sample')
        # Truncate the stored blob to simulate the empty-Blob problem.
        self.basket['1'].open('w').write('')
        self.fd.seek(0)
        sys.stderr = self.stderr         # Redirect stderr
        self.basket.storeFile(self.fd, 'sample')
        sys.stderr = self.old_stderr     # Restore stderr
        contents = self.basket['1'].open('r').read()
        self.assertEqual(contents, 'Hi there!')
123
124
class ImageStorageTests(FunctionalTestCase):
    """Tests for `ImageStorage`, run inside the functional test layer."""

    layer = FunctionalLayer

    def setUp(self):
        super(ImageStorageTests, self).setUp()
        self.workdir = tempfile.mkdtemp()
        self.samplefile = os.path.join(self.workdir, 'sample')
        self.otherfile = os.path.join(self.workdir, 'other')
        # Write the fixtures through context managers so the data is
        # flushed and the write handles are closed before reading.
        with open(self.samplefile, 'wb') as fp:
            fp.write('Hi there!')
        with open(self.otherfile, 'wb') as fp:
            fp.write('Hi from other!')
        self.storage = ImageStorage()
        # Read handles used by the tests; closed again in tearDown().
        self.fd = open(self.samplefile, 'r')
        self.fd2 = open(self.otherfile, 'r')

    def tearDown(self):
        super(ImageStorageTests, self).tearDown()
        self.fd.close()
        self.fd2.close()
        shutil.rmtree(self.workdir)
        self.storage._del() # Remove subobjects explicitly
        del self.storage
        return

    def test_ifaces(self):
        # Placeholder: no interface checks implemented yet.
        pass

    def test_storeFile(self):
        # The returned id has the form '<32 hex chars>-<number>'.
        full_id = self.storage.storeFile(self.fd, 'sample.txt')
        self.assertEqual(full_id, '396199333edbf40ad43e62a1c1397793-1')

    def test_storeFile_duplicate(self):
        # Storing the same contents again, under the same or another
        # filename, always yields the id of the first copy.
        full_id1 = self.storage.storeFile(self.fd, 'sample1.txt')
        full_id2 = self.storage.storeFile(self.fd, 'sample2.txt')
        full_id3 = self.storage.storeFile(self.fd, 'sample1.txt')
        self.assertEqual(full_id1, '396199333edbf40ad43e62a1c1397793-1')
        self.assertEqual(full_id2, '396199333edbf40ad43e62a1c1397793-1')
        self.assertEqual(full_id3, '396199333edbf40ad43e62a1c1397793-1')
        contents = self.storage.retrieveFile(
            '396199333edbf40ad43e62a1c1397793-1').read()
        self.assertEqual(contents, 'Hi there!')

    def test_storeFile_multiple(self):
        # Different contents get different ids even when the filename
        # is the same.
        full_id1 = self.storage.storeFile(self.fd, 'sample1.txt')
        full_id2 = self.storage.storeFile(self.fd2, 'sample1.txt')
        self.assertEqual(full_id1, '396199333edbf40ad43e62a1c1397793-1')
        self.assertEqual(full_id2, '6936fcf8d564f1c5be5a017e650c5e8f-1')

    def test_retrieveFile_not_existent(self):
        # Unknown ids give None instead of raising.
        result = self.storage.retrieveFile('not-existent')
        self.assertTrue(result is None)

    def test_retrieveFile_illegal_marker(self):
        # Ids with more than one dash or with no dash at all are
        # answered with None as well.
        result1 = self.storage.retrieveFile('really-not-existent')
        result2 = self.storage.retrieveFile('notexistent')
        self.assertTrue(result1 is result2 is None)

    def test_retrieveFile(self):
        # A stored file can be fetched again by the id storeFile() gave.
        full_id = self.storage.storeFile(self.fd, 'sample.txt')
        result = self.storage.retrieveFile(full_id)
        content = result.read()
        self.assertEqual(content, 'Hi there!')

    def test_retrieveFile_multiple(self):
        # Several stored files are retrievable independently.
        full_id1 = self.storage.storeFile(self.fd, 'sample.txt')
        full_id2 = self.storage.storeFile(self.fd2, 'other.txt')
        result1 = self.storage.retrieveFile(full_id1)
        result2 = self.storage.retrieveFile(full_id2)
        content1 = result1.read()
        content2 = result2.read()
        self.assertEqual(content1, 'Hi there!')
        self.assertEqual(content2, 'Hi from other!')
197
class ImageStorageFileRetrievalTests(FunctionalTestCase):
    """Tests for `ImageStorageFileRetrieval`.

    The fixture installs a `University` as 'app' in the root folder and
    makes sure it contains an `ImageStorage` named 'images'.  Tests
    whose name ends in ``_nosite`` do not call `setSite()`; the others
    make the app the current site first.
    """

    layer = FunctionalLayer

    def setUp(self):
        super(ImageStorageFileRetrievalTests, self).setUp()
        self.workdir = tempfile.mkdtemp()
        self.samplefile = os.path.join(self.workdir, 'sample')
        self.otherfile = os.path.join(self.workdir, 'other')
        # Write the fixtures through context managers so the data is
        # flushed and the write handles are closed before reading.
        with open(self.samplefile, 'wb') as fp:
            fp.write('Hi there!')
        with open(self.otherfile, 'wb') as fp:
            fp.write('Hi from other!')
        # Read handles used by the tests; closed again in tearDown().
        self.fd = open(self.samplefile, 'r')
        self.fd2 = open(self.otherfile, 'r')
        # Set up a single image storage in a site
        self.getRootFolder()['app'] = University()
        self.app = self.getRootFolder()['app']
        if 'images' not in self.app.keys():
            self.app['images'] = ImageStorage()
        # Only the storage living in the app is kept; no extra
        # standalone storage is created that would need cleaning up.
        self.storage = self.app['images']
        return

    def tearDown(self):
        super(ImageStorageFileRetrievalTests, self).tearDown()
        self.fd.close()
        self.fd2.close()
        shutil.rmtree(self.workdir)
        self.storage._del() # Remove subobjects explicitly
        return

    def test_ifaces(self):
        # Class and instances must fulfill the IFileRetrieval contract.
        # Use TestCase assertions: bare asserts vanish under `python -O`.
        retrieval = ImageStorageFileRetrieval()
        self.assertTrue(
            verifyClass(IFileRetrieval, ImageStorageFileRetrieval))
        self.assertTrue(verifyObject(IFileRetrieval, retrieval))
        return

    def test_getImageStorage_nosite(self):
        # Without a site set there is no image storage to find.
        retrieval = ImageStorageFileRetrieval()
        storage = retrieval.getImageStorage()
        self.assertTrue(storage is None)
        return

    def test_getImageStorage(self):
        # With the app as site, the app's 'images' storage is found.
        setSite(self.app)
        retrieval = ImageStorageFileRetrieval()
        storage = retrieval.getImageStorage()
        self.assertTrue(storage is self.storage)
        return

    def test_isImageStorageEnabled_nosite(self):
        # Without a site set the storage counts as disabled.
        retrieval = ImageStorageFileRetrieval()
        self.assertTrue(retrieval.isImageStorageEnabled() is False)
        return

    def test_isImageStorageEnabled(self):
        # With the app as site the storage counts as enabled.
        setSite(self.app)
        retrieval = ImageStorageFileRetrieval()
        self.assertTrue(retrieval.isImageStorageEnabled() is True)
        return

    def test_getFile_nosite(self):
        # Without a site the passed data comes back as the file
        # contents.
        retrieval = ImageStorageFileRetrieval()
        f = retrieval.getFile('Hi there!')
        self.assertEqual(f.read(), 'Hi there!')
        return

    def test_getFile(self):
        # With a site, the data of a created file is an id under which
        # the contents can be fetched again.
        setSite(self.app)
        retrieval = ImageStorageFileRetrieval()
        waeup_image = retrieval.createFile('sample.txt', self.fd)
        full_id = waeup_image.data
        result = retrieval.getFile(full_id)
        self.assertEqual(result.read(), 'Hi there!')
        return

    def test_createFile_nosite(self):
        # Without a site the created file carries the raw contents as
        # its data.
        retrieval = ImageStorageFileRetrieval()
        waeup_image = retrieval.createFile('sample.txt', self.fd)
        self.assertEqual(waeup_image.data, 'Hi there!')
        return

    def test_createFile(self):
        # Ensure we can create WAeUPImageFiles when in site
        setSite(self.app)
        retrieval = ImageStorageFileRetrieval()
        waeup_image = retrieval.createFile('sample.txt', self.fd)
        full_id = waeup_image.data
        self.assertEqual(full_id, '396199333edbf40ad43e62a1c1397793-1')
        return

    def test_waeupimagefile(self):
        # Make sure WAeUPImageFile can use our file retrieval
        setSite(self.app)
        myfile = createWAeUPImageFile('sample.jpg', self.fd)
        contents = myfile.file.read()
        self.assertEqual(contents, 'Hi there!')

    def test_waeupimagefile_raw(self):
        # Make sure we can retrieve a file also if it was initialized
        # with no image storage available
        myfile = createWAeUPImageFile('sample.jpg', self.fd)
        setSite(self.app)
        contents = myfile.file.read()
        self.assertEqual(contents, 'Hi there!')
Note: See TracBrowser for help on using the repository browser.