source: main/waeup.sirp/branches/ulif-stress-multimech/multi-mechanize/multi-mechanize.py @ 7486

Last change on this file since 7486 was 7478, checked in by uli, 13 years ago

Sample usage of multi-mechanize (not finished).

File size: 13.6 KB
#!/usr/bin/env python
#
#  Copyright (c) 2010-2011 Corey Goldberg (corey@goldb.org)
#  License: GNU LGPLv3 - distributed under the terms of the GNU Lesser General Public License version 3

#  This file is part of Multi-Mechanize:
#       Multi-Process, Multi-Threaded, Web Load Generator, with python-mechanize agents
#
#  requires Python 2.6+



import ConfigParser
import glob
import multiprocessing
import optparse
import os
import Queue
import shutil
import subprocess
import sys
import threading
import time
import lib.results as results
import lib.progressbar as progressbar

usage = 'Usage: %prog <project name> [options]'
parser = optparse.OptionParser(usage=usage)
parser.add_option('-p', '--port', dest='port', type='int', help='rpc listener port')
parser.add_option('-r', '--results', dest='results_dir', help='results directory to reprocess')
cmd_opts, args = parser.parse_args()

try:
    project_name = args[0]
except IndexError:
    sys.stderr.write('\nERROR: no project specified\n\n')
    sys.stderr.write('usage: python multi-mechanize.py <project_name>\n')
    sys.stderr.write('example: python multi-mechanize.py default_project\n\n')
    sys.exit(1)
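# Typical invocations (a sketch based on the usage string and options above;
# the results directory name is illustrative and must name an existing run
# under projects/<project_name>/results/):
#
#   python multi-mechanize.py default_project
#   python multi-mechanize.py default_project -p 9001
#   python multi-mechanize.py default_project -r results_2011.06.01_10.00.00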

scripts_path = 'projects/%s/test_scripts' % project_name
if not os.path.exists(scripts_path):
    sys.stderr.write('\nERROR: can not find project: %s\n\n' % project_name)
    sys.exit(1)
sys.path.append(scripts_path)
for f in glob.glob('%s/*.py' % scripts_path):  # import all test scripts as modules
    f = f.replace(scripts_path, '').replace(os.sep, '').replace('.py', '')
    exec('import %s' % f)
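# Each test script lives in projects/<project_name>/test_scripts/ and must
# define a Transaction class whose run() method performs one iteration of
# work; a minimal sketch (file name and timer key are illustrative; the Agent
# class below instantiates Transaction(), resets custom_timers, and calls
# run() in a loop until run_time expires):
#
#   import time
#
#   class Transaction(object):
#       def __init__(self):
#           self.custom_timers = {}
#
#       def run(self):
#           start = time.time()
#           time.sleep(0.1)  # stand-in for real work, e.g. a mechanize request
#           self.custom_timers['Example_Timer'] = time.time() - start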



def main():
    if cmd_opts.results_dir:  # don't run a test, just reprocess results
        rerun_results(cmd_opts.results_dir)
    elif cmd_opts.port:
        import lib.rpcserver
        lib.rpcserver.launch_rpc_server(cmd_opts.port, project_name, run_test)
    else:
        run_test()
    return



def run_test(remote_starter=None):
    if remote_starter is not None:
        remote_starter.test_running = True
        remote_starter.output_dir = None

    run_time, rampup, results_ts_interval, console_logging, progress_bar, results_database, post_run_script, xml_report, user_group_configs = configure(project_name)

    run_localtime = time.localtime()
    output_dir = time.strftime('projects/' + project_name + '/results/results_%Y.%m.%d_%H.%M.%S/', run_localtime)

    # this queue is shared between all processes/threads
    queue = multiprocessing.Queue()
    rw = ResultsWriter(queue, output_dir, console_logging)
    rw.daemon = True
    rw.start()

    user_groups = []
    for i, ug_config in enumerate(user_group_configs):
        ug = UserGroup(queue, i, ug_config.name, ug_config.num_threads, ug_config.script_file, run_time, rampup)
        user_groups.append(ug)
    for user_group in user_groups:
        user_group.start()

    start_time = time.time()

    if console_logging:
        for user_group in user_groups:
            user_group.join()
    else:
        print '\n  user_groups:  %i' % len(user_groups)
        print '  threads: %i\n' % sum([ug.num_threads for ug in user_groups])  # total threads across all user groups

        if progress_bar:
            p = progressbar.ProgressBar(run_time)
            elapsed = 0
            while elapsed < (run_time + 1):
                p.update_time(elapsed)
                if sys.platform.startswith('win'):
                    print '%s   transactions: %i  timers: %i  errors: %i\r' % (p, rw.trans_count, rw.timer_count, rw.error_count),
                else:
                    print '%s   transactions: %i  timers: %i  errors: %i' % (p, rw.trans_count, rw.timer_count, rw.error_count)
                    sys.stdout.write(chr(27) + '[A')
                time.sleep(1)
                elapsed = time.time() - start_time

            print p

        while [user_group for user_group in user_groups if user_group.is_alive()] != []:
            if progress_bar:
                if sys.platform.startswith('win'):
                    print 'waiting for all requests to finish...\r',
                else:
                    print 'waiting for all requests to finish...\r'
                    sys.stdout.write(chr(27) + '[A')
            time.sleep(.5)

        if not sys.platform.startswith('win'):
            print

    # all agents are done running at this point
    time.sleep(.2)  # make sure the writer queue is flushed
    print '\n\nanalyzing results...\n'
    results.output_results(output_dir, 'results.csv', run_time, rampup, results_ts_interval, user_group_configs, xml_report)
    print 'created: %sresults.html\n' % output_dir
    if xml_report:
        print 'created: %sresults.jtl' % output_dir
        print 'created: last_results.jtl\n'

    # copy config file to results directory
    project_config = os.sep.join(['projects', project_name, 'config.cfg'])
    saved_config = os.sep.join([output_dir, 'config.cfg'])
    shutil.copy(project_config, saved_config)

    if results_database is not None:
        print 'loading results into database: %s\n' % results_database
        import lib.resultsloader
        lib.resultsloader.load_results_database(project_name, run_localtime, output_dir, results_database,
                run_time, rampup, results_ts_interval, user_group_configs)

    if post_run_script is not None:
        print 'running post_run_script: %s\n' % post_run_script
        subprocess.call(post_run_script)

    print 'done.\n'

    if remote_starter is not None:
        remote_starter.test_running = False
        remote_starter.output_dir = output_dir

    return



def rerun_results(results_dir):
    output_dir = 'projects/%s/results/%s/' % (project_name, results_dir)
    saved_config = '%s/config.cfg' % output_dir
    run_time, rampup, results_ts_interval, console_logging, progress_bar, results_database, post_run_script, xml_report, user_group_configs = configure(project_name, config_file=saved_config)
    print '\n\nanalyzing results...\n'
    results.output_results(output_dir, 'results.csv', run_time, rampup, results_ts_interval, user_group_configs, xml_report)
    print 'created: %sresults.html\n' % output_dir
    if xml_report:
        print 'created: %sresults.jtl' % output_dir
        print 'created: last_results.jtl\n'



def configure(project_name, config_file=None):
    user_group_configs = []
    config = ConfigParser.ConfigParser()
    if config_file is None:
        config_file = 'projects/%s/config.cfg' % project_name
    config.read(config_file)
    for section in config.sections():
        if section == 'global':
            run_time = config.getint(section, 'run_time')
            rampup = config.getint(section, 'rampup')
            results_ts_interval = config.getint(section, 'results_ts_interval')
            try:
                console_logging = config.getboolean(section, 'console_logging')
            except ConfigParser.NoOptionError:
                console_logging = False
            try:
                progress_bar = config.getboolean(section, 'progress_bar')
            except ConfigParser.NoOptionError:
                progress_bar = True
            try:
                results_database = config.get(section, 'results_database')
                if results_database == 'None': results_database = None
            except ConfigParser.NoOptionError:
                results_database = None
            try:
                post_run_script = config.get(section, 'post_run_script')
                if post_run_script == 'None': post_run_script = None
            except ConfigParser.NoOptionError:
                post_run_script = None
            try:
                xml_report = config.getboolean(section, 'xml_report')
            except ConfigParser.NoOptionError:
                xml_report = False
        else:
            threads = config.getint(section, 'threads')
            script = config.get(section, 'script')
            user_group_name = section
            ug_config = UserGroupConfig(threads, user_group_name, script)
            user_group_configs.append(ug_config)

    return (run_time, rampup, results_ts_interval, console_logging, progress_bar, results_database, post_run_script, xml_report, user_group_configs)
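# Example projects/<project_name>/config.cfg that configure() would parse
# (values and the script file name are illustrative; every [global] option
# other than run_time, rampup and results_ts_interval is optional and falls
# back to the defaults handled above; every other section defines a user
# group with its thread count and test script):
#
#   [global]
#   run_time = 60
#   rampup = 30
#   results_ts_interval = 10
#   console_logging = off
#   progress_bar = on
#   xml_report = off
#
#   [user_group-1]
#   threads = 10
#   script = example_script.py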



class UserGroupConfig(object):
    def __init__(self, num_threads, name, script_file):
        self.num_threads = num_threads
        self.name = name
        self.script_file = script_file



class UserGroup(multiprocessing.Process):
    def __init__(self, queue, process_num, user_group_name, num_threads, script_file, run_time, rampup):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.process_num = process_num
        self.user_group_name = user_group_name
        self.num_threads = num_threads
        self.script_file = script_file
        self.run_time = run_time
        self.rampup = rampup
        self.start_time = time.time()

    def run(self):
        threads = []
        for i in range(self.num_threads):
            # stagger thread start-up evenly across the rampup period
            # (e.g. rampup=30s with 10 threads starts one thread every 3s)
            spacing = float(self.rampup) / float(self.num_threads)
            if i > 0:
                time.sleep(spacing)
            agent_thread = Agent(self.queue, self.process_num, i, self.start_time, self.run_time, self.user_group_name, self.script_file)
            agent_thread.daemon = True
            threads.append(agent_thread)
            agent_thread.start()
        for agent_thread in threads:
            agent_thread.join()



class Agent(threading.Thread):
    def __init__(self, queue, process_num, thread_num, start_time, run_time, user_group_name, script_file):
        threading.Thread.__init__(self)
        self.queue = queue
        self.process_num = process_num
        self.thread_num = thread_num
        self.start_time = start_time
        self.run_time = run_time
        self.user_group_name = user_group_name
        self.script_file = script_file

        # choose most accurate timer to use (time.clock has finer granularity than time.time on windows, but shouldn't be used on other systems)
        if sys.platform.startswith('win'):
            self.default_timer = time.clock
        else:
            self.default_timer = time.time


    def run(self):
        elapsed = 0

        if self.script_file.lower().endswith('.py'):
            module_name = self.script_file.replace('.py', '')
        else:
            sys.stderr.write('ERROR: scripts must have .py extension. can not run test script: %s.  aborting user group: %s\n' % (self.script_file, self.user_group_name))
            return
        try:
            trans = eval(module_name + '.Transaction()')
        except NameError, e:
            sys.stderr.write('ERROR: can not find test script: %s.  aborting user group: %s\n' % (self.script_file, self.user_group_name))
            return
        except Exception, e:
            sys.stderr.write('ERROR: failed initializing Transaction: %s.  aborting user group: %s\n' % (self.script_file, self.user_group_name))
            return

        trans.custom_timers = {}

        # scripts have access to these vars, which can be useful for loading unique data
        trans.thread_num = self.thread_num
        trans.process_num = self.process_num

        while elapsed < self.run_time:
            error = ''
            start = self.default_timer()

            try:
                trans.run()
            except Exception, e:  # test runner catches all script exceptions here
                error = str(e).replace(',', '')

            finish = self.default_timer()

            scriptrun_time = finish - start
            elapsed = time.time() - self.start_time

            epoch = time.mktime(time.localtime())

            # one result record; ResultsWriter prepends a transaction count and writes these fields to results.csv
            fields = (elapsed, epoch, self.user_group_name, scriptrun_time, error, trans.custom_timers)
            self.queue.put(fields)



class ResultsWriter(threading.Thread):
    def __init__(self, queue, output_dir, console_logging):
        threading.Thread.__init__(self)
        self.queue = queue
        self.console_logging = console_logging
        self.output_dir = output_dir
        self.trans_count = 0
        self.timer_count = 0
        self.error_count = 0

        try:
            os.makedirs(self.output_dir, 0755)
        except OSError:
            sys.stderr.write('ERROR: Can not create output directory\n')
            sys.exit(1)

    def run(self):
        with open(self.output_dir + 'results.csv', 'w') as f:
            while True:
                try:
                    elapsed, epoch, self.user_group_name, scriptrun_time, error, custom_timers = self.queue.get(False)
                    self.trans_count += 1
                    self.timer_count += len(custom_timers)
                    if error != '':
                        self.error_count += 1
                    f.write('%i,%.3f,%i,%s,%f,%s,%s\n' % (self.trans_count, elapsed, epoch, self.user_group_name, scriptrun_time, error, repr(custom_timers)))
                    f.flush()
                    if self.console_logging:
                        print '%i, %.3f, %i, %s, %.3f, %s, %s' % (self.trans_count, elapsed, epoch, self.user_group_name, scriptrun_time, error, repr(custom_timers))
                except Queue.Empty:
                    time.sleep(.05)
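# A row written to results.csv by ResultsWriter above looks like this
# (values are illustrative):
#   1,2.173,1308568950,user_group-1,0.971000,,{'Example_Timer': 0.42}
# i.e. transaction count, elapsed seconds, epoch timestamp, user group name,
# script run time, error text (empty on success), repr() of custom timers.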



if __name__ == '__main__':
    main()