Changeset 7063 for main/waeup.sirp/trunk/src/waeup/sirp/tests
- Timestamp:
- 9 Nov 2011, 15:42:45 (13 years ago)
- Location:
- main/waeup.sirp/trunk
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
main/waeup.sirp/trunk
- Property svn:mergeinfo changed
/main/waeup.sirp/branches/ulif-extimgstore (added) merged: 7001-7002,7010-7011,7016,7031-7041,7043-7044,7046-7055
- Property svn:mergeinfo changed
-
main/waeup.sirp/trunk/src/waeup/sirp/tests/test_app.py
r6593 r7063 6 6 from zope.interface.verify import verifyClass, verifyObject 7 7 from waeup.sirp.app import University 8 from waeup.sirp.imagestorage import ImageStorageFileRetrieval9 8 from waeup.sirp.interfaces import IUniversity 10 9 from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase … … 32 31 return 33 32 34 def test_images(self):35 # Make sure we have a image container in a university36 assert 'images' in self.app.keys()37 return38 39 33 def test_IFileRetrieval_utility(self): 40 34 # Make sure we can get a local IFileRetrieval utility … … 43 37 assert result is not None 44 38 assert IFileRetrieval.providedBy(result) 45 assert isinstance(result, ImageStorageFileRetrieval)46 39 return 47 40 -
main/waeup.sirp/trunk/src/waeup/sirp/tests/test_imagestorage.py
r6529 r7063 1 1 import os 2 import sys3 2 import tempfile 4 3 import shutil 5 4 import unittest 6 5 from StringIO import StringIO 6 from hurry.file import HurryFile 7 7 from hurry.file.interfaces import IFileRetrieval 8 from zope.component import ( 9 getUtility, provideUtility, queryUtility, provideAdapter) 8 10 from zope.component.hooks import setSite 9 11 from zope.interface.verify import verifyClass, verifyObject 10 12 from waeup.sirp.app import University 11 13 from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase 12 from waeup.sirp.image import createWAeUPImageFile13 14 from waeup.sirp.imagestorage import ( 14 md5digest, Basket, ImageStorage, ImageStorageFileRetrieval,) 15 FileStoreNameChooser, ExtFileStore, DefaultFileStoreHandler, 16 DefaultStorage) 17 from waeup.sirp.interfaces import ( 18 IFileStoreNameChooser, IExtFileStore, IFileStoreHandler,) 15 19 16 20 class HelperFuncsTests(unittest.TestCase): … … 22 26 shutil.rmtree(self.workdir) 23 27 24 def test_md5digest(self): 25 samplefile = os.path.join(self.workdir, 'sample') 26 open(samplefile, 'wb').write('blah') 27 fp = open(samplefile, 'r') 28 digest = md5digest(fp) 29 self.assertEqual(digest, '6f1ed002ab5595859014ebf0951522d9') 30 31 class BasketTests(FunctionalTestCase): 28 class FileStoreNameChooserTests(FunctionalTestCase): 32 29 33 30 layer = FunctionalLayer 34 31 32 def test_iface(self): 33 # we provide the interfaces we promise to do 34 obj = FileStoreNameChooser(None) 35 verifyClass(IFileStoreNameChooser, FileStoreNameChooser) 36 verifyObject(IFileStoreNameChooser, obj) 37 return 38 39 def test_accessible_as_adapter(self): 40 # we can get a file name chooser via adapter 41 chooser = IFileStoreNameChooser(object()) 42 self.assertTrue( 43 isinstance(chooser, FileStoreNameChooser)) 44 return 45 46 def test_check_name(self): 47 # default file name choosers accept any string 48 chooser = FileStoreNameChooser(object()) 49 self.assertEqual(chooser.checkName('Hi there!'), True) 50 self.assertEqual(chooser.checkName(None), False) 51 return 52 53 def test_choose_name(self): 54 # we get a simple string if we do not pass in a valid string 55 chooser = FileStoreNameChooser(object()) 56 self.assertEqual(chooser.chooseName('myname'), 'myname') 57 self.assertEqual(chooser.chooseName(None), u'unknown_file') 58 return 59 60 class ExtFileStoreTests(unittest.TestCase): 61 # Test external file store (non-functional mode) 62 35 63 def setUp(self): 36 super(BasketTests, self).setUp() 64 self.workdir = tempfile.mkdtemp() 65 self.root = None 66 return 67 68 def tearDown(self): 69 shutil.rmtree(self.workdir) 70 if self.root is not None: 71 shutil.rmtree(self.root) 72 return 73 74 def test_iface(self): 75 obj = ExtFileStore(None) 76 verifyClass(IExtFileStore, ExtFileStore) 77 verifyObject(IExtFileStore, obj) 78 return 79 80 def test_root_setup_wo_site(self): 81 # if no site is available we can use a temporary root 82 fs = ExtFileStore() 83 self.root = fs.root 84 self.assertTrue(isinstance(self.root, basestring)) 85 self.assertTrue(os.path.exists(self.root)) 86 return 87 88 def test_create_instance(self): 89 storage1 = ExtFileStore() 90 storage2 = ExtFileStore(root=self.workdir) 91 self.root = storage1.root 92 self.assertTrue(storage1.root is not None) 93 self.assertTrue(storage1.root != storage2.root) 94 self.assertEqual(storage2.root, self.workdir) 95 return 96 97 def test_create_file(self): 98 # We can store files 99 storage = ExtFileStore(root=self.workdir) 100 dummy_file = StringIO('sample file') 101 image_file = storage.createFile('mysample', dummy_file) 102 self.assertTrue('mysample' in os.listdir(storage.root)) 103 self.assertEqual('mysample', image_file.data) 104 return 105 106 def test_get_file(self): 107 # We can get files after having them stored 108 storage = ExtFileStore(root=self.workdir) 109 dummy_file = StringIO('sample file') 110 image_file = storage.createFile('mysample', dummy_file) 111 result = storage.getFile(image_file.data) 112 self.assertEqual(result.read(), 'sample file') 113 return 114 115 def test_extract_marker(self): 116 # file stores support extracting markers from filenames 117 storage = ExtFileStore(root=self.workdir) 118 result1 = storage.extractMarker(None) 119 result2 = storage.extractMarker('') 120 result3 = storage.extractMarker('no-marker') 121 result4 = storage.extractMarker('no-marker.txt') 122 result5 = storage.extractMarker('__MARKER__foo.jpg') 123 result6 = storage.extractMarker('__MaRkEr__foo.jpg') 124 result7 = storage.extractMarker('__THE_MARKER__foo.jpg') 125 result8 = storage.extractMarker('__A_MARK__my__foo.jpg') 126 127 self.assertEqual(result1, ('', '', '', '')) 128 self.assertEqual(result2, ('', '', '', '')) 129 self.assertEqual(result3, ('', 'no-marker', 'no-marker', '')) 130 self.assertEqual(result4, ('', 'no-marker.txt', 'no-marker', '.txt')) 131 self.assertEqual(result5, ('marker', 'foo.jpg', 'foo', '.jpg')) 132 self.assertEqual(result6, ('marker', 'foo.jpg', 'foo', '.jpg')) 133 self.assertEqual(result7, ('the_marker', 'foo.jpg', 'foo', '.jpg')) 134 self.assertEqual(result8, ('a_mark', 'my__foo.jpg', 'my__foo', '.jpg')) 135 return 136 137 class DefaultFileStoreHandlerTests(unittest.TestCase): 138 139 def test_iface(self): 140 obj = DefaultFileStoreHandler() 141 verifyClass(IFileStoreHandler, DefaultFileStoreHandler) 142 verifyObject(IFileStoreHandler, obj) 143 return 144 145 class CustomizedFileHandler(object): 146 def pathFromFileID(self, store, root, file_id): 147 return os.path.join(root, file_id[12:]) 148 149 def createFile(self, store, root, file_id, filename, f): 150 path = self.pathFromFileID(store, root, file_id) 151 return f, path, HurryFile(filename, file_id) 152 153 class CustomContext(object): 154 pass 155 156 class CustomContextFileChooser(object): 157 def __init__(self, context): 158 self.context = context 159 160 def chooseName(self, name=None): 161 return '__mymarker__mysample.txt' 162 163 class FunctionalExtFileStoreTests(FunctionalTestCase): 164 165 layer = FunctionalLayer 166 167 def setUp(self): 168 super(FunctionalExtFileStoreTests, self).setUp() 37 169 self.workdir = tempfile.mkdtemp() 38 170 self.samplefile = os.path.join(self.workdir, 'sample') … … 40 172 open(self.samplefile, 'wb').write('Hi there!') 41 173 open(self.otherfile, 'wb').write('Hi from other!') 42 self.basket = Basket()43 174 self.fd = open(self.samplefile, 'r') 44 175 self.fd2 = open(self.otherfile, 'r') 45 self.stderr = StringIO() 46 self.old_stderr = sys.stderr 176 self.getRootFolder()['app'] = University() 177 self.app = self.getRootFolder()['app'] 178 self.app['datacenter'].setStoragePath(self.workdir) 179 # register a custom filename mangler 180 provideUtility( 181 CustomizedFileHandler(), IFileStoreHandler, name=u'mymarker') 182 # register a file chooser adapter for CustomContext 183 provideAdapter( 184 CustomContextFileChooser, 185 (CustomContext,), IFileStoreNameChooser) 186 return 187 47 188 48 189 def tearDown(self): 49 sys.stderr = self.old_stderr 50 super(BasketTests, self).tearDown() 51 self.fd.close() 52 shutil.rmtree(self.workdir) 53 self.basket._del() # Remove subojects explicitly 54 del self.basket 55 return 56 57 def test_ifaces(self): 58 pass 59 60 def test_curr_id_empty(self): 61 curr_id = self.basket.curr_id 62 self.assertEqual(curr_id, '1') 63 64 def test_getInternalId_empty(self): 65 basket_id = self.basket.getInternalId(self.fd) 66 self.assertTrue(basket_id is None) 67 68 def test_storeFile_single(self): 69 basket_id = self.basket.storeFile(self.fd, 'sample') 70 self.assertEqual(basket_id, '1') 71 contents = self.basket['1'].open('r').read() 72 self.assertEqual(contents, 'Hi there!') 73 74 def test_storeFile_double(self): 75 basket_id1 = self.basket.storeFile(self.fd, 'sample') 76 basket_id2 = self.basket.storeFile(self.fd, 'sample') 77 self.assertTrue(basket_id1 == basket_id2 == '1') 78 contents = self.basket['1'].open('r').read() 79 self.assertEqual(contents, 'Hi there!') 80 81 def test_storeFile_multiple(self): 82 basket_id1 = self.basket.storeFile(self.fd, 'sample') 83 basket_id2 = self.basket.storeFile(self.fd2, 'sample') 84 self.assertEqual(basket_id1, '1') 85 self.assertEqual(basket_id2, '2') 86 contents1 = self.basket['1'].open('r').read() 87 contents2 = self.basket['2'].open('r').read() 88 self.assertEqual(contents1, 'Hi there!') 89 self.assertEqual(contents2, 'Hi from other!') 90 91 def test_retrieveFile(self): 92 basket_id = self.basket.storeFile(self.fd, 'sample') 93 fd = self.basket.retrieveFile(basket_id) 94 result = fd.read() 95 self.assertEqual(result, 'Hi there!') 96 97 def test_retrieveFile_not_existent(self): 98 result = self.basket.retrieveFile('not-a-valid-basket-id') 99 self.assertTrue(result is None) 100 101 def test_detect_zero_length_blobs(self): 102 # Ensure we get a warning when an empty Blob is found 103 self.basket.storeFile(self.fd, 'sample') 104 self.basket['1'].open('w').write('') 105 self.fd.seek(0) 106 sys.stderr = self.stderr # Redirect stderr 107 self.basket.storeFile(self.fd, 'sample') 108 sys.stderr = self.old_stderr # Restore stderr 109 self.stderr.seek(0) 110 self.assertTrue( 111 "EMPTY BLOB DETECTED" in self.stderr.read()) 112 113 def test_refill_zero_length_blobs(self): 114 # When we detect an empty Blob, it will be reused 115 self.basket.storeFile(self.fd, 'sample') 116 self.basket['1'].open('w').write('') 117 self.fd.seek(0) 118 sys.stderr = self.stderr # Redirect stderr 119 self.basket.storeFile(self.fd, 'sample') 120 sys.stderr = self.old_stderr # Restore stderr 121 contents = self.basket['1'].open('r').read() 122 self.assertEqual(contents, 'Hi there!') 123 124 125 class ImageStorageTests(FunctionalTestCase): 126 127 layer = FunctionalLayer 128 129 def setUp(self): 130 super(ImageStorageTests, self).setUp() 131 self.workdir = tempfile.mkdtemp() 132 self.samplefile = os.path.join(self.workdir, 'sample') 133 self.otherfile = os.path.join(self.workdir, 'other') 134 open(self.samplefile, 'wb').write('Hi there!') 135 open(self.otherfile, 'wb').write('Hi from other!') 136 self.storage = ImageStorage() 137 self.fd = open(self.samplefile, 'r') 138 self.fd2 = open(self.otherfile, 'r') 139 140 def tearDown(self): 141 super(ImageStorageTests, self).tearDown() 190 super(FunctionalExtFileStoreTests, self).tearDown() 142 191 self.fd.close() 143 192 self.fd2.close() 144 193 shutil.rmtree(self.workdir) 145 self.storage._del() # Remove subojects explicitly 146 del self.storage 147 return 148 149 def test_ifaces(self): 150 pass 151 152 def test_storeFile(self): 153 full_id = self.storage.storeFile(self.fd, 'sample.txt') 154 self.assertEqual(full_id, '396199333edbf40ad43e62a1c1397793-1') 155 156 def test_storeFile_duplicate(self): 157 full_id1 = self.storage.storeFile(self.fd, 'sample1.txt') 158 full_id2 = self.storage.storeFile(self.fd, 'sample2.txt') 159 full_id3 = self.storage.storeFile(self.fd, 'sample1.txt') 160 self.assertEqual(full_id1, '396199333edbf40ad43e62a1c1397793-1') 161 self.assertEqual(full_id2, '396199333edbf40ad43e62a1c1397793-1') 162 self.assertEqual(full_id3, '396199333edbf40ad43e62a1c1397793-1') 163 contents = self.storage.retrieveFile( 164 '396199333edbf40ad43e62a1c1397793-1').read() 165 self.assertEqual(contents, 'Hi there!') 166 167 def test_storeFile_multiple(self): 168 full_id1 = self.storage.storeFile(self.fd, 'sample1.txt') 169 full_id2 = self.storage.storeFile(self.fd2, 'sample1.txt') 170 self.assertEqual(full_id1, '396199333edbf40ad43e62a1c1397793-1') 171 self.assertEqual(full_id2, '6936fcf8d564f1c5be5a017e650c5e8f-1') 172 173 def test_retrieveFile_not_existent(self): 174 result = self.storage.retrieveFile('not-existent') 175 self.assertTrue(result is None) 176 177 def test_retrieveFile_illegal_marker(self): 178 result1 = self.storage.retrieveFile('really-not-existent') 179 result2 = self.storage.retrieveFile('notexistent') 180 self.assertTrue(result1 is result2 is None) 181 182 def test_retrieveFile(self): 183 full_id = self.storage.storeFile(self.fd, 'sample.txt') 184 result = self.storage.retrieveFile(full_id) 185 content = result.read() 186 self.assertEqual(content, 'Hi there!') 187 188 def test_retrieveFile_multiple(self): 189 full_id1 = self.storage.storeFile(self.fd, 'sample.txt') 190 full_id2 = self.storage.storeFile(self.fd2, 'other.txt') 191 result1 = self.storage.retrieveFile(full_id1) 192 result2 = self.storage.retrieveFile(full_id2) 193 content1 = result1.read() 194 content2 = result2.read() 195 self.assertEqual(content1, 'Hi there!') 196 self.assertEqual(content2, 'Hi from other!') 197 198 class ImageStorageFileRetrievalTests(FunctionalTestCase): 199 200 layer = FunctionalLayer 201 202 def setUp(self): 203 super(ImageStorageFileRetrievalTests, self).setUp() 204 self.workdir = tempfile.mkdtemp() 205 self.samplefile = os.path.join(self.workdir, 'sample') 206 self.otherfile = os.path.join(self.workdir, 'other') 207 open(self.samplefile, 'wb').write('Hi there!') 208 open(self.otherfile, 'wb').write('Hi from other!') 209 self.storage = ImageStorage() 210 self.fd = open(self.samplefile, 'r') 211 self.fd2 = open(self.otherfile, 'r') 212 # Set up a single image storage in a site 213 self.getRootFolder()['app'] = University() 214 self.app = self.getRootFolder()['app'] 215 if not 'images' in self.app.keys(): 216 self.app['images'] = ImageStorage() 217 self.storage = self.app['images'] 218 return 219 220 221 def tearDown(self): 222 super(ImageStorageFileRetrievalTests, self).tearDown() 223 self.fd.close() 224 self.fd2.close() 225 shutil.rmtree(self.workdir) 226 self.storage._del() # Remove subojects explicitly 227 return 228 229 def test_ifaces(self): 230 retrieval = ImageStorageFileRetrieval() 231 assert verifyClass(IFileRetrieval, ImageStorageFileRetrieval) 232 assert verifyObject(IFileRetrieval, retrieval) 233 return 234 235 def test_getImageStorage_nosite(self): 236 retrieval = ImageStorageFileRetrieval() 237 storage = retrieval.getImageStorage() 238 self.assertTrue(storage is None) 239 return 240 241 def test_getImageStorage(self): 194 return 195 196 def test_root_setup_w_site(self): 197 # if a site is available we use it to determine the root dir 198 fs = ExtFileStore() 242 199 setSite(self.app) 243 retrieval = ImageStorageFileRetrieval() 244 storage = retrieval.getImageStorage() 245 self.assertTrue(storage is self.storage) 246 return 247 248 def test_isImageStorageEnabled_nosite(self): 249 retrieval = ImageStorageFileRetrieval() 250 self.assertTrue(retrieval.isImageStorageEnabled() is False) 251 return 252 253 def test_isImageStorageEnabled(self): 254 setSite(self.app) 255 retrieval = ImageStorageFileRetrieval() 256 self.assertTrue(retrieval.isImageStorageEnabled() is True) 257 return 258 259 def test_getFile_nosite(self): 260 retrieval = ImageStorageFileRetrieval() 261 f = retrieval.getFile('Hi there!') 262 self.assertEqual(f.read(), 'Hi there!') 263 return 264 265 def test_getFile(self): 266 setSite(self.app) 267 retrieval = ImageStorageFileRetrieval() 268 waeup_image = retrieval.createFile('sample.txt', self.fd) 269 full_id = waeup_image.data 270 result = retrieval.getFile(full_id) 271 self.assertEqual(result.read(), 'Hi there!') 272 return 273 274 def test_createFile_nosite(self): 275 retrieval = ImageStorageFileRetrieval() 276 waeup_image = retrieval.createFile('sample.txt', self.fd) 277 self.assertEqual(waeup_image.data, 'Hi there!') 278 return 279 280 def test_createFile(self): 281 # Ensure we can create WAeUPImageFiles when in site 282 setSite(self.app) 283 retrieval = ImageStorageFileRetrieval() 284 waeup_image = retrieval.createFile('sample.txt', self.fd) 285 full_id = waeup_image.data 286 self.assertEqual(full_id, '396199333edbf40ad43e62a1c1397793-1') 287 return 288 289 def test_waeupimagefile(self): 290 # Make sure WAeUPImageFile can use our file retrieval 291 setSite(self.app) 292 myfile = createWAeUPImageFile('sample.jpg', self.fd) 293 contents = myfile.file.read() 294 self.assertEqual(contents, 'Hi there!') 295 296 def test_waeupimagefile_raw(self): 297 # Make sure we can retrieve a file also if it was initialized 298 # with no image storage available 299 myfile = createWAeUPImageFile('sample.jpg', self.fd) 300 setSite(self.app) 301 contents = myfile.file.read() 302 self.assertEqual(contents, 'Hi there!') 200 self.root = fs.root 201 expected_root = os.path.join( 202 self.app['datacenter'].storage, 'media') 203 self.assertTrue(isinstance(self.root, basestring)) 204 self.assertEqual(self.root, expected_root) 205 return 206 207 def test_get_utility(self): 208 # we can get an ExtFileStore by global utility lookup 209 fs1 = getUtility(IExtFileStore) 210 fs2 = getUtility(IExtFileStore) 211 self.assertTrue(isinstance(fs1, ExtFileStore)) 212 self.assertTrue(fs1 is fs2) 213 return 214 215 def test_default_handler_create_file(self): 216 # we can use the default handler to store files 217 fs = ExtFileStore() 218 result = fs.createFile('sample.txt', StringIO('sample text')) 219 self.assertEqual(result.data, 'sample.txt') 220 self.assertTrue('sample.txt' in os.listdir(fs.root)) 221 return 222 223 def test_default_handler_get_file(self): 224 # we can get files stored by the default handler 225 fs = ExtFileStore() 226 fs.createFile('sample.txt', StringIO('sample text')) 227 result1 = fs.getFile('sample.txt') 228 result2 = fs.getFile('not-existent') 229 self.assertEqual(result1.read(), 'sample text') 230 self.assertTrue(result2 is None) 231 return 232 233 def test_customized_handler_create_file(self): 234 # we can use registered filename handlers 235 fs = ExtFileStore() 236 result = fs.createFile( 237 '__MYMARKER__sample.txt', StringIO('sample text')) 238 self.assertEqual(result.data, '__MYMARKER__sample.txt') 239 self.assertTrue('sample.txt' in os.listdir(fs.root)) 240 return 241 242 def test_customized_handler_get_file(self): 243 # we consider registered filename handlers when asking for 244 # stored files. 245 fs = ExtFileStore() 246 fs.createFile('__MYMARKER__sample.txt', StringIO('sample text')) 247 result1 = fs.getFile('__MYMARKER__sample.txt') 248 result2 = fs.getFile('__MYMARKER__not-existent') 249 result3 = fs.getFile('not-existent') 250 self.assertEqual(result1.read(), 'sample text') 251 self.assertTrue(result2 is None) 252 self.assertTrue(result3 is None) 253 return 254 255 def test_get_file_by_context(self): 256 # if we register a file name chooser, we can also get a file 257 # by context 258 fs = ExtFileStore() 259 context = CustomContext() 260 file_id = IFileStoreNameChooser(context).chooseName() 261 fs = ExtFileStore() 262 fs.createFile(file_id, StringIO('my sample')) 263 result = fs.getFileByContext(context) 264 self.assertEqual( 265 file_id, '__mymarker__mysample.txt') 266 self.assertEqual( 267 result.read(), 'my sample') 268 return 269 270 def test_get_default_handler(self): 271 # we can get a default handler 272 result = queryUtility(IFileStoreHandler) 273 self.assertTrue( 274 isinstance(result, DefaultFileStoreHandler)) 275 return 276 277 def test_get_default_file_retrieval(self): 278 # we get a file store when requesting a file retrieval 279 result = queryUtility(IFileRetrieval) 280 self.assertTrue( 281 isinstance(result, DefaultStorage))
Note: See TracChangeset for help on using the changeset viewer.