source: main/waeup.sirp/branches/ulif-stress-multimech/multi-mechanize/multi-mechanize.py @ 7543

Last change on this file since 7543 was 7519, checked in by uli, 13 years ago

Clean up code.

File size: 14.0 KB
Line 
1#!/usr/bin/env python
2#
3#  Copyright (c) 2010-2011 Corey Goldberg (corey@goldb.org)
4#  License: GNU LGPLv3 - distributed under the terms of the GNU Lesser
5#  General Public License version 3
6#
7#  This file is part of Multi-Mechanize:
8#       Multi-Process, Multi-Threaded, Web Load Generator, with
9#       python-mechanize agents
10#
11#  requires Python 2.6+
12import ConfigParser
13import glob
14import multiprocessing
15import optparse
16import os
17import Queue
18import shutil
19import subprocess
20import sys
21import threading
22import time
23import lib.results as results
24import lib.progressbar as progressbar
25
26usage = 'Usage: %prog <project name> [options]'
27parser = optparse.OptionParser(usage=usage)
28parser.add_option(
29    '-p', '--port', dest='port', type='int', help='rpc listener port')
30parser.add_option(
31    '-r', '--results', dest='results_dir',
32    help='results directory to reprocess')
33cmd_opts, args = parser.parse_args()
34
35try:
36    project_name = args[0]
37except IndexError:
38    sys.stderr.write('\nERROR: no project specified\n\n')
39    sys.stderr.write('usage: python multi-mechanize.py <project_name>\n')
40    sys.stderr.write('example: python multi-mechanize.py default_project\n\n')
41    sys.exit(1)
42
43scripts_path = 'projects/%s/test_scripts' % project_name
44if not os.path.exists(scripts_path):
45    sys.stderr.write('\nERROR: can not find project: %s\n\n' % project_name)
46    sys.exit(1)
47sys.path.append(scripts_path)
48for f in glob.glob( '%s/*.py' % scripts_path):
49    # import all test scripts as modules
50    f = f.replace(scripts_path, '').replace(os.sep, '').replace('.py', '')
51    exec('import %s' % f)
52
53
54
55def main():
56    if cmd_opts.results_dir:  # don't run a test, just reprocess results
57        rerun_results(cmd_opts.results_dir)
58    elif cmd_opts.port:
59        import lib.rpcserver
60        lib.rpcserver.launch_rpc_server(cmd_opts.port, project_name, run_test)
61    else:
62        run_test()
63    return
64
65
66def run_test(remote_starter=None):
67    if remote_starter is not None:
68        remote_starter.test_running = True
69        remote_starter.output_dir = None
70    (run_time, rampup, results_ts_interval, console_logging, progress_bar,
71     results_database, post_run_script, xml_report,
72     user_group_configs) = configure(project_name)
73
74    run_localtime = time.localtime()
75    output_dir = time.strftime(
76        'projects/' + project_name
77        + '/results/results_%Y.%m.%d_%H.%M.%S/', run_localtime)
78
79    # this queue is shared between all processes/threads
80    queue = multiprocessing.Queue()
81    rw = ResultsWriter(queue, output_dir, console_logging)
82    rw.daemon = True
83    rw.start()
84
85    user_groups = []
86    for i, ug_config in enumerate(user_group_configs):
87        ug = UserGroup(queue, i, ug_config.name, ug_config.num_threads,
88                       ug_config.script_file, run_time, rampup)
89        user_groups.append(ug)
90    for user_group in user_groups:
91        user_group.start()
92
93    start_time = time.time()
94
95    if console_logging:
96        for user_group in user_groups:
97            user_group.join()
98    else:
99        print '\n  user_groups:  %i' % len(user_groups)
100        print '  threads: %i\n' % (ug_config.num_threads * len(user_groups))
101
102        if progress_bar:
103            p = progressbar.ProgressBar(run_time)
104            elapsed = 0
105            while elapsed < (run_time + 1):
106                p.update_time(elapsed)
107                if sys.platform.startswith('win'):
108                    print '%s   transactions: %i  timers: %i  errors: %i\r' % (
109                        p, rw.trans_count, rw.timer_count, rw.error_count),
110                else:
111                    print '%s   transactions: %i  timers: %i  errors: %i' % (
112                        p, rw.trans_count, rw.timer_count, rw.error_count)
113                    sys.stdout.write(chr(27) + '[A' )
114                time.sleep(1)
115                elapsed = time.time() - start_time
116            print p
117
118        while [user_group for user_group in user_groups
119               if user_group.is_alive()] != []:
120            if progress_bar:
121                if sys.platform.startswith('win'):
122                    print 'waiting for all requests to finish...\r',
123                else:
124                    print 'waiting for all requests to finish...\r'
125                    sys.stdout.write(chr(27) + '[A' )
126            time.sleep(.5)
127
128        if not sys.platform.startswith('win'):
129            print
130
131    # all agents are done running at this point
132    time.sleep(.2) # make sure the writer queue is flushed
133    print '\n\nanalyzing results...\n'
134    results.output_results(
135        output_dir, 'results.csv', run_time, rampup, results_ts_interval,
136        user_group_configs, xml_report)
137    print 'created: %sresults.html\n' % output_dir
138    if xml_report:
139        print 'created: %sresults.jtl' % output_dir
140        print 'created: last_results.jtl\n'
141
142    # copy config file to results directory
143    project_config = os.sep.join(['projects', project_name, 'config.cfg'])
144    saved_config = os.sep.join([output_dir, 'config.cfg'])
145    shutil.copy(project_config, saved_config)
146
147    if results_database is not None:
148        print 'loading results into database: %s\n' % results_database
149        import lib.resultsloader
150        lib.resultsloader.load_results_database(
151            project_name, run_localtime, output_dir, results_database,
152            run_time, rampup, results_ts_interval, user_group_configs)
153
154    if post_run_script is not None:
155        print 'running post_run_script: %s\n' % post_run_script
156        subprocess.call(post_run_script)
157
158    print 'done.\n'
159
160    if remote_starter is not None:
161        remote_starter.test_running = False
162        remote_starter.output_dir = output_dir
163    return
164
165
166
167def rerun_results(results_dir):
168    output_dir = 'projects/%s/results/%s/' % (project_name, results_dir)
169    saved_config = '%s/config.cfg' % output_dir
170    (run_time, rampup, results_ts_interval, console_logging, progress_bar,
171     results_database, post_run_script, xml_report,
172     user_group_configs) = configure(project_name, config_file=saved_config)
173    print '\n\nanalyzing results...\n'
174    results.output_results(
175        output_dir, 'results.csv', run_time, rampup, results_ts_interval,
176        user_group_configs, xml_report)
177    print 'created: %sresults.html\n' % output_dir
178    if xml_report:
179        print 'created: %sresults.jtl' % output_dir
180        print 'created: last_results.jtl\n'
181
182
183
184def configure(project_name, config_file=None):
185    user_group_configs = []
186    config = ConfigParser.ConfigParser()
187    if config_file is None:
188        config_file = 'projects/%s/config.cfg' % project_name
189    config.read(config_file)
190    for section in config.sections():
191        if section == 'global':
192            run_time = config.getint(section, 'run_time')
193            rampup = config.getint(section, 'rampup')
194            results_ts_interval = config.getint(section, 'results_ts_interval')
195            try:
196                console_logging = config.getboolean(section, 'console_logging')
197            except ConfigParser.NoOptionError:
198                console_logging = False
199            try:
200                progress_bar = config.getboolean(section, 'progress_bar')
201            except ConfigParser.NoOptionError:
202                progress_bar = True
203            try:
204                results_database = config.get(section, 'results_database')
205                if results_database == 'None': results_database = None
206            except ConfigParser.NoOptionError:
207                results_database = None
208            try:
209                post_run_script = config.get(section, 'post_run_script')
210                if post_run_script == 'None': post_run_script = None
211            except ConfigParser.NoOptionError:
212                post_run_script = None
213            try:
214                xml_report = config.getboolean(section, 'xml_report')
215            except ConfigParser.NoOptionError:
216                xml_report = False
217        else:
218            threads = config.getint(section, 'threads')
219            script = config.get(section, 'script')
220            user_group_name = section
221            ug_config = UserGroupConfig(threads, user_group_name, script)
222            user_group_configs.append(ug_config)
223
224    return (run_time, rampup, results_ts_interval, console_logging,
225            progress_bar, results_database, post_run_script, xml_report,
226            user_group_configs)
227
228
229
230class UserGroupConfig(object):
231    def __init__(self, num_threads, name, script_file):
232        self.num_threads = num_threads
233        self.name = name
234        self.script_file = script_file
235
236
237
238class UserGroup(multiprocessing.Process):
239    def __init__(self, queue, process_num, user_group_name, num_threads,
240                 script_file, run_time, rampup):
241        multiprocessing.Process.__init__(self)
242        self.queue = queue
243        self.process_num = process_num
244        self.user_group_name = user_group_name
245        self.num_threads = num_threads
246        self.script_file = script_file
247        self.run_time = run_time
248        self.rampup = rampup
249        self.start_time = time.time()
250
251    def run(self):
252        threads = []
253        for i in range(self.num_threads):
254            spacing = float(self.rampup) / float(self.num_threads)
255            if i > 0:
256                time.sleep(spacing)
257            agent_thread = Agent(self.queue, self.process_num, i,
258                                 self.start_time, self.run_time,
259                                 self.user_group_name, self.script_file)
260            agent_thread.daemon = True
261            threads.append(agent_thread)
262            agent_thread.start()
263        for agent_thread in threads:
264            agent_thread.join()
265
266
267
268class Agent(threading.Thread):
269    def __init__(self, queue, process_num, thread_num, start_time,
270                 run_time, user_group_name, script_file):
271        threading.Thread.__init__(self)
272        self.queue = queue
273        self.process_num = process_num
274        self.thread_num = thread_num
275        self.start_time = start_time
276        self.run_time = run_time
277        self.user_group_name = user_group_name
278        self.script_file = script_file
279
280        # choose most accurate timer to use (time.clock has finer
281        # granularity than time.time on windows, but shouldn't be used
282        # on other systems)
283        if sys.platform.startswith('win'):
284            self.default_timer = time.clock
285        else:
286            self.default_timer = time.time
287
288
289    def run(self):
290        elapsed = 0
291
292        if self.script_file.lower().endswith('.py'):
293            module_name = self.script_file.replace('.py', '')
294        else:
295            sys.stderr.write(
296                'ERROR: scripts must have .py extension. can not run '
297                'test script: %s.  aborting user group: %s\n' % (
298                    self.script_file, self.user_group_name))
299            return
300        try:
301            trans = eval(module_name + '.Transaction()')
302        except NameError, e:
303            sys.stderr.write(
304                'ERROR: can not find test script: %s.  aborting user '
305                'group: %s\n' % (self.script_file, self.user_group_name))
306            return
307        except Exception, e:
308            sys.stderr.write(
309                'ERROR: failed initializing Transaction: %s.  aborting '
310                'user group: %s\n' % (self.script_file, self.user_group_name))
311            return
312
313        trans.custom_timers = {}
314
315        # scripts have access to these vars, which can be useful for
316        # loading unique data
317        trans.thread_num = self.thread_num
318        trans.process_num = self.process_num
319
320        while elapsed < self.run_time:
321            error = ''
322            start = self.default_timer()
323
324            try:
325                trans.run()
326            except Exception, e:  # test runner catches all script
327                                  # exceptions here
328                error = str(e).replace(',', '')
329
330            finish = self.default_timer()
331
332            scriptrun_time = finish - start
333            elapsed = time.time() - self.start_time
334
335            epoch = time.mktime(time.localtime())
336
337            fields = (elapsed, epoch, self.user_group_name,
338                      scriptrun_time, error, trans.custom_timers)
339            self.queue.put(fields)
340
341
342
343class ResultsWriter(threading.Thread):
344    def __init__(self, queue, output_dir, console_logging):
345        threading.Thread.__init__(self)
346        self.queue = queue
347        self.console_logging = console_logging
348        self.output_dir = output_dir
349        self.trans_count = 0
350        self.timer_count = 0
351        self.error_count = 0
352
353        try:
354            os.makedirs(self.output_dir, 0755)
355        except OSError:
356            sys.stderr.write('ERROR: Can not create output directory\n')
357            sys.exit(1)
358
359    def run(self):
360        with open(self.output_dir + 'results.csv', 'w') as f:
361            while True:
362                try:
363                    (elapsed, epoch, self.user_group_name, scriptrun_time,
364                    error, custom_timers) = self.queue.get(False)
365                    self.trans_count += 1
366                    self.timer_count += len(custom_timers)
367                    if error != '':
368                        self.error_count += 1
369                    f.write('%i,%.3f,%i,%s,%f,%s,%s\n' % (
370                            self.trans_count, elapsed, epoch,
371                            self.user_group_name, scriptrun_time, error,
372                            repr(custom_timers)))
373                    f.flush()
374                    if self.console_logging:
375                        print '%i, %.3f, %i, %s, %.3f, %s, %s' % (
376                            self.trans_count, elapsed, epoch,
377                            self.user_group_name, scriptrun_time, error,
378                            repr(custom_timers))
379                except Queue.Empty:
380                    time.sleep(.05)
381
382if __name__ == '__main__':
383    main()
384
Note: See TracBrowser for help on using the repository browser.