source: main/waeup.kofa/trunk/src/waeup/kofa/tests/test_reports.py @ 9633

Last change on this file since 9633 was 9633, checked in by uli, 12 years ago

Basic statistics for kofa. Still the functionality is hidden - but available :-)

File size: 17.9 KB
Line 
1import datetime
2import grok
3import logging
4import pytz
5import unittest
6from cStringIO import StringIO
7from grok.interfaces import IContainer
8from zc.async.interfaces import IJob
9from zc.async.testing import wait_for_result
10from zope.component import getGlobalSiteManager, getUtility
11from zope.component.hooks import setSite
12from zope.interface.verify import verifyClass, verifyObject
13from waeup.kofa.interfaces import IJobManager, IKofaPluggable
14from waeup.kofa.reports import (
15    IReport, IReportGenerator, IReportJob, IReportJobContainer,
16    IReportsContainer,)
17from waeup.kofa.reports import (
18    Report, ReportGenerator, get_generators, report_job, AsyncReportJob,
19    ReportJobContainer, ReportsContainer, ReportsContainerPlugin)
20from waeup.kofa.testing import FakeJob, FakeJobManager, FunctionalLayer
21from waeup.kofa.tests.test_async import FunctionalAsyncTestCase
22
23class FakeReportGenerator(ReportGenerator):
24    # fake report generator for tests.
25
26    def __init__(self, name=None, perm_create=None, perm_view=None):
27        self.title = 'Report 1'
28        self.perm_create = perm_create
29        self.perm_view = perm_view
30        return
31
32    def __eq__(self, obj):
33        if getattr(obj, 'title', None) is None:
34            return False
35        return self.title == obj.title
36
37    def generate(self, site, args=[], kw={}):
38        result = Report()
39        result.args = args
40        result.kw = kw
41        return result
42
43class ReportTests(unittest.TestCase):
44
45    def test_iface(self):
46        # make sure we fullfill the promised contracts
47        obj = Report()
48        verifyClass(IReport, Report)
49        verifyObject(IReport, obj)
50        return
51
52    def test_creation_dt(self):
53        # a report sets a datetime timestamp when created
54        report = Report()
55        self.assertTrue(hasattr(report, 'creation_dt'))
56        self.assertTrue(isinstance(report.creation_dt, datetime.datetime))
57        # the datetime is set with UTC timezone info
58        self.assertEqual(report.creation_dt.tzinfo, pytz.utc)
59        return
60
61    def test_args(self):
62        # a report stores the args and kw used for generation
63        report1 = Report()
64        report2 = Report(args=[1, 2], kwargs=dict(a=1, b=2))
65        self.assertEqual(report1.args, [])
66        self.assertEqual(report1.kwargs, dict())
67        self.assertEqual(report2.args, [1, 2])
68        self.assertEqual(report2.kwargs, dict(a=1, b=2))
69        return
70
71    def test_create_pdf(self):
72        # trying to create a pdf results in an error
73        report = Report()
74        self.assertRaises(NotImplementedError, report.create_pdf)
75        return
76
77class ReportGeneratorTest(unittest.TestCase):
78    def setUp(self):
79        grok.testing.grok('waeup.kofa.reports') # register utils
80
81    def test_iface(self):
82        # make sure we fullfill the promised contracts
83        obj = ReportGenerator()
84        verifyClass(IReportGenerator, ReportGenerator)
85        verifyObject(IReportGenerator, obj)
86        return
87
88    def test_generate(self):
89        # the base report generator delivers reports
90        obj = ReportGenerator()
91        result = obj.generate(None)
92        self.assertTrue(IReport.providedBy(result))
93        return
94
95class GeneratorRegistrar(object):
96    # Mix-in providing report generator registrations
97    def setUp(self):
98        self.registered = []
99
100    def tearDown(self):
101        # unregister any previously registered report types
102        gsm = getGlobalSiteManager()
103        for report, name in self.registered:
104            gsm.unregisterUtility(report, IReportGenerator, name=name)
105        return
106
107    def register_generator(self, name, perm_create=None, perm_view=None):
108        # helper to register report generators as utils
109        gsm = getGlobalSiteManager()
110        generator = FakeReportGenerator(name)
111        gsm.registerUtility(generator, provided=IReportGenerator, name=name)
112        self.registered.append((generator, name),)
113        return generator
114
115class HelpersTests(GeneratorRegistrar, unittest.TestCase):
116    # Tests for helper functions
117
118    def setUp(self):
119        grok.testing.grok('waeup.kofa.reports') # register utils
120        super(HelpersTests, self).setUp()
121        return
122
123    def test_get_generators_none(self):
124        # we get no generators if none was registered
125        result = list(get_generators())
126        self.assertEqual(result, [])
127        return
128
129    def test_get_generators_simple(self):
130        # we get a single generator if one is registered
131        self.register_generator('report1')
132        result = list(get_generators())
133        self.assertEqual(
134            result, [('report1', FakeReportGenerator('report1'))])
135        return
136
137    def test_get_generators_multiple(self):
138        # we also get multiple generators if available
139        self.register_generator('report1')
140        self.register_generator('report2')
141        result = list(get_generators())
142        self.assertEqual(
143            result,
144            [(u'report1', FakeReportGenerator('report1')),
145             (u'report2', FakeReportGenerator('report2'))])
146        return
147
148
149class ReportJobTests(FunctionalAsyncTestCase, GeneratorRegistrar):
150    # Test asynchronous report functionality (simple cases)
151
152    layer = FunctionalLayer
153
154    def setUp(self):
155        super(ReportJobTests, self).setUp()
156        GeneratorRegistrar.setUp(self)
157        self.root_folder = self.getRootFolder()
158
159    def test_report_job_func(self):
160        # the report_job func really creates reports...
161        self.register_generator('report1')
162        report = report_job(None, 'report1')
163        self.assertTrue(IReport.providedBy(report))
164        self.assertTrue(isinstance(report, Report))
165
166    def test_report_job_interfaces(self):
167        # the AsyncReportJob implements promised interfaces correctly...
168        job = AsyncReportJob(None, None)
169        verifyClass(IJob, AsyncReportJob)
170        verifyObject(IJob, job)
171        verifyClass(IReportJob, AsyncReportJob)
172        verifyObject(IReportJob, job)
173        return
174
175    def test_finished(self):
176        # AsyncReportJobs signal with a bool whether they`re  finished
177        job = AsyncReportJob(self.root_folder, None)
178        setSite(self.root_folder)
179        self.assertEqual(job.finished, False)
180        manager = getUtility(IJobManager)
181        manager.put(job)
182        wait_for_result(job)
183        self.assertEqual(job.finished, True)
184        return
185
186    def test_failed_true(self):
187        # We can test whether a job failed
188        job = AsyncReportJob(self.root_folder, None) # no report generator
189        setSite(self.root_folder)
190        # while a job is not finished, `failed` is ``None``
191        self.assertTrue(job.failed is None)
192        manager = getUtility(IJobManager)
193        manager.put(job)
194        wait_for_result(job)
195        # the finished job failed
196        self.assertEqual(job.failed, True)
197        return
198
199    def test_failed_false(self):
200        # We can test whether a job failed
201        self.register_generator('report1')
202        job = AsyncReportJob(self.root_folder, 'report1')
203        setSite(self.root_folder)
204        # while a job is not finished, `failed` is ``None``
205        self.assertTrue(job.failed is None)
206        manager = getUtility(IJobManager)
207        manager.put(job)
208        wait_for_result(job)
209        # the finished job failed
210        self.assertEqual(job.failed, False)
211        return
212
213    def test_description(self):
214        # IReportJobs provide a description of the started job
215        self.register_generator('report1')
216        args, kw = ['a', 'b'], dict(a=1, b=2)
217        job = AsyncReportJob(self.root_folder, 'report1', args=args, kw=kw)
218        self.assertEqual(
219            job.description,
220            "Report 1 ('a', 'b', a=1, b=2)")
221        # w/o args nor kwargs
222        job = AsyncReportJob(self.root_folder, 'report1')
223        self.assertEqual(
224            job.description,
225            'Report 1 ()')
226        # with args only
227        job = AsyncReportJob(self.root_folder, 'report1', args=args)
228        self.assertEqual(
229            job.description, "Report 1 ('a', 'b')")
230        # with keywords only
231        job = AsyncReportJob(self.root_folder, 'report1', kw=kw)
232        self.assertEqual(
233            job.description, "Report 1 (a=1, b=2)")
234        return
235
236    def test_description_invalid_generator(self):
237        # We can get a description even with an invalid generator
238        job = AsyncReportJob(self.root_folder, 'NOT EXISTENT')
239        self.assertEqual(
240            job.description, u'Unregistered Report Generator ()')
241        # with args set (no kws)
242        job = AsyncReportJob(
243            self.root_folder, 'NOT EXISTENT', args=['a', 'b'])
244        self.assertEqual(
245            job.description, u"Unregistered Report Generator ('a', 'b')")
246        # with kw set (no args)
247        job = AsyncReportJob(
248            self.root_folder, 'NOT_EXISTENT', kw=dict(a=1, b=2))
249        self.assertEqual(
250            job.description, u'Unregistered Report Generator (a=1, b=2)')
251        # with args and kws set
252        job = AsyncReportJob(
253            self.root_folder, 'NOT_EXISTENT', args=['a'], kw=dict(b=2))
254        self.assertEqual(
255            job.description, u"Unregistered Report Generator ('a', b=2)")
256        return
257
258class FakeJobWithResult(FakeJob):
259
260    def __init__(self, args=[], kw={}):
261        self.result = Report()
262        self.result.args = args
263        self.result.kw = kw
264        return
265
266class ReportJobContainerTests(unittest.TestCase):
267    # Test ReportJobContainer
268
269    def setUp(self):
270        # register a suitable ICSVExporter as named utility
271        self.generator = FakeReportGenerator('report1')
272        self.job_manager = FakeJobManager()
273        self.gsm = getGlobalSiteManager()
274        self.gsm.registerUtility(
275            self.generator, IReportGenerator, name='report1')
276        self.gsm.registerUtility(
277            self.job_manager, IJobManager)
278
279    def tearDown(self):
280        self.gsm.unregisterUtility(
281            self.generator, IReportGenerator, name='report1')
282        self.gsm.unregisterUtility(self.job_manager, IJobManager)
283
284    def test_report_job_interfaces(self):
285        # the ExportJobContainer implements promised interfaces correctly...
286        container = ReportJobContainer()
287        verifyClass(IReportJobContainer, ReportJobContainer)
288        verifyObject(IReportJobContainer, container)
289        return
290
291    def test_start_report_job(self):
292        # we can start jobs
293        container = ReportJobContainer()
294        container.start_report_job('report3', 'bob')
295        result = self.job_manager._jobs.values()[0]
296        self.assertTrue(IJob.providedBy(result))
297        self.assertEqual(
298            container.running_report_jobs,
299            [('1', 'report3', 'bob')]
300            )
301        return
302
303    def test_get_running_report_jobs_all(self):
304        # we can get report jobs of all users
305        container = ReportJobContainer()
306        container.start_report_job('report3', 'bob')
307        container.start_report_job('report3', 'alice')
308        result = container.get_running_report_jobs()
309        self.assertEqual(
310            result,
311            [('1', 'report3', 'bob'),
312             ('2', 'report3', 'alice')]
313            )
314        return
315
316    def test_get_running_report_jobs_user(self):
317        # we can get the report jobs running for a certain user
318        container = ReportJobContainer()
319        container.start_report_job('report3', 'bob')
320        container.start_report_job('report3', 'alice')
321        result1 = container.get_running_report_jobs(user_id='alice')
322        result2 = container.get_running_report_jobs(user_id='foo')
323        self.assertEqual(
324            result1, [('2', 'report3', 'alice')])
325        self.assertEqual(
326            result2, [])
327        return
328
329    def test_get_running_report_jobs_only_if_exist(self):
330        # we get only jobs that are accessible through the job manager...
331        container = ReportJobContainer()
332        container.start_report_job('report3', 'bob')
333        container.start_report_job('report3', 'bob')
334        self.assertTrue(
335            ('2', 'report3', 'bob') in container.running_report_jobs)
336        # we remove the second entry from job manager
337        del self.job_manager._jobs['2']
338        result = container.get_running_report_jobs(user_id='bob')
339        self.assertEqual(
340            result, [('1', 'report3', 'bob')])
341        self.assertTrue(
342            ('2', 'report3', 'bob') not in container.running_report_jobs)
343        return
344
345    def test_get_report_job_status(self):
346        # we can get the stati of jobs...
347        container = ReportJobContainer()
348        container.start_report_job('report1', 'alice')
349        container.start_report_job('report1', 'bob')
350        container.start_report_job('report1', 'bob')
351        result = container.get_report_jobs_status(user_id='bob')
352        # we'll get the raw value, a translation and the title of the
353        # exporter
354        self.assertEqual(
355            result,
356            [('new', u'new', u'Report 1'),
357             ('completed', u'completed', u'Report 1')]
358            )
359        return
360
361    def test_get_report_job_description(self):
362        # we can get the descriptions of jobs...
363        container = ReportJobContainer()
364        container.start_report_job('report1', 'alice')
365        container.start_report_job('report1', 'bob')
366        result = container.get_report_jobs_description(user_id='bob')
367        # we'll get the job id, a description and the status
368        self.assertEqual(
369            result,
370            [('2', 'Report 1 ()', u'new'),]
371            )
372        return
373
374    def test_delete_report_entry(self):
375        # we can remove report entries in local lists and the job
376        # manager as well...
377        container = ReportJobContainer()
378        container.start_report_job('report3', 'bob')
379        entry = container.running_report_jobs[0]
380        container.delete_report_entry(entry)
381        # both, running_report_jobs list and job manager are empty now
382        self.assertEqual(
383            container.running_report_jobs, [])
384        self.assertEqual(
385            self.job_manager._jobs, {})
386        return
387
388    def test_report_entry_from_job_id(self):
389        # we can get an report entry for a job_id if the id exists
390        container = ReportJobContainer()
391        entry = ('4', 'report3', 'bob')
392        container.running_report_jobs = [entry]
393        fake_job = FakeJobWithResult()
394        self.job_manager._jobs['4'] = fake_job
395        result1 = container.report_entry_from_job_id(None)
396        result2 = container.report_entry_from_job_id('4')
397        result3 = container.report_entry_from_job_id('23')
398        self.assertEqual(result1, None)
399        self.assertEqual(result2, ('4', 'report3', 'bob'))
400        self.assertEqual(result3, None)
401        return
402
403
404class ReportsContainerTests(unittest.TestCase):
405    # Tests for ReportsContainer
406
407    def test_iface(self):
408        # ReportsContainers really provide the promised interfaces
409        obj = ReportsContainer()
410        verifyClass(IReportsContainer, ReportsContainer)
411        verifyClass(IContainer, ReportsContainer)
412        verifyObject(IReportsContainer, obj)
413        verifyObject(IContainer, obj)
414        return
415
416class ReportsContainerPluginTests(unittest.TestCase):
417    # Tests for ReportsContainerPlugin
418
419    def create_logger(self):
420        # create a logger suitable for local tests.
421        test_logger = logging.getLogger('waeup.kofa.reports.testlogger')
422        log = StringIO()
423        handler = logging.StreamHandler(log)
424        handler.setLevel(logging.DEBUG)
425        test_logger.addHandler(handler)
426        test_logger.setLevel(logging.DEBUG)
427        self.logger = test_logger
428        self.log = log
429        self.handler = handler
430        return self.logger
431
432    def remove_logger(self):
433        del self.handler
434        del self.logger
435        del self.log
436        pass
437
438    def get_log(self):
439        self.log.seek(0)
440        return self.log.read()
441
442    def setUp(self):
443        self.create_logger()
444        return
445
446    def tearDown(self):
447        self.remove_logger()
448        return
449
450    def test_iface(self):
451        # make sure we fullfill the promised interfaces
452        obj = ReportsContainerPlugin()
453        verifyClass(IKofaPluggable, ReportsContainerPlugin)
454        verifyObject(IKofaPluggable, obj)
455        return
456
457    def test_get_as_utility(self):
458        # make sure we can get the plugin as utility
459        grok.testing.grok('waeup.kofa.reports')
460        util = getUtility(IKofaPluggable, name='reports')
461        self.assertTrue(util is not None)
462        return
463
464    def test_update_no_container(self):
465        # we can update an existing site
466        fake_site = grok.Container()
467        plugin = ReportsContainerPlugin()
468        plugin.update(fake_site, 'app', self.logger)
469        log = self.get_log()
470        self.assertEqual(
471            log, 'Added reports container for site "app"\n')
472        self.assertTrue('reports' in fake_site.keys())
473        self.assertTrue(IReportsContainer.providedBy(fake_site['reports']))
474        return
475
476    def test_update_uptodate_site(self):
477        # we leave already existing reports containers in place
478        fake_site = grok.Container()
479        plugin = ReportsContainerPlugin()
480        fake_site['reports'] = ReportsContainer()
481        plugin.update(fake_site, 'app', self.logger)
482        log = self.get_log()
483        self.assertEqual(log, '') # no log message
484        return
485
486    def test_setup_new_site(self):
487        # if we setup a site, we always install a fresh reports container
488        fake_site = grok.Container()
489        plugin = ReportsContainerPlugin()
490        plugin.setup(fake_site, 'app', self.logger)
491        log1 = self.get_log()
492        result1 = fake_site.get('reports', None)
493        plugin.setup(fake_site, 'app', self.logger) # replace old container
494        log2 = self.get_log()
495        result2 = fake_site.get('reports', None)
496        self.assertTrue(result1 is not result2)
497        self.assertEqual(log1,
498                         'Added reports container for site "app"\n')
499        self.assertEqual(log2,
500                         'Added reports container for site "app"\n'
501                         'Removed reports container for site "app"\n'
502                         'Added reports container for site "app"\n')
503        return
Note: See TracBrowser for help on using the repository browser.