Ignore:
Timestamp:
9 Nov 2011, 15:42:45 (13 years ago)
Author:
uli
Message:

Merge changes from branch ulif-extimgstore back into trunk.
Beside external image storage also waeupdocs should work again.

Location:
main/waeup.sirp/trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • main/waeup.sirp/trunk

  • main/waeup.sirp/trunk/src/waeup/sirp/tests/test_app.py

    r6593 r7063  
    66from zope.interface.verify import verifyClass, verifyObject
    77from waeup.sirp.app import University
    8 from waeup.sirp.imagestorage import ImageStorageFileRetrieval
    98from waeup.sirp.interfaces import IUniversity
    109from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase
     
    3231        return
    3332
    34     def test_images(self):
    35         # Make sure we have a image container in a university
    36         assert 'images' in self.app.keys()
    37         return
    38 
    3933    def test_IFileRetrieval_utility(self):
    4034        # Make sure we can get a local IFileRetrieval utility
     
    4337        assert result is not None
    4438        assert IFileRetrieval.providedBy(result)
    45         assert isinstance(result, ImageStorageFileRetrieval)
    4639        return
    4740
  • main/waeup.sirp/trunk/src/waeup/sirp/tests/test_imagestorage.py

    r6529 r7063  
    11import os
    2 import sys
    32import tempfile
    43import shutil
    54import unittest
    65from StringIO import StringIO
     6from hurry.file import HurryFile
    77from hurry.file.interfaces import IFileRetrieval
     8from zope.component import (
     9    getUtility, provideUtility, queryUtility, provideAdapter)
    810from zope.component.hooks import setSite
    911from zope.interface.verify import verifyClass, verifyObject
    1012from waeup.sirp.app import University
    1113from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase
    12 from waeup.sirp.image import createWAeUPImageFile
    1314from waeup.sirp.imagestorage import (
    14     md5digest, Basket, ImageStorage, ImageStorageFileRetrieval,)
     15    FileStoreNameChooser, ExtFileStore, DefaultFileStoreHandler,
     16    DefaultStorage)
     17from waeup.sirp.interfaces import (
     18    IFileStoreNameChooser, IExtFileStore, IFileStoreHandler,)
    1519
    1620class HelperFuncsTests(unittest.TestCase):
     
    2226        shutil.rmtree(self.workdir)
    2327
    24     def test_md5digest(self):
    25         samplefile = os.path.join(self.workdir, 'sample')
    26         open(samplefile, 'wb').write('blah')
    27         fp = open(samplefile, 'r')
    28         digest = md5digest(fp)
    29         self.assertEqual(digest, '6f1ed002ab5595859014ebf0951522d9')
    30 
    31 class BasketTests(FunctionalTestCase):
     28class FileStoreNameChooserTests(FunctionalTestCase):
    3229
    3330    layer = FunctionalLayer
    3431
     32    def test_iface(self):
     33        # we provide the interfaces we promise to do
     34        obj = FileStoreNameChooser(None)
     35        verifyClass(IFileStoreNameChooser, FileStoreNameChooser)
     36        verifyObject(IFileStoreNameChooser, obj)
     37        return
     38
     39    def test_accessible_as_adapter(self):
     40        # we can get a file name chooser via adapter
     41        chooser = IFileStoreNameChooser(object())
     42        self.assertTrue(
     43            isinstance(chooser, FileStoreNameChooser))
     44        return
     45
     46    def test_check_name(self):
     47        # default file name choosers accept any string
     48        chooser = FileStoreNameChooser(object())
     49        self.assertEqual(chooser.checkName('Hi there!'), True)
     50        self.assertEqual(chooser.checkName(None), False)
     51        return
     52
     53    def test_choose_name(self):
     54        # we get a simple string if we do not pass in a valid string
     55        chooser = FileStoreNameChooser(object())
     56        self.assertEqual(chooser.chooseName('myname'), 'myname')
     57        self.assertEqual(chooser.chooseName(None), u'unknown_file')
     58        return
     59
     60class ExtFileStoreTests(unittest.TestCase):
     61    # Test external file store (non-functional mode)
     62
    3563    def setUp(self):
    36         super(BasketTests, self).setUp()
     64        self.workdir = tempfile.mkdtemp()
     65        self.root = None
     66        return
     67
     68    def tearDown(self):
     69        shutil.rmtree(self.workdir)
     70        if self.root is not None:
     71            shutil.rmtree(self.root)
     72        return
     73
     74    def test_iface(self):
     75        obj = ExtFileStore(None)
     76        verifyClass(IExtFileStore, ExtFileStore)
     77        verifyObject(IExtFileStore, obj)
     78        return
     79
     80    def test_root_setup_wo_site(self):
     81        # if no site is available we can use a temporary root
     82        fs = ExtFileStore()
     83        self.root = fs.root
     84        self.assertTrue(isinstance(self.root, basestring))
     85        self.assertTrue(os.path.exists(self.root))
     86        return
     87
     88    def test_create_instance(self):
     89        storage1 = ExtFileStore()
     90        storage2 = ExtFileStore(root=self.workdir)
     91        self.root = storage1.root
     92        self.assertTrue(storage1.root is not None)
     93        self.assertTrue(storage1.root != storage2.root)
     94        self.assertEqual(storage2.root, self.workdir)
     95        return
     96
     97    def test_create_file(self):
     98        # We can store files
     99        storage = ExtFileStore(root=self.workdir)
     100        dummy_file = StringIO('sample file')
     101        image_file = storage.createFile('mysample', dummy_file)
     102        self.assertTrue('mysample' in os.listdir(storage.root))
     103        self.assertEqual('mysample', image_file.data)
     104        return
     105
     106    def test_get_file(self):
     107        # We can get files after having them stored
     108        storage = ExtFileStore(root=self.workdir)
     109        dummy_file = StringIO('sample file')
     110        image_file = storage.createFile('mysample', dummy_file)
     111        result = storage.getFile(image_file.data)
     112        self.assertEqual(result.read(), 'sample file')
     113        return
     114
     115    def test_extract_marker(self):
     116        # file stores support extracting markers from filenames
     117        storage = ExtFileStore(root=self.workdir)
     118        result1 = storage.extractMarker(None)
     119        result2 = storage.extractMarker('')
     120        result3 = storage.extractMarker('no-marker')
     121        result4 = storage.extractMarker('no-marker.txt')
     122        result5 = storage.extractMarker('__MARKER__foo.jpg')
     123        result6 = storage.extractMarker('__MaRkEr__foo.jpg')
     124        result7 = storage.extractMarker('__THE_MARKER__foo.jpg')
     125        result8 = storage.extractMarker('__A_MARK__my__foo.jpg')
     126
     127        self.assertEqual(result1, ('', '', '', ''))
     128        self.assertEqual(result2, ('', '', '', ''))
     129        self.assertEqual(result3, ('', 'no-marker', 'no-marker', ''))
     130        self.assertEqual(result4, ('', 'no-marker.txt', 'no-marker', '.txt'))
     131        self.assertEqual(result5, ('marker', 'foo.jpg', 'foo', '.jpg'))
     132        self.assertEqual(result6, ('marker', 'foo.jpg', 'foo', '.jpg'))
     133        self.assertEqual(result7, ('the_marker', 'foo.jpg', 'foo', '.jpg'))
     134        self.assertEqual(result8, ('a_mark', 'my__foo.jpg', 'my__foo', '.jpg'))
     135        return
     136
     137class DefaultFileStoreHandlerTests(unittest.TestCase):
     138
     139    def test_iface(self):
     140        obj = DefaultFileStoreHandler()
     141        verifyClass(IFileStoreHandler, DefaultFileStoreHandler)
     142        verifyObject(IFileStoreHandler, obj)
     143        return
     144
     145class CustomizedFileHandler(object):
     146    def pathFromFileID(self, store, root, file_id):
     147        return os.path.join(root, file_id[12:])
     148
     149    def createFile(self, store, root, file_id, filename, f):
     150        path = self.pathFromFileID(store, root, file_id)
     151        return f, path, HurryFile(filename, file_id)
     152
     153class CustomContext(object):
     154    pass
     155
     156class CustomContextFileChooser(object):
     157    def __init__(self, context):
     158        self.context = context
     159
     160    def chooseName(self, name=None):
     161        return '__mymarker__mysample.txt'
     162
     163class FunctionalExtFileStoreTests(FunctionalTestCase):
     164
     165    layer = FunctionalLayer
     166
     167    def setUp(self):
     168        super(FunctionalExtFileStoreTests, self).setUp()
    37169        self.workdir = tempfile.mkdtemp()
    38170        self.samplefile = os.path.join(self.workdir, 'sample')
     
    40172        open(self.samplefile, 'wb').write('Hi there!')
    41173        open(self.otherfile, 'wb').write('Hi from other!')
    42         self.basket = Basket()
    43174        self.fd = open(self.samplefile, 'r')
    44175        self.fd2 = open(self.otherfile, 'r')
    45         self.stderr = StringIO()
    46         self.old_stderr = sys.stderr
     176        self.getRootFolder()['app'] = University()
     177        self.app = self.getRootFolder()['app']
     178        self.app['datacenter'].setStoragePath(self.workdir)
     179        # register a custom filename mangler
     180        provideUtility(
     181            CustomizedFileHandler(), IFileStoreHandler, name=u'mymarker')
     182        # register a file chooser adapter for CustomContext
     183        provideAdapter(
     184            CustomContextFileChooser,
     185            (CustomContext,), IFileStoreNameChooser)
     186        return
     187
    47188
    48189    def tearDown(self):
    49         sys.stderr = self.old_stderr
    50         super(BasketTests, self).tearDown()
    51         self.fd.close()
    52         shutil.rmtree(self.workdir)
    53         self.basket._del() # Remove subojects explicitly
    54         del self.basket
    55         return
    56 
    57     def test_ifaces(self):
    58         pass
    59 
    60     def test_curr_id_empty(self):
    61         curr_id = self.basket.curr_id
    62         self.assertEqual(curr_id, '1')
    63 
    64     def test_getInternalId_empty(self):
    65         basket_id = self.basket.getInternalId(self.fd)
    66         self.assertTrue(basket_id is None)
    67 
    68     def test_storeFile_single(self):
    69         basket_id = self.basket.storeFile(self.fd, 'sample')
    70         self.assertEqual(basket_id, '1')
    71         contents = self.basket['1'].open('r').read()
    72         self.assertEqual(contents, 'Hi there!')
    73 
    74     def test_storeFile_double(self):
    75         basket_id1 = self.basket.storeFile(self.fd, 'sample')
    76         basket_id2 = self.basket.storeFile(self.fd, 'sample')
    77         self.assertTrue(basket_id1 == basket_id2 == '1')
    78         contents = self.basket['1'].open('r').read()
    79         self.assertEqual(contents, 'Hi there!')
    80 
    81     def test_storeFile_multiple(self):
    82         basket_id1 = self.basket.storeFile(self.fd, 'sample')
    83         basket_id2 = self.basket.storeFile(self.fd2, 'sample')
    84         self.assertEqual(basket_id1, '1')
    85         self.assertEqual(basket_id2, '2')
    86         contents1 = self.basket['1'].open('r').read()
    87         contents2 = self.basket['2'].open('r').read()
    88         self.assertEqual(contents1, 'Hi there!')
    89         self.assertEqual(contents2, 'Hi from other!')
    90 
    91     def test_retrieveFile(self):
    92         basket_id = self.basket.storeFile(self.fd, 'sample')
    93         fd = self.basket.retrieveFile(basket_id)
    94         result = fd.read()
    95         self.assertEqual(result, 'Hi there!')
    96 
    97     def test_retrieveFile_not_existent(self):
    98         result = self.basket.retrieveFile('not-a-valid-basket-id')
    99         self.assertTrue(result is None)
    100 
    101     def test_detect_zero_length_blobs(self):
    102         # Ensure we get a warning when an empty Blob is found
    103         self.basket.storeFile(self.fd, 'sample')
    104         self.basket['1'].open('w').write('')
    105         self.fd.seek(0)
    106         sys.stderr = self.stderr         # Redirect stderr
    107         self.basket.storeFile(self.fd, 'sample')
    108         sys.stderr = self.old_stderr     # Restore stderr
    109         self.stderr.seek(0)
    110         self.assertTrue(
    111             "EMPTY BLOB DETECTED" in self.stderr.read())
    112 
    113     def test_refill_zero_length_blobs(self):
    114         # When we detect an empty Blob, it will be reused
    115         self.basket.storeFile(self.fd, 'sample')
    116         self.basket['1'].open('w').write('')
    117         self.fd.seek(0)
    118         sys.stderr = self.stderr         # Redirect stderr
    119         self.basket.storeFile(self.fd, 'sample')
    120         sys.stderr = self.old_stderr     # Restore stderr
    121         contents = self.basket['1'].open('r').read()
    122         self.assertEqual(contents, 'Hi there!')
    123 
    124 
    125 class ImageStorageTests(FunctionalTestCase):
    126 
    127     layer = FunctionalLayer
    128 
    129     def setUp(self):
    130         super(ImageStorageTests, self).setUp()
    131         self.workdir = tempfile.mkdtemp()
    132         self.samplefile = os.path.join(self.workdir, 'sample')
    133         self.otherfile = os.path.join(self.workdir, 'other')
    134         open(self.samplefile, 'wb').write('Hi there!')
    135         open(self.otherfile, 'wb').write('Hi from other!')
    136         self.storage = ImageStorage()
    137         self.fd = open(self.samplefile, 'r')
    138         self.fd2 = open(self.otherfile, 'r')
    139 
    140     def tearDown(self):
    141         super(ImageStorageTests, self).tearDown()
     190        super(FunctionalExtFileStoreTests, self).tearDown()
    142191        self.fd.close()
    143192        self.fd2.close()
    144193        shutil.rmtree(self.workdir)
    145         self.storage._del() # Remove subojects explicitly
    146         del self.storage
    147         return
    148 
    149     def test_ifaces(self):
    150         pass
    151 
    152     def test_storeFile(self):
    153         full_id = self.storage.storeFile(self.fd, 'sample.txt')
    154         self.assertEqual(full_id, '396199333edbf40ad43e62a1c1397793-1')
    155 
    156     def test_storeFile_duplicate(self):
    157         full_id1 = self.storage.storeFile(self.fd, 'sample1.txt')
    158         full_id2 = self.storage.storeFile(self.fd, 'sample2.txt')
    159         full_id3 = self.storage.storeFile(self.fd, 'sample1.txt')
    160         self.assertEqual(full_id1, '396199333edbf40ad43e62a1c1397793-1')
    161         self.assertEqual(full_id2, '396199333edbf40ad43e62a1c1397793-1')
    162         self.assertEqual(full_id3, '396199333edbf40ad43e62a1c1397793-1')
    163         contents = self.storage.retrieveFile(
    164             '396199333edbf40ad43e62a1c1397793-1').read()
    165         self.assertEqual(contents, 'Hi there!')
    166 
    167     def test_storeFile_multiple(self):
    168         full_id1 = self.storage.storeFile(self.fd, 'sample1.txt')
    169         full_id2 = self.storage.storeFile(self.fd2, 'sample1.txt')
    170         self.assertEqual(full_id1, '396199333edbf40ad43e62a1c1397793-1')
    171         self.assertEqual(full_id2, '6936fcf8d564f1c5be5a017e650c5e8f-1')
    172 
    173     def test_retrieveFile_not_existent(self):
    174         result = self.storage.retrieveFile('not-existent')
    175         self.assertTrue(result is None)
    176 
    177     def test_retrieveFile_illegal_marker(self):
    178         result1 = self.storage.retrieveFile('really-not-existent')
    179         result2 = self.storage.retrieveFile('notexistent')
    180         self.assertTrue(result1 is result2 is None)
    181 
    182     def test_retrieveFile(self):
    183         full_id = self.storage.storeFile(self.fd, 'sample.txt')
    184         result = self.storage.retrieveFile(full_id)
    185         content = result.read()
    186         self.assertEqual(content, 'Hi there!')
    187 
    188     def test_retrieveFile_multiple(self):
    189         full_id1 = self.storage.storeFile(self.fd, 'sample.txt')
    190         full_id2 = self.storage.storeFile(self.fd2, 'other.txt')
    191         result1 = self.storage.retrieveFile(full_id1)
    192         result2 = self.storage.retrieveFile(full_id2)
    193         content1 = result1.read()
    194         content2 = result2.read()
    195         self.assertEqual(content1, 'Hi there!')
    196         self.assertEqual(content2, 'Hi from other!')
    197 
    198 class ImageStorageFileRetrievalTests(FunctionalTestCase):
    199 
    200     layer = FunctionalLayer
    201 
    202     def setUp(self):
    203         super(ImageStorageFileRetrievalTests, self).setUp()
    204         self.workdir = tempfile.mkdtemp()
    205         self.samplefile = os.path.join(self.workdir, 'sample')
    206         self.otherfile = os.path.join(self.workdir, 'other')
    207         open(self.samplefile, 'wb').write('Hi there!')
    208         open(self.otherfile, 'wb').write('Hi from other!')
    209         self.storage = ImageStorage()
    210         self.fd = open(self.samplefile, 'r')
    211         self.fd2 = open(self.otherfile, 'r')
    212         # Set up a single image storage in a site
    213         self.getRootFolder()['app'] = University()
    214         self.app = self.getRootFolder()['app']
    215         if not 'images' in self.app.keys():
    216             self.app['images'] = ImageStorage()
    217         self.storage = self.app['images']
    218         return
    219 
    220 
    221     def tearDown(self):
    222         super(ImageStorageFileRetrievalTests, self).tearDown()
    223         self.fd.close()
    224         self.fd2.close()
    225         shutil.rmtree(self.workdir)
    226         self.storage._del() # Remove subojects explicitly
    227         return
    228 
    229     def test_ifaces(self):
    230         retrieval = ImageStorageFileRetrieval()
    231         assert verifyClass(IFileRetrieval, ImageStorageFileRetrieval)
    232         assert verifyObject(IFileRetrieval, retrieval)
    233         return
    234 
    235     def test_getImageStorage_nosite(self):
    236         retrieval = ImageStorageFileRetrieval()
    237         storage = retrieval.getImageStorage()
    238         self.assertTrue(storage is None)
    239         return
    240 
    241     def test_getImageStorage(self):
     194        return
     195
     196    def test_root_setup_w_site(self):
     197        # if a site is available we use it to determine the root dir
     198        fs = ExtFileStore()
    242199        setSite(self.app)
    243         retrieval = ImageStorageFileRetrieval()
    244         storage = retrieval.getImageStorage()
    245         self.assertTrue(storage is self.storage)
    246         return
    247 
    248     def test_isImageStorageEnabled_nosite(self):
    249         retrieval = ImageStorageFileRetrieval()
    250         self.assertTrue(retrieval.isImageStorageEnabled() is False)
    251         return
    252 
    253     def test_isImageStorageEnabled(self):
    254         setSite(self.app)
    255         retrieval = ImageStorageFileRetrieval()
    256         self.assertTrue(retrieval.isImageStorageEnabled() is True)
    257         return
    258 
    259     def test_getFile_nosite(self):
    260         retrieval = ImageStorageFileRetrieval()
    261         f = retrieval.getFile('Hi there!')
    262         self.assertEqual(f.read(), 'Hi there!')
    263         return
    264 
    265     def test_getFile(self):
    266         setSite(self.app)
    267         retrieval = ImageStorageFileRetrieval()
    268         waeup_image = retrieval.createFile('sample.txt', self.fd)
    269         full_id = waeup_image.data
    270         result = retrieval.getFile(full_id)
    271         self.assertEqual(result.read(), 'Hi there!')
    272         return
    273 
    274     def test_createFile_nosite(self):
    275         retrieval = ImageStorageFileRetrieval()
    276         waeup_image = retrieval.createFile('sample.txt', self.fd)
    277         self.assertEqual(waeup_image.data, 'Hi there!')
    278         return
    279 
    280     def test_createFile(self):
    281         # Ensure we can create WAeUPImageFiles when in site
    282         setSite(self.app)
    283         retrieval = ImageStorageFileRetrieval()
    284         waeup_image = retrieval.createFile('sample.txt', self.fd)
    285         full_id = waeup_image.data
    286         self.assertEqual(full_id, '396199333edbf40ad43e62a1c1397793-1')
    287         return
    288 
    289     def test_waeupimagefile(self):
    290         # Make sure WAeUPImageFile can use our file retrieval
    291         setSite(self.app)
    292         myfile = createWAeUPImageFile('sample.jpg', self.fd)
    293         contents = myfile.file.read()
    294         self.assertEqual(contents, 'Hi there!')
    295 
    296     def test_waeupimagefile_raw(self):
    297         # Make sure we can retrieve a file also if it was initialized
    298         # with no image storage available
    299         myfile = createWAeUPImageFile('sample.jpg', self.fd)
    300         setSite(self.app)
    301         contents = myfile.file.read()
    302         self.assertEqual(contents, 'Hi there!')
     200        self.root = fs.root
     201        expected_root = os.path.join(
     202            self.app['datacenter'].storage, 'media')
     203        self.assertTrue(isinstance(self.root, basestring))
     204        self.assertEqual(self.root, expected_root)
     205        return
     206
     207    def test_get_utility(self):
     208        # we can get an ExtFileStore by global utility lookup
     209        fs1 = getUtility(IExtFileStore)
     210        fs2 = getUtility(IExtFileStore)
     211        self.assertTrue(isinstance(fs1, ExtFileStore))
     212        self.assertTrue(fs1 is fs2)
     213        return
     214
     215    def test_default_handler_create_file(self):
     216        # we can use the default handler to store files
     217        fs = ExtFileStore()
     218        result = fs.createFile('sample.txt', StringIO('sample text'))
     219        self.assertEqual(result.data, 'sample.txt')
     220        self.assertTrue('sample.txt' in os.listdir(fs.root))
     221        return
     222
     223    def test_default_handler_get_file(self):
     224        # we can get files stored by the default handler
     225        fs = ExtFileStore()
     226        fs.createFile('sample.txt', StringIO('sample text'))
     227        result1 = fs.getFile('sample.txt')
     228        result2 = fs.getFile('not-existent')
     229        self.assertEqual(result1.read(), 'sample text')
     230        self.assertTrue(result2 is None)
     231        return
     232
     233    def test_customized_handler_create_file(self):
     234        # we can use registered filename handlers
     235        fs = ExtFileStore()
     236        result = fs.createFile(
     237            '__MYMARKER__sample.txt', StringIO('sample text'))
     238        self.assertEqual(result.data, '__MYMARKER__sample.txt')
     239        self.assertTrue('sample.txt' in os.listdir(fs.root))
     240        return
     241
     242    def test_customized_handler_get_file(self):
     243        # we consider registered filename handlers when asking for
     244        # stored files.
     245        fs = ExtFileStore()
     246        fs.createFile('__MYMARKER__sample.txt', StringIO('sample text'))
     247        result1 = fs.getFile('__MYMARKER__sample.txt')
     248        result2 = fs.getFile('__MYMARKER__not-existent')
     249        result3 = fs.getFile('not-existent')
     250        self.assertEqual(result1.read(), 'sample text')
     251        self.assertTrue(result2 is None)
     252        self.assertTrue(result3 is None)
     253        return
     254
     255    def test_get_file_by_context(self):
     256        # if we register a file name chooser, we can also get a file
     257        # by context
     258        fs = ExtFileStore()
     259        context = CustomContext()
     260        file_id = IFileStoreNameChooser(context).chooseName()
     261        fs = ExtFileStore()
     262        fs.createFile(file_id, StringIO('my sample'))
     263        result = fs.getFileByContext(context)
     264        self.assertEqual(
     265            file_id, '__mymarker__mysample.txt')
     266        self.assertEqual(
     267            result.read(), 'my sample')
     268        return
     269
     270    def test_get_default_handler(self):
     271        # we can get a default handler
     272        result = queryUtility(IFileStoreHandler)
     273        self.assertTrue(
     274            isinstance(result, DefaultFileStoreHandler))
     275        return
     276
     277    def test_get_default_file_retrieval(self):
     278        # we get a file store when requesting a file retrieval
     279        result = queryUtility(IFileRetrieval)
     280        self.assertTrue(
     281            isinstance(result, DefaultStorage))
Note: See TracChangeset for help on using the changeset viewer.