source: main/waeup.sirp/branches/ulif-stress-multimech/multi-mechanize/multi-mechanize.py @ 7519

Last change on this file since 7519 was 7519, checked in by uli, 13 years ago

Clean up code.

File size: 14.0 KB
RevLine 
[7478]1#!/usr/bin/env python
2#
3#  Copyright (c) 2010-2011 Corey Goldberg (corey@goldb.org)
[7519]4#  License: GNU LGPLv3 - distributed under the terms of the GNU Lesser
5#  General Public License version 3
6#
[7478]7#  This file is part of Multi-Mechanize:
[7519]8#       Multi-Process, Multi-Threaded, Web Load Generator, with
9#       python-mechanize agents
[7478]10#
11#  requires Python 2.6+
12import ConfigParser
13import glob
14import multiprocessing
15import optparse
16import os
17import Queue
18import shutil
19import subprocess
20import sys
21import threading
22import time
23import lib.results as results
[7519]24import lib.progressbar as progressbar
[7478]25
# --- command-line handling, done once at import time ----------------------
# cmd_opts, args and project_name are read as module-level globals by
# main(), run_test() and rerun_results() below.
usage = 'Usage: %prog <project name> [options]'
parser = optparse.OptionParser(usage=usage)
parser.add_option(
    '-p', '--port', dest='port', type='int', help='rpc listener port')
parser.add_option(
    '-r', '--results', dest='results_dir',
    help='results directory to reprocess')
cmd_opts, args = parser.parse_args()

try:
    project_name = args[0]
except IndexError:
    sys.stderr.write('\nERROR: no project specified\n\n')
    sys.stderr.write('usage: python multi-mechanize.py <project_name>\n')
    sys.stderr.write('example: python multi-mechanize.py default_project\n\n')
    sys.exit(1)

scripts_path = 'projects/%s/test_scripts' % project_name
if not os.path.exists(scripts_path):
    sys.stderr.write('\nERROR: can not find project: %s\n\n' % project_name)
    sys.exit(1)
sys.path.append(scripts_path)
for f in glob.glob('%s/*.py' % scripts_path):
    # import all test scripts as modules and bind each one in this
    # module's namespace so the agents can later look them up by name.
    # Derive the module name from the file name (instead of chained
    # string replacements, which also mangled path separators) and use
    # __import__ rather than exec'ing a built-up import statement.
    f = os.path.splitext(os.path.basename(f))[0]
    globals()[f] = __import__(f)
52
53
54
def main():
    """Entry point: reprocess stored results, serve RPC requests, or
    run a load test locally, depending on the command-line options."""
    # -r/--results: skip running a test, only re-analyze stored results
    if cmd_opts.results_dir:
        rerun_results(cmd_opts.results_dir)
        return
    # -p/--port: expose run_test through an RPC listener instead
    if cmd_opts.port:
        import lib.rpcserver
        lib.rpcserver.launch_rpc_server(cmd_opts.port, project_name, run_test)
        return
    # default: run the configured load test right away
    run_test()
[7519]64
65
def run_test(remote_starter=None):
    """Run the configured load test for the global ``project_name``.

    Spawns one UserGroup process per configured user group, collects
    their timing records through a shared queue (drained to disk by a
    ResultsWriter thread), then post-processes the results once every
    group has finished.

    remote_starter -- optional RPC-side state object; when given, its
        ``test_running`` and ``output_dir`` attributes are updated so a
        remote caller can track test progress.  None for local runs.
    """
    if remote_starter is not None:
        remote_starter.test_running = True
        remote_starter.output_dir = None
    # read all run parameters from the project's config.cfg
    (run_time, rampup, results_ts_interval, console_logging, progress_bar,
     results_database, post_run_script, xml_report,
     user_group_configs) = configure(project_name)

    run_localtime = time.localtime()
    # timestamped per-run results directory, created by ResultsWriter below
    output_dir = time.strftime(
        'projects/' + project_name
        + '/results/results_%Y.%m.%d_%H.%M.%S/', run_localtime)

    # this queue is shared between all processes/threads
    queue = multiprocessing.Queue()
    rw = ResultsWriter(queue, output_dir, console_logging)
    rw.daemon = True
    rw.start()

    user_groups = []
    for i, ug_config in enumerate(user_group_configs):
        ug = UserGroup(queue, i, ug_config.name, ug_config.num_threads,
                       ug_config.script_file, run_time, rampup)
        user_groups.append(ug)
    for user_group in user_groups:
        user_group.start()

    start_time = time.time()

    if console_logging:
        # ResultsWriter echoes each record to stdout; just wait for groups
        for user_group in user_groups:
            user_group.join()
    else:
        print '\n  user_groups:  %i' % len(user_groups)
        # NOTE(review): ug_config leaks from the loop above, i.e. this uses
        # the *last* group's thread count -- assumes all groups share the
        # same number of threads.  Confirm intended.
        print '  threads: %i\n' % (ug_config.num_threads * len(user_groups))

        if progress_bar:
            p = progressbar.ProgressBar(run_time)
            elapsed = 0
            while elapsed < (run_time + 1):
                p.update_time(elapsed)
                if sys.platform.startswith('win'):
                    # trailing comma + '\r': redraw in place on Windows
                    print '%s   transactions: %i  timers: %i  errors: %i\r' % (
                        p, rw.trans_count, rw.timer_count, rw.error_count),
                else:
                    print '%s   transactions: %i  timers: %i  errors: %i' % (
                        p, rw.trans_count, rw.timer_count, rw.error_count)
                    # ESC [A: move cursor up one line so the bar redraws
                    sys.stdout.write(chr(27) + '[A' )
                time.sleep(1)
                elapsed = time.time() - start_time
            print p

        # wait for agents still finishing their last transaction
        while [user_group for user_group in user_groups
               if user_group.is_alive()] != []:
            if progress_bar:
                if sys.platform.startswith('win'):
                    print 'waiting for all requests to finish...\r',
                else:
                    print 'waiting for all requests to finish...\r'
                    sys.stdout.write(chr(27) + '[A' )
            time.sleep(.5)

        if not sys.platform.startswith('win'):
            print

    # all agents are done running at this point
    time.sleep(.2) # make sure the writer queue is flushed
    print '\n\nanalyzing results...\n'
    results.output_results(
        output_dir, 'results.csv', run_time, rampup, results_ts_interval,
        user_group_configs, xml_report)
    print 'created: %sresults.html\n' % output_dir
    if xml_report:
        print 'created: %sresults.jtl' % output_dir
        print 'created: last_results.jtl\n'

    # copy config file to results directory
    project_config = os.sep.join(['projects', project_name, 'config.cfg'])
    saved_config = os.sep.join([output_dir, 'config.cfg'])
    shutil.copy(project_config, saved_config)

    if results_database is not None:
        print 'loading results into database: %s\n' % results_database
        import lib.resultsloader
        lib.resultsloader.load_results_database(
            project_name, run_localtime, output_dir, results_database,
            run_time, rampup, results_ts_interval, user_group_configs)

    if post_run_script is not None:
        print 'running post_run_script: %s\n' % post_run_script
        subprocess.call(post_run_script)

    print 'done.\n'

    if remote_starter is not None:
        remote_starter.test_running = False
        remote_starter.output_dir = output_dir
    return
[7519]164
165
166
[7478]167def rerun_results(results_dir):
168    output_dir = 'projects/%s/results/%s/' % (project_name, results_dir)
169    saved_config = '%s/config.cfg' % output_dir
[7519]170    (run_time, rampup, results_ts_interval, console_logging, progress_bar,
171     results_database, post_run_script, xml_report,
172     user_group_configs) = configure(project_name, config_file=saved_config)
[7478]173    print '\n\nanalyzing results...\n'
[7519]174    results.output_results(
175        output_dir, 'results.csv', run_time, rampup, results_ts_interval,
176        user_group_configs, xml_report)
[7478]177    print 'created: %sresults.html\n' % output_dir
178    if xml_report:
179        print 'created: %sresults.jtl' % output_dir
180        print 'created: last_results.jtl\n'
181
182
[7519]183
def configure(project_name, config_file=None):
    """Parse a project's config.cfg.

    config_file -- explicit path to a config file; defaults to
        projects/<project_name>/config.cfg.

    Returns a 9-tuple: (run_time, rampup, results_ts_interval,
    console_logging, progress_bar, results_database, post_run_script,
    xml_report, user_group_configs).  Every section other than
    'global' is treated as a user group definition.
    """
    def _bool_option(cfg, section, option, default):
        # optional boolean: fall back to default when absent
        try:
            return cfg.getboolean(section, option)
        except ConfigParser.NoOptionError:
            return default

    def _str_option(cfg, section, option):
        # optional string: absent or the literal 'None' both mean None
        try:
            value = cfg.get(section, option)
        except ConfigParser.NoOptionError:
            return None
        if value == 'None':
            return None
        return value

    if config_file is None:
        config_file = 'projects/%s/config.cfg' % project_name
    config = ConfigParser.ConfigParser()
    config.read(config_file)

    user_group_configs = []
    for section in config.sections():
        if section == 'global':
            # required global settings: raise if missing
            run_time = config.getint(section, 'run_time')
            rampup = config.getint(section, 'rampup')
            results_ts_interval = config.getint(section, 'results_ts_interval')
            # optional global settings with defaults
            console_logging = _bool_option(
                config, section, 'console_logging', False)
            progress_bar = _bool_option(config, section, 'progress_bar', True)
            results_database = _str_option(config, section, 'results_database')
            post_run_script = _str_option(config, section, 'post_run_script')
            xml_report = _bool_option(config, section, 'xml_report', False)
        else:
            user_group_configs.append(UserGroupConfig(
                config.getint(section, 'threads'),
                section,
                config.get(section, 'script')))

    return (run_time, rampup, results_ts_interval, console_logging,
            progress_bar, results_database, post_run_script, xml_report,
            user_group_configs)
[7478]227
228
[7519]229
class UserGroupConfig(object):
    """Settings for one user group, as read from a config.cfg section."""

    def __init__(self, num_threads, name, script_file):
        # name of the config section that defined this group
        self.name = name
        # number of agent threads the group spawns
        self.num_threads = num_threads
        # filename of the test script the agents execute
        self.script_file = script_file
[7519]235
236
237
class UserGroup(multiprocessing.Process):
    """One load-generating process.

    Starts ``num_threads`` Agent threads that all run the same test
    script, staggering their start-up evenly across the rampup period.
    """

    def __init__(self, queue, process_num, user_group_name, num_threads,
                 script_file, run_time, rampup):
        multiprocessing.Process.__init__(self)
        self.queue = queue                      # shared results queue
        self.process_num = process_num          # index of this group
        self.user_group_name = user_group_name
        self.num_threads = num_threads
        self.script_file = script_file
        self.run_time = run_time                # seconds each agent keeps running
        self.rampup = rampup                    # seconds over which agents start
        self.start_time = time.time()           # recorded at construction time

    def run(self):
        agents = []
        for thread_num in range(self.num_threads):
            # sleep between starts so all threads come up within rampup
            if thread_num > 0:
                time.sleep(float(self.rampup) / float(self.num_threads))
            agent = Agent(self.queue, self.process_num, thread_num,
                          self.start_time, self.run_time,
                          self.user_group_name, self.script_file)
            agent.daemon = True
            agents.append(agent)
            agent.start()
        # block until every agent has finished its run
        for agent in agents:
            agent.join()
265
266
[7519]267
class Agent(threading.Thread):
    """A single virtual user.

    Runs inside a UserGroup process and repeatedly executes the test
    script's Transaction() until run_time has elapsed, putting one
    timing record onto the shared queue per iteration.
    """
    def __init__(self, queue, process_num, thread_num, start_time,
                 run_time, user_group_name, script_file):
        threading.Thread.__init__(self)
        self.queue = queue                    # shared results queue
        self.process_num = process_num        # index of the owning UserGroup
        self.thread_num = thread_num          # index of this agent in its group
        self.start_time = start_time          # group start; basis for elapsed
        self.run_time = run_time              # seconds to keep iterating
        self.user_group_name = user_group_name
        self.script_file = script_file        # test script module filename

        # choose most accurate timer to use (time.clock has finer
        # granularity than time.time on windows, but shouldn't be used
        # on other systems)
        if sys.platform.startswith('win'):
            self.default_timer = time.clock
        else:
            self.default_timer = time.time


    def run(self):
        elapsed = 0

        if self.script_file.lower().endswith('.py'):
            # NOTE(review): replace() strips any '.py' substring, not only
            # the suffix, and won't strip an upper-case '.PY' even though
            # the endswith() check above accepts it -- confirm intended.
            module_name = self.script_file.replace('.py', '')
        else:
            sys.stderr.write(
                'ERROR: scripts must have .py extension. can not run '
                'test script: %s.  aborting user group: %s\n' % (
                    self.script_file, self.user_group_name))
            return
        try:
            # HACK: eval of a constructed expression; relies on the script
            # module having been imported into this module's namespace at
            # startup.  Only safe because module_name comes from the
            # project's own configuration, never untrusted input.
            trans = eval(module_name + '.Transaction()')
        except NameError, e:
            # module name not found -- script was never imported
            sys.stderr.write(
                'ERROR: can not find test script: %s.  aborting user '
                'group: %s\n' % (self.script_file, self.user_group_name))
            return
        except Exception, e:
            sys.stderr.write(
                'ERROR: failed initializing Transaction: %s.  aborting '
                'user group: %s\n' % (self.script_file, self.user_group_name))
            return

        trans.custom_timers = {}

        # scripts have access to these vars, which can be useful for
        # loading unique data
        trans.thread_num = self.thread_num
        trans.process_num = self.process_num

        while elapsed < self.run_time:
            error = ''
            start = self.default_timer()

            try:
                trans.run()
            except Exception, e:  # test runner catches all script
                                  # exceptions here
                # commas stripped so the message fits in one CSV field
                error = str(e).replace(',', '')

            finish = self.default_timer()

            scriptrun_time = finish - start
            elapsed = time.time() - self.start_time

            epoch = time.mktime(time.localtime())

            # one record per transaction; consumed by ResultsWriter
            fields = (elapsed, epoch, self.user_group_name,
                      scriptrun_time, error, trans.custom_timers)
            self.queue.put(fields)
340
341
[7519]342
class ResultsWriter(threading.Thread):
    """Daemon thread that drains the shared queue into results.csv.

    Also maintains running trans_count/timer_count/error_count totals,
    which the progress display in run_test() reads while a test runs.
    """
    def __init__(self, queue, output_dir, console_logging):
        threading.Thread.__init__(self)
        self.queue = queue                      # fed by Agent threads
        self.console_logging = console_logging  # echo each record to stdout?
        self.output_dir = output_dir
        self.trans_count = 0  # total transactions recorded
        self.timer_count = 0  # total custom timers recorded
        self.error_count = 0  # transactions that reported an error

        try:
            os.makedirs(self.output_dir, 0755)
        except OSError:
            sys.stderr.write('ERROR: Can not create output directory\n')
            sys.exit(1)

    def run(self):
        # poll the queue forever; the thread is started as a daemon, so
        # it dies together with the main process
        with open(self.output_dir + 'results.csv', 'w') as f:
            while True:
                try:
                    # NOTE(review): unpacking into self.user_group_name
                    # (re)binds an instance attribute per record; a local
                    # variable would do -- confirm nothing else reads it.
                    (elapsed, epoch, self.user_group_name, scriptrun_time,
                    error, custom_timers) = self.queue.get(False)
                    self.trans_count += 1
                    self.timer_count += len(custom_timers)
                    if error != '':
                        self.error_count += 1
                    f.write('%i,%.3f,%i,%s,%f,%s,%s\n' % (
                            self.trans_count, elapsed, epoch,
                            self.user_group_name, scriptrun_time, error,
                            repr(custom_timers)))
                    f.flush()
                    if self.console_logging:
                        print '%i, %.3f, %i, %s, %.3f, %s, %s' % (
                            self.trans_count, elapsed, epoch,
                            self.user_group_name, scriptrun_time, error,
                            repr(custom_timers))
                except Queue.Empty:
                    # nothing queued yet; back off briefly before retrying
                    time.sleep(.05)
381
# script entry point: options were parsed at import time above
if __name__ == '__main__':
    main()
384
Note: See TracBrowser for help on using the repository browser.