source: main/waeup.sirp/trunk/src/waeup/sirp/utils/tests/test_batching.py @ 6737

Last change on this file since 6737 was 6273, checked in by uli, 14 years ago

Finally make the new converter work. API-wise it is as good as the old one (it can import everything the old one could),
but design-wise it might be much more powerful. Basically it can handle/convert all content types for which one can
successfully create an AddForm or EditForm. In other words: if you manage to write an edit form for some content type,
then you can also create an importer for that content type. Fine-tuning is still needed (for dates, bool data, etc.), but
the main things work.

File size: 7.8 KB
Line 
1##
2## test_batching.py
3## Login : <uli@pu.smp.net>
4## Started on  Fri Jun  3 00:28:43 2011 Uli Fouquet
5## $Id$
6##
7## Copyright (C) 2011 Uli Fouquet
8## This program is free software; you can redistribute it and/or modify
9## it under the terms of the GNU General Public License as published by
10## the Free Software Foundation; either version 2 of the License, or
11## (at your option) any later version.
12##
13## This program is distributed in the hope that it will be useful,
14## but WITHOUT ANY WARRANTY; without even the implied warranty of
15## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16## GNU General Public License for more details.
17##
18## You should have received a copy of the GNU General Public License
19## along with this program; if not, write to the Free Software
20## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21##
22import doctest
23import logging
24import os
25import shutil
26import tempfile
27import unittest
28from zope import schema
29from zope.app.testing.functional import FunctionalTestCase
30from zope.component import provideUtility
31from zope.component.factory import Factory
32from zope.component.hooks import setSite, clearSite
33from zope.component.interfaces import IFactory
34from zope.interface import Interface, implements
35from waeup.sirp.app import University
36from waeup.sirp.testing import FunctionalLayer, doctestsuite_for_module
37from waeup.sirp.utils.helpers import attrs_to_fields
38
# Flags applied when the log output is compared against the doctest example
# embedded in ``BatchProcessorTests.test_log``.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
41
42
class ICave(Interface):
    """Schema of a cave, the sample content type used by these tests."""

    # Internal identifier; declared read-only with a fixed default.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
    )

    # Name of the cave; ``CaveProcessor`` uses this field to locate entries.
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True,
    )

    # The only optional field of the schema.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
    )

    # Required, but without a default; only a ``missing_value`` is declared.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
    )

    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
    )
67
class Cave(object):
    """Plain content object implementing ``ICave`` for the import tests.

    Only ``name``, ``dinoports``, ``owner`` and ``taxpayer`` are set by the
    constructor; the ``id_num`` field declared by ``ICave`` is not.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Store the given values as attributes of the new cave.

        :param name: name of the cave.
        :param dinoports: number of DinoPorts of the cave.
        :param owner: name of the owner.
        :param taxpayer: whether the cave pays taxes.
        """
        self.name = name
        # Store the value that was passed in.  This used to be the constant
        # ``2``, which silently discarded the ``dinoports`` argument.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
76#Cave = attrs_to_fields(Cave)
77
# Module-level mapping that holds the caves handled by ``CaveProcessor``,
# keyed by cave name.  It has to be a dict *instance*: binding the bare
# ``dict`` type would make ``stoneville.keys()`` and item access fail.
stoneville = dict()
79
80from waeup.sirp.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Sample batch processor storing caves in the ``stoneville`` mapping.

    Entries are keyed by the ``name`` column of each row.  All hooks
    receive a ``site`` argument, which this implementation ignores.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    # NOTE(review): presumably the name under which ``BatchProcessor`` looks
    # up the object factory; the tests register ``Factory(Cave)`` under
    # this name -- confirm against ``BatchProcessor``.
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Return ``True`` unconditionally for every row."""
        return True

    def getParent(self, row, site):
        """Return the module-level ``stoneville`` object."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether ``row['name']`` is already a key of ``stoneville``."""
        # Test membership on the mapping itself instead of building a
        # throw-away list of keys first.
        return row['name'] in stoneville

    def getEntry(self, row, site):
        """Return the object stored under ``row['name']`` or ``None``."""
        if not self.entryExists(row, site):
            return None
        return stoneville[row['name']]

    def delEntry(self, row, site):
        """Remove the object stored under ``row['name']``.

        Raises ``KeyError`` if there is no such entry.
        """
        del stoneville[row['name']]

    def addEntry(self, obj, row, site):
        """Store ``obj`` under ``row['name']``, replacing any old entry."""
        stoneville[row['name']] = obj

    def updateEntry(self, obj, row, site):
        """Set every key/value pair of ``row`` as an attribute of ``obj``."""
        # This is not strictly necessary, as the default
        # updateEntry method does exactly the same
        for key, value in row.items():
            setattr(obj, key, value)
114
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests importing a sample CSV file with ``CaveProcessor``."""

    layer = FunctionalLayer

    def setupLogger(self):
        """Create the 'stoneville' logger writing to a file in ``workdir``.

        Must be called after ``self.workdir`` has been set.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Set up a site, temporary directories, sample data and a logger."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data; the file is closed again right away.
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(
                """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
""")
        self.setupLogger()
        # Rebind the module-level mapping to a fresh dict first and only
        # then keep a reference to it.  Done the other way round, the test
        # would inspect the mapping filled by the previous test.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None

    def tearDown(self):
        """Release the log handler and remove all files a test created."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before its directory is deleted, so
        # no open file handle is left behind.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            # Entries may be ``None`` (e.g. no failed-items file).
            if not isinstance(path, basestring):
                continue
            # For a file, remove the directory that contains it.
            if not os.path.isdir(path):
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)

    def _run_import(self):
        """Import the sample CSV file in 'create' mode as user 'Bob'.

        Returns the tuple ``(num_succ, num_fail, finished_path,
        failed_path)`` delivered by ``doImport`` and remembers both paths
        in ``self.resultpath`` so that ``tearDown`` can clean them up.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_import(self):
        """All four rows are imported and no failed-items file is named."""
        num_succ, num_fail, finished_path, failed_path = self._run_import()
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        """Every row of the sample file ends up as an entry in stoneville."""
        self._run_import()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        """The ``dinoports`` value of an imported cave is an ``int``."""
        self._run_import()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """Compare the log written during an import with this example.

           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._run_import()
        with open(self.logfile, 'rb') as log:
            log_contents = log.read()
        # NOTE(review): ``run_docstring_examples`` only prints mismatches
        # and returns nothing, so a differing log does not make this test
        # fail -- consider a ``DocTestRunner`` and checking its results.
        doctest.run_docstring_examples(
            self.test_log, {'log_contents': log_contents}, False,
            'test_log', None, optionflags)
239
240#def test_suite():
241#    return unittest.TestSuite()
Note: See TracBrowser for help on using the repository browser.