source: main/waeup.sirp/trunk/src/waeup/sirp/utils/tests/test_batching.py @ 7016

Last change on this file since 7016 was 6743, checked in by uli, 13 years ago

More usage of logger-aware testing components.
Remove unnecessary imports/testsuite registrations.

File size: 7.7 KB
Line 
1##
2## test_batching.py
3## Login : <uli@pu.smp.net>
4## Started on  Fri Jun  3 00:28:43 2011 Uli Fouquet
5## $Id$
6##
7## Copyright (C) 2011 Uli Fouquet
8## This program is free software; you can redistribute it and/or modify
9## it under the terms of the GNU General Public License as published by
10## the Free Software Foundation; either version 2 of the License, or
11## (at your option) any later version.
12##
13## This program is distributed in the hope that it will be useful,
14## but WITHOUT ANY WARRANTY; without even the implied warranty of
15## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16## GNU General Public License for more details.
17##
18## You should have received a copy of the GNU General Public License
19## along with this program; if not, write to the Free Software
20## Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21##
22import doctest
23import logging
24import os
25import shutil
26import tempfile
27from zope import schema
28from zope.component import provideUtility
29from zope.component.factory import Factory
30from zope.component.hooks import clearSite
31from zope.component.interfaces import IFactory
32from zope.interface import Interface, implements
33from waeup.sirp.app import University
34from waeup.sirp.testing import FunctionalLayer, FunctionalTestCase
35
# Doctest options used when checking the expected log output in
# ``test_log``: ndiff-style failure reports, ``...`` wildcards and
# whitespace-insensitive comparison.
optionflags = (
    doctest.REPORT_NDIFF
    | doctest.ELLIPSIS
    | doctest.NORMALIZE_WHITESPACE
)
38
39
class ICave(Interface):
    """Schema of a cave, the sample content type used by these tests."""

    # Read-only internal identifier.
    id_num = schema.TextLine(
        title=u'internal id',
        default=u'default',
        required=True,
        readonly=True,
    )

    # Cave name; the sample processor uses this as its location field.
    name = schema.TextLine(
        title=u'Cave name',
        default=u'Unnamed',
        required=True,
    )

    # Optional integer field.
    dinoports = schema.Int(
        title=u'Number of DinoPorts (tm)',
        required=False,
        default=1,
    )

    # Required owner name with an explicit ``missing_value``.
    owner = schema.TextLine(
        title=u'Owner name',
        required=True,
        missing_value='Fred Estates Inc.',
    )

    # Required boolean flag, ``False`` by default.
    taxpayer = schema.Bool(
        title=u'Payes taxes',
        required=True,
        default=False,
    )
64
class Cave(object):
    """Simple content object declaring that it implements ``ICave``.

    All constructor arguments are stored unchanged as attributes of
    the same name.
    """
    implements(ICave)

    def __init__(self, name=u'Unnamed', dinoports=2,
                 owner='Fred Estates Inc.', taxpayer=False):
        """Create a cave.

        :param name: cave name.
        :param dinoports: number of DinoPorts (defaults to 2).
        :param owner: owner name.
        :param taxpayer: whether the cave pays taxes.
        """
        self.name = name
        # Honour the argument; a hard-coded ``2`` here would silently
        # discard any value the caller passes.
        self.dinoports = dinoports
        self.owner = owner
        self.taxpayer = taxpayer
73#Cave = attrs_to_fields(Cave)
74
# Module-level container holding the caves, keyed by cave name.  It has
# to be a dict *instance* (not the ``dict`` type) so that lookups and
# item assignment work even before a test's ``setUp`` rebinds it.
stoneville = {}
76
77from waeup.sirp.utils.batching import BatchProcessor
class CaveProcessor(BatchProcessor):
    """Sample batch processor keeping caves in the ``stoneville`` mapping.

    Entries are keyed by the ``name`` column of each row.  The ``site``
    argument that every hook receives is not used by this processor.
    """
    util_name = 'caveprocessor'
    #grok.name(util_name)
    name = 'Cave Processor'
    iface = ICave
    location_fields = ['name']
    factory_name = 'Lovely Cave'

    def parentsExist(self, row, site):
        """Report that the parent container is always available."""
        return True

    def getParent(self, row, site):
        """Return the module-level ``stoneville`` container."""
        return stoneville

    def entryExists(self, row, site):
        """Tell whether a cave named ``row['name']`` is already stored."""
        cave_name = row['name']
        return cave_name in stoneville

    def getEntry(self, row, site):
        """Return the cave stored for ``row['name']`` or ``None``."""
        if self.entryExists(row, site):
            return stoneville[row['name']]
        return None

    def delEntry(self, row, site):
        """Remove the cave stored for ``row['name']``."""
        cave_name = row['name']
        del stoneville[cave_name]

    def addEntry(self, obj, row, site):
        """Store ``obj`` under ``row['name']``."""
        cave_name = row['name']
        stoneville[cave_name] = obj

    def updateEntry(self, obj, row, site):
        """Copy every column of ``row`` onto ``obj`` as an attribute."""
        # Per the original author's note this override is not strictly
        # needed: the default updateEntry is said to do exactly the same.
        for attr_name, attr_value in row.items():
            setattr(obj, attr_name, attr_value)
111
class BatchProcessorTests(FunctionalTestCase):
    """Functional tests importing a sample CSV file with ``CaveProcessor``."""

    layer = FunctionalLayer

    def setupLogger(self):
        """Attach a file-backed, non-propagating 'stoneville' logger.

        Requires ``self.workdir`` to exist; the log file is written to
        ``<workdir>/stoneville.log``.
        """
        self.logger = logging.getLogger('stoneville')
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False
        self.logfile = os.path.join(self.workdir, 'stoneville.log')
        self.handler = logging.FileHandler(self.logfile, 'w')
        self.logger.addHandler(self.handler)

    def setUp(self):
        """Create a sample site, a work dir, sample data and a logger."""
        global stoneville
        super(BatchProcessorTests, self).setUp()

        # Setup a sample site for each test
        app = University()
        self.dc_root = tempfile.mkdtemp()
        app['datacenter'].setStoragePath(self.dc_root)

        # Prepopulate the ZODB...
        self.getRootFolder()['app'] = app
        self.app = self.getRootFolder()['app']

        self.workdir = tempfile.mkdtemp()
        factory = Factory(Cave)
        provideUtility(factory, IFactory, 'Lovely Cave')

        # Provide sample data
        self.newcomers_csv = os.path.join(self.workdir, 'newcomers.csv')
        with open(self.newcomers_csv, 'wb') as csv_file:
            csv_file.write(
                """name,dinoports,owner,taxpayer
Barneys Home,2,Barney,1
Wilmas Asylum,1,Wilma,1
Freds Dinoburgers,10,Fred,0
Joeys Drive-in,110,Joey,0
""")
        self.setupLogger()
        # Rebind the module-level container *first* and only then keep a
        # reference to it.  Otherwise ``self.stoneville`` would point at
        # the container of the previous test and tests would depend on
        # their execution order.
        stoneville = dict()
        self.stoneville = stoneville
        self.resultpath = None
        return

    def tearDown(self):
        """Release the logger's file and remove all temporary dirs."""
        super(BatchProcessorTests, self).tearDown()
        # Detach and close the handler before deleting the directory
        # that contains its log file, so no open file is left behind.
        self.logger.removeHandler(self.handler)
        self.handler.close()
        shutil.rmtree(self.workdir)
        shutil.rmtree(self.dc_root)
        clearSite()
        if not isinstance(self.resultpath, list):
            self.resultpath = [self.resultpath]
        for path in self.resultpath:
            if not isinstance(path, basestring):
                # ``None`` entries: nothing to clean up
                continue
            if not os.path.isdir(path):
                # Result files are removed together with their directory
                path = os.path.dirname(path)
            if os.path.exists(path):
                shutil.rmtree(path)
        return

    def _run_import(self):
        """Import the sample CSV in 'create' mode as user 'Bob'.

        Registers the result paths in ``self.resultpath`` for removal
        in ``tearDown`` and returns the tuple delivered by ``doImport``:
        ``(num_succ, num_fail, finished_path, failed_path)``.
        """
        processor = CaveProcessor()
        result = processor.doImport(
            self.newcomers_csv,
            ['name', 'dinoports', 'owner', 'taxpayer'],
            mode='create', user='Bob', logger=self.logger)
        num_succ, num_fail, finished_path, failed_path = result
        self.resultpath = [finished_path, failed_path]
        return result

    def test_import(self):
        """All four sample rows are imported without failures."""
        num_succ, num_fail, finished_path, failed_path = self._run_import()
        self.assertEqual(num_succ, 4)
        self.assertEqual(num_fail, 0)
        self.assertTrue(finished_path.endswith('/newcomers.finished.csv'))
        self.assertTrue(failed_path is None)

    def test_import_stoneville(self):
        """After an import the container holds one cave per row."""
        self._run_import()
        self.assertEqual(len(self.stoneville), 4)
        self.assertEqual(
            sorted(self.stoneville.keys()),
            [u'Barneys Home', u'Freds Dinoburgers',
             u'Joeys Drive-in', u'Wilmas Asylum'])

    def test_import_correct_type(self):
        """Imported ``dinoports`` values are stored as integers."""
        self._run_import()
        self.assertTrue(
            isinstance(self.stoneville['Barneys Home'].dinoports, int))

    def test_log(self):
        """
           >>> print log_contents
           --------------------
           Bob: Batch processing finished: OK
           Bob: Source: /.../newcomers.csv
           Bob: Mode: create
           Bob: User: Bob
           Bob: Processing time: ... s (... s/item)
           Bob: Processed: 4 lines (4 successful/ 0 failed)
           --------------------

        """
        self._run_import()
        with open(self.logfile, 'rb') as log:
            log_contents = log.read()
        # The docstring above is the expected log output; it is checked
        # as a doctest with ``log_contents`` as the only visible name.
        # NOTE(review): run_docstring_examples prints mismatches instead
        # of raising, so a differing log does not fail this test.
        doctest.run_docstring_examples(
            self.test_log, {'log_contents': log_contents}, False,
            'test_log', None, optionflags)
        return
236
237#def test_suite():
238#    return unittest.TestSuite()
Note: See TracBrowser for help on using the repository browser.