locust压测环境描述

本文环境python3.5.2
locust版本0.9.0

locust测试信息输出与分布式模式

本文将主要分析两个方面的内容:

1.locust在启动运行在跑测试用例的时候,会将相关的测试信息输出

2.locust支持分布式测试,已达到可以通过多机器来达到高并发的测试。

下文将会对这两个内容进行分析。

locust测试时的相关测试信息

在locust启动流程的分析过程中,在main函数的执行过程中,有如下代码:

if not options.only_summary and (options.print_stats or (options.no_web and not options.slave)):# spawn stats printing greenletgevent.spawn(stats_printer)                             # 控制台输出相关信息

主要通过stats_printer将相关的运行信息输出,我们就从此处开始分析用例的执行流程;

def stats_printer():from . import runnerswhile True:print_stats(runners.locust_runner.request_stats)            # 首先获取runners.locust_runner对应的此刻的locust实例并获取request_stats属性值gevent.sleep(CONSOLE_STATS_INTERVAL_SEC)                    # 打印完成后,睡眠2秒

首先分析一下print_stats函数;

def print_stats(stats):console_logger.info((" %-" + str(STATS_NAME_WIDTH) + "s %7s %12s %7s %7s %7s  | %7s %7s") % ('Name', '# reqs', '# fails', 'Avg', 'Min', 'Max', 'Median', 'req/s'))   # 打印头部信息console_logger.info("-" * (80 + STATS_NAME_WIDTH))                                                                                                                   # 打印横线total_rps = 0total_reqs = 0total_failures = 0for key in sorted(six.iterkeys(stats)):                                     # 遍历所有的状态属性值r = stats[key]total_rps += r.current_rps                                              # 获取当前值total_reqs += r.num_requeststotal_failures += r.num_failuresconsole_logger.info(r)console_logger.info("-" * (80 + STATS_NAME_WIDTH))                          # 打印横线try:fail_percent = (total_failures/float(total_reqs))*100                   # 计算失败比率except ZeroDivisionError:fail_percent = 0console_logger.info((" %-" + str(STATS_NAME_WIDTH) + "s %7d %12s %42.2f") % ('Total', total_reqs, "%d(%.2f%%)" % (total_failures, fail_percent), total_rps))        # 打印相关计算出来的信息console_logger.info("")

该函数的主要作用将就是先计算出相关比率,总的执行错误数等相关信息,然后按照表格的信息输出到控制台,其主要的信息数据是来源于输入的参数stats值,而该值就是runners.locust_runner.request_stats,查看LocustRunner类的初始化过程可知,stats为global_stats,

global_stats = RequestStats()

此时我们查看RequestStats的相关信息:

class RequestStats(object):def __init__(self):self.entries = {}                                                                   # state输出的信息self.errors = {}self.total = StatsEntry(self, "Total", None, use_response_times_cache=True)         # 定义了total数量值self.start_time = None                                                              # 定义开始时间@propertydef num_requests(self):return self.total.num_requests                                                      # 获取总共的请求数@propertydef num_failures(self):return self.total.num_failures                                                      # 获取总共失败的次数@propertydef last_request_timestamp(self):return self.total.last_request_timestamp                                            # 获取最后一次请求的时间戳def log_request(self, method, name, response_time, content_length):self.total.log(response_time, content_length)                                       # 通过total记录总的数量self.get(name, method).log(response_time, content_length)                           # 通过name 与method 定义一个StatsEntry来记录数据def log_error(self, method, name, error):self.total.log_error(error)                                                         # 通过total记录总的错误数量self.get(name, method).log_error(error)                                             # 通过 name 与method组合的一个StatsEntry来记录数据# store error in errors dictkey = StatsError.create_key(method, name, error)                                    # 生成一个错误的keyentry = self.errors.get(key)                                                        # 先从errors中去获取该keyif not entry:                                                                       # 如果没有获取到entry = StatsError(method, name, error)                                         # 生成一个错误记录的StatsError实例self.errors[key] = entry                                                        # 保存该实例entry.occured()                                                                     # 记录该错误def get(self, name, method):"""Retrieve a StatsEntry instance by name and method"""entry = self.entries.get((name, method))                                            # 先从enteries字典中获取是否有以name method为key的StatsEntryif not entry:entry = StatsEntry(self, name, method)                                          # 如果没有则实例化一个并保存到enteries中self.entries[(name, method)] = entryreturn entry                                                                        # 返回该enterydef reset_all(self):"""Go through all stats entries and reset them to zero"""self.start_time = time.time()                                                       # 重置当前开始时间self.total.reset()                                                                  # 重新置零 从新开始记录for r in six.itervalues(self.entries):                                              # 遍历所有的statsentery然后并重置所有数据r.reset()def clear_all(self):"""Remove all stats entries and errors"""self.total = StatsEntry(self, "Total", None, use_response_times_cache=True)         # 重新生成totalself.entries = {}                                                                   # 全部重新初始化值self.errors = {}self.start_time = Nonedef serialize_stats(self):return [self.entries[key].get_stripped_report() for key in six.iterkeys(self.entries) if not (self.entries[key].num_requests == 0 and self.entries[key].num_failures == 0)] # 初始化输出信息def serialize_errors(self):return dict([(k, e.to_dict()) for k, e in six.iteritems(self.errors)])              # 生成错误信息的字典值

此时里面主要的记录主要依靠StatsEntry类来实现,查看该类的代码;

class StatsEntry(object):"""Represents a single stats entry (name and method)"""name = None""" Name (URL) of this stats entry """method = None""" Method (GET, POST, PUT, etc.) """num_requests = None""" The number of requests made """num_failures = None""" Number of failed request """total_response_time = None""" Total sum of the response times """min_response_time = None""" Minimum response time """max_response_time = None""" Maximum response time """num_reqs_per_sec = None""" A {second => request_count} dict that holds the number of requests made per second """response_times = None"""A {response_time => count} dict that holds the response time distribution of allthe requests.The keys (the response time in ms) are rounded to store 1, 2, ... 9, 10, 20. .. 90, 100, 200 .. 900, 1000, 2000 ... 9000, in order to save memory.This dict is used to calculate the median and percentile response times."""use_response_times_cache = False"""If set to True, the copy of the response_time dict will be stored in response_times_cache every second, and kept for 20 seconds (by default, will be CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW + 10). We can use this dict to calculate the *current*  median response time, as well as other response time percentiles."""response_times_cache = None"""If use_response_times_cache is set to True, this will be a {timestamp => CachedResponseTimes()} OrderedDict that holds a copy of the response_times dict for each of the last 20 seconds."""total_content_length = None""" The sum of the content length of all the requests for this entry """start_time = None""" Time of the first request for this entry """last_request_timestamp = None""" Time of the last request for this entry """def __init__(self, stats, name, method, use_response_times_cache=False):self.stats = stats                                                      # 保存传入的statsself.name = name                                                        # 名称self.method = method                                                    # 方法self.use_response_times_cache = use_response_times_cache                self.reset()                                                            # 重新初始化def reset(self):self.start_time = time.time()                                           # 重新设置开始时间self.num_requests = 0self.num_failures = 0self.total_response_time = 0self.response_times = {}self.min_response_time = Noneself.max_response_time = 0self.last_request_timestamp = int(time.time())self.num_reqs_per_sec = {}self.total_content_length = 0if self.use_response_times_cache:self.response_times_cache = OrderedDict()self._cache_response_times(int(time.time()))def log(self, response_time, content_length):# get the timet = int(time.time())                                                        # 获取当前时间if self.use_response_times_cache and self.last_request_timestamp and t > self.last_request_timestamp:# see if we shall make a copy of the respone_times dict and store in the cacheself._cache_response_times(t-1)                                         self.num_requests += 1                                                      # 请求加1self._log_time_of_request(t)                                                # 记录当前时间self._log_response_time(response_time)                                      # 记录响应时间# increase total content-lengthself.total_content_length += content_length                                 # 记录总的响应数据的大小def _log_time_of_request(self, t):self.num_reqs_per_sec[t] = self.num_reqs_per_sec.setdefault(t, 0) + 1       # 保存最新的时间self.last_request_timestamp = t                                             # 获取最新的请求的时间戳def _log_response_time(self, response_time):self.total_response_time += response_time                                   # 增加总的记录时间if self.min_response_time is None:self.min_response_time = response_timeself.min_response_time = min(self.min_response_time, response_time)         # 比较获取最小的响应时间self.max_response_time = max(self.max_response_time, response_time)         # 比较获取最大的响应时间# to avoid to much data that has to be transfered to the master node when# running in distributed mode, we save the response time rounded in a dict# so that 147 becomes 150, 3432 becomes 3400 and 58760 becomes 59000if response_time < 100:                                                     # 如果小于一百rounded_response_time = response_time                                   # 则直接赋值elif response_time < 1000:                                                  # 如果大于100小于1000rounded_response_time = int(round(response_time, -1))                   # 倒数第一位四舍五入elif response_time < 10000:rounded_response_time = int(round(response_time, -2))else:rounded_response_time = int(round(response_time, -3))                   # 倒数第三位四舍五入# increase request count for the rounded key in response time dictself.response_times.setdefault(rounded_response_time, 0)                    # 设置时间self.response_times[rounded_response_time] += 1                             def log_error(self, error):self.num_failures += 1                                                      # 失败次数加1@propertydef fail_ratio(self):try:return float(self.num_failures) / (self.num_requests + self.num_failures)  # 计算错误的比率except ZeroDivisionError:if self.num_failures > 0:return 1.0else:return 0.0@propertydef avg_response_time(self):try:return float(self.total_response_time) / self.num_requests                 # 计算平均响应时间except ZeroDivisionError:return 0@propertydef median_response_time(self):if not self.response_times:return 0return median_from_dict(self.num_requests, self.response_times)                 # 求出每个请求平均花了多久@propertydef current_rps(self):if self.stats.last_request_timestamp is None:return 0slice_start_time = max(self.stats.last_request_timestamp - 12, int(self.stats.start_time or 0))     # 获取总共的运行时间reqs = [self.num_reqs_per_sec.get(t, 0) for t in range(slice_start_time, self.stats.last_request_timestamp-2)]  # 获取总的运行时间的请求的列表return avg(reqs)                                                                                                # 求平均值每秒多少个请求            @propertydef total_rps(self):if not self.stats.last_request_timestamp or not self.stats.start_time:return 0.0return self.num_requests / max(self.stats.last_request_timestamp - self.stats.start_time, 1)                    # 求出每个请求的平均处理时间@propertydef avg_content_length(self):try:return self.total_content_length / self.num_requests                                                        # 求出平均每个请求多大except ZeroDivisionError:return 0def extend(self, other):"""Extend the data from the current StatsEntry with the stats from anotherStatsEntry instance. """self.last_request_timestamp = max(self.last_request_timestamp, other.last_request_timestamp)            # 获取最新的运行的时间值self.start_time = min(self.start_time, other.start_time)                                                # 获取最小的开始时间self.num_requests = self.num_requests + other.num_requests                                              # 获取总的请求数self.num_failures = self.num_failures + other.num_failures                                              # 获取总的失败请求数self.total_response_time = self.total_response_time + other.total_response_time                         # 获取总的响应时间self.max_response_time = max(self.max_response_time, other.max_response_time)                           # 获取最大的响应时间self.min_response_time = min(self.min_response_time or 0, other.min_response_time or 0) or other.min_response_time  # 获取最小的响应时间self.total_content_length = self.total_content_length + other.total_content_length                      # 获取总的响应请求的大小for key in other.response_times:self.response_times[key] = self.response_times.get(key, 0) + other.response_times[key]              # 获取每个请求的并计算总的响应时间for key in other.num_reqs_per_sec:self.num_reqs_per_sec[key] = self.num_reqs_per_sec.get(key, 0) +  other.num_reqs_per_sec[key]def serialize(self):return {"name": self.name,"method": self.method,"last_request_timestamp": self.last_request_timestamp,"start_time": self.start_time,"num_requests": self.num_requests,"num_failures": self.num_failures,"total_response_time": self.total_response_time,"max_response_time": self.max_response_time,"min_response_time": self.min_response_time,"total_content_length": self.total_content_length,"response_times": self.response_times,"num_reqs_per_sec": self.num_reqs_per_sec,}                                                                               # 序列化所有值@classmethoddef unserialize(cls, data):obj = cls(None, data["name"], data["method"])                                   # 反序列化 生成该类实例for key in ["last_request_timestamp","start_time","num_requests","num_failures","total_response_time","max_response_time","min_response_time","total_content_length","response_times","num_reqs_per_sec",]:setattr(obj, key, data[key])                                                # 给该类设置相关属性值return obj                                                                      # 返回初始化的实例def get_stripped_report(self):"""Return the serialized version of this StatsEntry, and then clear the current stats."""report = self.serialize()                                                       # 获取序列化的数据self.reset()                                                                    # 重新初始化return report                                                                   # 返回当前属性值def __str__(self):try:fail_percent = (self.num_failures/float(self.num_requests + self.num_failures))*100except ZeroDivisionError:fail_percent = 0return (" %-" + str(STATS_NAME_WIDTH) + "s %7d %12s %7d %7d %7d  | %7d %7.2f") % ((self.method and self.method + " " or "") + self.name,self.num_requests,"%d(%.2f%%)" % (self.num_failures, fail_percent),self.avg_response_time,self.min_response_time or 0,self.max_response_time,self.median_response_time or 0,self.current_rps or 0)                                                                               # 打印Object时输出的信息

通过StatsEntry类的定义可知,相关信息的记录与计算都是通过该类来实现,计算平均每个请求的耗时,计算每个请求多大的返回大小等参数都是通过该类来完成。

整体的记录的体系大概知道后,请求的信息是如何记录的呢?

"""
A global instance for holding the statistics. Should be removed eventually.
"""def on_request_success(request_type, name, response_time, response_length, **kwargs):global_stats.log_request(request_type, name, response_time, response_length)            # 记录访问成功的请求def on_request_failure(request_type, name, response_time, exception, **kwargs):global_stats.log_request(request_type, name, response_time, 0)                          # 记录访问请求global_stats.log_error(request_type, name, exception)                                   # 记录该错误的请求events.request_success += on_request_success                        # 添加到events.request_success中
events.request_failure += on_request_failure                        # 添加到events.request_failure中

由该段代码可知,只需要调用events.request_success.fire()函数就可以记录该请求数据,此时我们回看http执行过程中的如下代码:

        try:response.raise_for_status()except RequestException as e:events.request_failure.fire(request_type=request_meta["method"], name=request_meta["name"], response_time=request_meta["response_time"], exception=e, )else:events.request_success.fire(request_type=request_meta["method"],name=request_meta["name"],response_time=request_meta["response_time"],response_length=request_meta["content_size"],)return response

在通过requests.Session发送请求之后,如果访问失败则调用events.request_failure来记录失败的请求,否则就调动events.request_success来记录成功的请求,至此相关执行过程中的数据信息的大致执行流程就分析完成,大家如有兴趣可翻看源码查看更多有关记录的相关信息。

分布式模式的流程

因为locust支持分布式的测试,在此我们简要分析一下分布式的大致流程,首先根据文档中的分布式的启动的命令,通过不同的输入参数而启动不同的模式,在main函数的执行过程中有如下代码;

elif options.master:runners.locust_runner = MasterLocustRunner(locust_classes, options)if options.no_web:while len(runners.locust_runner.clients.ready)<options.expect_slaves:logging.info("Waiting for slaves to be ready, %s of %s connected",len(runners.locust_runner.clients.ready), options.expect_slaves)time.sleep(1)runners.locust_runner.start_hatching(options.num_clients, options.hatch_rate)main_greenlet = runners.locust_runner.greenletif options.run_time:spawn_run_time_limit_greenlet()
elif options.slave:if options.run_time:logger.error("--run-time should be specified on the master node, and not on slave nodes")sys.exit(1)try:runners.locust_runner = SlaveLocustRunner(locust_classes, options)main_greenlet = runners.locust_runner.greenletexcept socket.error as e:logger.error("Failed to connect to the Locust master: %s", e)sys.exit(-1)

有代码可知,通过传入的不同的参数而运行在不同的模式之下,我们首先分析一下master模式。

master模式的启动

从代码可知,主从模式不过是初始化了不同的Locust类,master初始化的是MasterLocustRunner类,查看该类的代码;

class DistributedLocustRunner(LocustRunner):def __init__(self, locust_classes, options):super(DistributedLocustRunner, self).__init__(locust_classes, options)self.master_host = options.master_host                                      # 初始化master的域名self.master_port = options.master_port                                      # 初始化master的端口self.master_bind_host = options.master_bind_host                            # 初始化master绑定的域名self.master_bind_port = options.master_bind_port                            # 初始化master绑定的端口self.heartbeat_liveness = options.heartbeat_liveness                        # 获取心跳检查的次数 默认为3self.heartbeat_interval = options.heartbeat_interval                        # 获取心跳的间隔时间 默认为1def noop(self, *args, **kwargs):""" Used to link() greenlets to in order to be compatible with gevent 1.0 """passclass SlaveNode(object):def __init__(self, id, state=STATE_INIT, heartbeat_liveness=3):self.id = id self.state = state                                                  # 初始化节点的状态self.user_count = 0self.heartbeat = heartbeat_liveness                                 # 初始化心跳检查的次数class MasterLocustRunner(DistributedLocustRunner):def __init__(self, *args, **kwargs):super(MasterLocustRunner, self).__init__(*args, **kwargs)class SlaveNodesDict(dict):def get_by_state(self, state):return [c for c in six.itervalues(self) if c.state == state]@propertydef all(self):return six.itervalues(self)@propertydef ready(self):return self.get_by_state(STATE_INIT)@propertydef hatching(self):return self.get_by_state(STATE_HATCHING)@propertydef running(self):return self.get_by_state(STATE_RUNNING)self.clients = SlaveNodesDict()                                                 # 初始化一个从节点的字典self.server = rpc.Server(self.master_bind_host, self.master_bind_port)          # 启动rpc服务端self.greenlet = Group()                 self.greenlet.spawn(self.heartbeat_worker).link_exception(callback=self.noop)   # 启动心动检查self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)    # 启动客户端监听# listener that gathers info on how many locust users the slaves has spawneddef on_slave_report(client_id, data):if client_id not in self.clients:logger.info("Discarded report from unrecognized slave %s", client_id)returnself.clients[client_id].user_count = data["user_count"]events.slave_report += on_slave_report                                          # 记录从节点的报告# register listener that sends quit message to slave nodesdef on_quitting():self.quit()events.quitting += on_quitting                                                  # 记录离开函数@propertydef user_count(self):return sum([c.user_count for c in six.itervalues(self.clients)])                # 获取所有的并发的用户数def start_hatching(self, locust_count, hatch_rate):num_slaves = len(self.clients.ready) + len(self.clients.running) + len(self.clients.hatching)  # 获取所有的从节点数量if not num_slaves:logger.warning("You are running in distributed mode but have no slave servers connected. ""Please connect slaves prior to swarming.")returnself.num_clients = locust_count                                                 # 获取客户端数量slave_num_clients = locust_count // (num_slaves or 1)                           # 计算从节点的数量slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)                        # 计算启动速率remaining = locust_count % num_slaves                                           # 计算还有多少启动logger.info("Sending hatch jobs to %d ready clients", num_slaves)if self.state != STATE_RUNNING and self.state != STATE_HATCHING:                # 如果既不是运行状态并且也不是启动状态self.stats.clear_all()                                                      # 重新初始化self.exceptions = {}events.master_start_hatching.fire() for client in (self.clients.ready + self.clients.running + self.clients.hatching):  # 遍历所有的客户端data = {"hatch_rate":slave_hatch_rate,"num_clients":slave_num_clients,"host":self.host,"stop_timeout":None}if remaining > 0:data["num_clients"] += 1remaining -= 1self.server.send_to_client(Message("hatch", data, client.id))               # 向客户端发送启动的命令参数self.stats.start_time = time()                                                  # 记录启动时间self.state = STATE_HATCHING                                                     # 修改状态为运行态def stop(self):for client in self.clients.all:                                                 # 停止的话就遍历所有的客户端self.server.send_to_client(Message("stop", None, client.id))                # 向客户端发送停止命令events.master_stop_hatching.fire()                                              # master停止def quit(self):for client in self.clients.all:                                                 # 遍历所有客户端发送退出指令self.server.send_to_client(Message("quit", None, client.id))            self.greenlet.kill(block=True)                                                  # master退出def heartbeat_worker(self):while True:                                                                     # 死循环gevent.sleep(self.heartbeat_interval)                                       # 休眠指定时间默认1秒for client in self.clients.all:                                             # 遍历所有客户端if client.heartbeat < 0 and client.state != STATE_MISSING:              # 如果客户端的hearbeat小于0并且客户端状态不为丢失状态则设置为丢失logger.info('Slave %s failed to send heartbeat, setting state to missing.' % str(client.id))client.state = STATE_MISSINGelse:client.heartbeat -= 1                                               # heartbeat减一def client_listener(self):while True:client_id, msg = self.server.recv_from_client()                             # 从客户端接受请求msg.node_id = client_id                                                     # 设置客户端idif msg.type == "client_ready":                                              # 判断消息类型id = msg.node_idself.clients[id] = SlaveNode(id, heartbeat_liveness=self.heartbeat_liveness)    # 如果客户端准备完成则服务端生成一个SlaveNode实例与之对应logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready)))## emit a warning if the slave's clock seem to be out of sync with our clock#if abs(time() - msg.data["time"]) > 5.0:#    warnings.warn("The slave node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")elif msg.type == "client_stopped":                                          # 如果客户端停止del self.clients[msg.node_id]                                           # 则删除该节点logger.info("Removing %s client from running clients" % (msg.node_id))elif msg.type == "heartbeat":                                               # 如果是心跳if msg.node_id in self.clients:                                         # 判断是否在子节点列表中self.clients[msg.node_id].heartbeat = self.heartbeat_liveness       # 重置该node的心跳计数self.clients[msg.node_id].state = msg.data['state']                 # 重置该node的节点状态elif msg.type == "stats":                                                   # 如果是显示节点状态events.slave_report.fire(client_id=msg.node_id, data=msg.data)          # 打印节点信息elif msg.type == "hatching":                                                # 如果是运行状态则更改为运行状态self.clients[msg.node_id].state = STATE_HATCHINGelif msg.type == "hatch_complete":                                          self.clients[msg.node_id].state = STATE_RUNNING                         self.clients[msg.node_id].user_count = msg.data["count"]if len(self.clients.hatching) == 0:count = sum(c.user_count for c in six.itervalues(self.clients))events.hatch_complete.fire(user_count=count)elif msg.type == "quit":                                                    # 如果退出if msg.node_id in self.clients:                                         # 遍历所有子节点并删除del self.clients[msg.node_id]logger.info("Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready)))elif msg.type == "exception":                                               # 如果报错则停止self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])if not self.state == STATE_INIT and all(map(lambda x: x.state == STATE_INIT, self.clients.all)): # 如果不 是 状态为初始化中并且所有节点都为初始化状态self.state = STATE_STOPPED                                                               # 状态修改为停止状态@propertydef slave_count(self):return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)

由该类可知,master主要是通过rpc来与slave进行交互,并且通过rpc来进行各种交互,并且在启动的时候master就启动心跳检查,并且监听客户端发送来的数据,此时我们继续分析一下slave的相关状态。

slave模式的执行

slave的代码如下;

class SlaveLocustRunner(DistributedLocustRunner):def __init__(self, *args, **kwargs):super(SlaveLocustRunner, self).__init__(*args, **kwargs)self.client_id = socket.gethostname() + "_" + uuid4().hex                       # 生成一个client_idself.client = rpc.Client(self.master_host, self.master_port, self.client_id)    # 连接master并传入节点的client_idself.greenlet = Group()self.greenlet.spawn(self.heartbeat).link_exception(callback=self.noop)          # 启动心跳函数self.greenlet.spawn(self.worker).link_exception(callback=self.noop)             # 启动工作函数self.client.send(Message("client_ready", None, self.client_id))                 # 向Master发送准备完成信息self.slave_state = STATE_INIT                                                   # 客户端设置状态为初始化self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)     # 启动向master发送状态报告# register listener for when all locust users have hatched, and report it to the master nodedef on_hatch_complete(user_count):self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))self.slave_state = STATE_RUNNINGevents.hatch_complete += on_hatch_complete# register listener that adds the current number of spawned locusts to the report that is sent to the master node def on_report_to_master(client_id, data):data["user_count"] = self.user_countevents.report_to_master += on_report_to_master# register listener that sends quit message to masterdef on_quitting():self.client.send(Message("quit", None, self.client_id))events.quitting += on_quitting                                                  # 如果退出向master发送退出信息# register listener thats sends locust exceptions to masterdef on_locust_error(locust_instance, exception, tb):formatted_tb = "".join(traceback.format_tb(tb))self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id))events.locust_error += on_locust_errordef heartbeat(self):while True:self.client.send(Message('heartbeat', {'state': self.slave_state}, self.client_id))     # 向master发送心跳信息gevent.sleep(self.heartbeat_interval)                                                   # 休眠一段时间def worker(self):while True:msg = self.client.recv()                                                                # 客户端等待接受数据if msg.type == "hatch":                                                                 # 如果是hatchself.slave_state = STATE_HATCHING                                                   # 修改状态为运行self.client.send(Message("hatching", None, self.client_id))                         # 向master发送hatching数据job = msg.data                                                                      # 获取slave启动的参数信息self.hatch_rate = job["hatch_rate"]                                                 # 获取速率信息#self.num_clients = job["num_clients"]self.host = job["host"]                                                             # 获取Hostself.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))   # 启动测试elif msg.type == "stop":                                                                # 如果是停止self.stop()                                                                         # 停止self.client.send(Message("client_stopped", None, self.client_id))                   # 向master发送客户端停止命令self.client.send(Message("client_ready", None, self.client_id))                     # 向客户端发送子节点已经准备好self.slave_state = STATE_INIT                                                       # 修改状态为初始化状态elif msg.type == "quit":logger.info("Got quit message from master, shutting down...")self.stop()                                                                         # 如果主节点需要子节点退出self.greenlet.kill(block=True)                                                      # 退出def stats_reporter(self):while True:data = {}events.report_to_master.fire(client_id=self.client_id, data=data)                       # 获取节点信息try:self.client.send(Message("stats", data, self.client_id))                            # 发送给主节点except:logger.error("Connection lost to master server. Aborting...")breakgevent.sleep(SLAVE_REPORT_INTERVAL)                                                     # 休眠指定时间

slave需要实时的想master报告子节点的运行的状态,通过主节点发送的指令进行相关操作,slave从主节点获取启动参数并启动执行,然后休眠一段时间之后就向master发送运行的数据。

至此分布式的基本内容分析完成,主要是通过rpc来进行节点与master之间的通信,并又心跳检测来获取客户端的状态。大家如有兴趣课自行查看相关代码。

总结

本文主要是分析了locust在运行的过程中对相关的数据进行处理的流程,来反应测试的过程中成功率等相关指标,接着分析了分布式相关的实现的方式,主要通过rpc调用来进行主从之间的通信,节点测试的数据汇总到master,已到达分布式的测试的效果。相关内容大家可自行阅读查看。鉴于本人才疏学浅,如有疏漏请批评指正。

locust压测工具:测试信息输出与分布式模式相关推荐

  1. locust压测工具:启动概述

    locust压测工具启动概述 本文环境python3.5.2 locust版本0.9.0 locust概述 locust是一个简单易用.分布式的用户压测工具,并确定系统可以处理多少并发用户.在测试的时 ...

  2. locust压测工具【学习】

    locust压测工具[学习] 1.安装:pip3 install locust 检验版本:locust -V 2.使用脚本: from locust import task, HttpUser, co ...

  3. locust压测工具:http测试过程与定时停止

    locust压测环境描述 本文环境python3.5.2 locust版本0.9.0 locust示例的执行过程 上文大概描述了locust的启动了流程,本文主要是接着上文继续分析,示例代码中的htt ...

  4. 基于python的压测工具_Python Locust性能测试简介及框架实践

    Locust(俗称 蝗虫), 一个轻量级的开源压测工具,用Python编写.使用 Python 代码定义用户行为,也可以仿真百万个用户: Locust 非常简单易用,是分布式,用户负载测试工具.Loc ...

  5. 6. 堪比JMeter的.Net压测工具 - Crank 实战篇 - 收集诊断跟踪信息与如何分析瓶颈

    1. 前言 上面我们已经做到了接口以及场景压测,通过控制台输出结果,我们只需要将结果收集整理下来,最后汇总到excel上,此次压测报告就可以完成了,但收集报告也挺麻烦的,交给谁呢-- 找了一圈.没找到 ...

  6. 压测工具之Locust

    前言   说起压测,我就用过Jmeter,而且仅是简单使用,好用性能强大,最近接触了一个python提供的压测框架Locust,翻译为蝗虫,蝗虫过之,寸草不生,哈哈哈,我感觉很贴切. 首先,我们分析一 ...

  7. MySQL测试工具之-TPCC(业界通用的压测工具)

    TPCC业界通用的压测工具,主要是压数据库性能. 首先安装tpcc 官网地址:https://github.com/Percona-Lab/tpcc-mysql [root@test3 src]# u ...

  8. jmeter,TCPCopy,loadrunner 等测试压测工具使用教程

    2.JMeter环境设置 – Jmeter中文网 Apache JMeter - User's Manual: Getting Started JMeter学习(一)工具简单介绍 - 阳光温暖了心情 ...

  9. Http压测工具wrk使用指南【转】

    用过了很多压测工具,却一直没找到中意的那款.最近试了wrk感觉不错,写下这份使用指南给自己备忘用,如果能帮到你,那也很好. 安装 wrk支持大多数类UNIX系统,不支持windows.需要操作系统支持 ...

最新文章

  1. 【在CSDN创作2021年度总结】2021年的第一场雪,来的比以往更早一些
  2. 文档怎么添加云服务器,如何添加云服务器地址
  3. MyBatis 源码解读-loadCustomLogImpl(settings)
  4. SSM:Spring整合MyBatis框架时出现 java.lang.AbstractMethodError: org.mybatis.spring.transaction.SpringManaged
  5. 云服务器拷贝文件大小,如何从云服务器上拷贝大文件
  6. 湖南省第六届大学生计算机程序设计竞赛---数字整除
  7. mysql truncate很慢_mysql truncate 的问题
  8. Javascript第五章window对象的事件常用方法第三课
  9. android 平板端应用商店,安卓平板电脑软件商店--乐商店是目前最安全的Android应用商店...
  10. 复制文件或文件夹时出错_为什么对于小白来说,复制文件一个都会出错???...
  11. Excel K4宏病毒专杀
  12. Ubuntu 机箱前置音频接口不能用的解决方法
  13. 日常运维-端口查询篇
  14. Word(二) Word2016 如何删除页眉的下划线
  15. 中兴华为继续应诉欧盟无线网卡反倾销
  16. APP爬虫开发环境准备
  17. 重庆大学计算机类专业分数线,重庆大学录取分数线 2019年重庆大学各专业录取分数线...
  18. P3396 哈希冲突 (根号算法)
  19. 行车路线(改)(图的应用)
  20. 制造工厂生产线液晶电子看板显示终端

热门文章

  1. 想提高代码水平,做到这点就够了
  2. 为什么这门技术如此重要?错过这次黄金期,就晚了!
  3. 机器学习中的线性回归,你理解多少?
  4. Python实现五子棋人机对战 | CSDN博文精选
  5. ICLR 2019最佳论文揭晓!NLP深度学习、神经网络压缩夺魁 | 技术头条
  6. 在TensorFlow2.0发布前,帮你掌握TensorFlow的必备内容
  7. 资源 | 这是你要的Keras官方中文版(附文档链接)
  8. 从15000个Python开源项目中精选的Top30,Github平均star为3707,赶紧收藏!
  9. 资源 | 10x Python开发者必读:本月Python文章TOP 10
  10. 重大改变!Python 或将取代 VBA 成为 Excel 官方脚本语言