Jmeter之创建Kafka生产者和消费者进行性能测试
目录
1. A Brief Overview of Apache Kafka
2. Pepper-Box Serialized Config
3. Pepper Box Kafka Sampler
4. 配置消费者Consumer
5. 在JMeter中构建负载测试Apache Kafka场景
6. 运行脚本并查看View Results Tree.
最近消息队列要换成Kafka,作为一个测试人员,应工作要求,需要对Kafka进行性能测试,那么开干吧,了解它,测试它。在这里把自己学习和使用的一些经验记录在本文中,研究如何去使用Apache JMeter测试Kafka。
首先,先来了解一下什么是Kafka。
1. A Brief Overview of Apache Kafka
在一个大型的分布式系统中,通常有很多服务生成不同的事件:日志、监视数据、可疑用户操作等等。在Kafka中,这些被称为生产者Producer。另一方面,有些服务需要生成的数据,这些被称为消费者Consumer。
Kafka解决了这些服务之间的交互问题,它位于生产者和消费者之间,从生产者收集数据,将它们存储在主题的分布式存储库中,并通过订阅向每个消费者提供数据。Kafka作为一个由一个或多个服务器组成的集群启动,每个服务器都称为代理。
换句话说,Kafka是分布式数据库和消息队列的混合体。它以其特性而广为人知,被许多大公司用来处理万亿字节的信息。例如,在LinkedIn中,Apache Kafka用于传输用户活动的数据,Netflix则用于下游系统的数据收集和缓冲,如Elasticsearch、Amazon EMR、Mantis等。
让我们看看Kafka的一些特性,它们对于负载测试非常重要:
- 通过顺序I/O的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量:即使是非常普通的硬件,Kafka也可以支持每秒数百万的消息。
- 支持通过Kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
通常,Kafka用于处理大量数据。因此,压力测试要注意以下几个方面:
- 不断地将数据写入磁盘将影响服务器的容量。如果不足,将达到拒绝服务状态。
- 此外,sections分布和broker的数量也影响服务能力的使用。例如,代理可能根本没有足够的资源来处理数据流。因此,生产者Producer将耗尽用于存储消息的本地缓冲区,并且部分消息可能会丢失。
- 当使用复制功能时,一切都变得更加复杂。这是因为它的维护需要更多的资源,而代理拒绝接收消息的情况变得更加可能。
- 处理量如此之大的数据很容易丢失,即使大多数过程是自动化的。因此,对这些服务的测试非常重要,并且必须能够生成适当的负载。
关于Kafka的性能测试,Jmeter是有相应插件的,也就是Pepper-Box插件。我们把这个插件中的元素作为生产者Producer,它有一个比kafkameter更方便的接口来处理消息生成,然后我们自己去实现消费者Consumer。由于没有插件提供Consumer实现,我们将使用JSR223 Sampler 来实现。
利用Pepper-Box插件配置生产者Producer
要安装这个插件,您需要编译这个 源代码 或 下载jar包,然后将其放入JMeter文件夹下面的lib/ext目录下,重新启动JMeter,正确添加了jar包的话,用JMeter命令行打开会有对应的显示,如下图所示:
Pepper-Box插件有3个元素:
1.Pepper-Box PlainText Config 允许根据指定模板生成文本消息。
2.Pepper-Box Serialized Config 允许生成序列化java对象的消息。
3.Pepper-Box Kafka Sampler 设计用于发送由以前的元素构建的消息。
Pepper-Box PlainText Config
按照“Thread Group”->“Add”->“Config Elements”->“Pepper-Box PlainText Config”添加该元素:
如上图所示,元素有两个字段:
Message Placeholder Key - 需要在Pepper-Box Kafka Sampler中指定才能使用此元素中的模板的键。
Schema Template - 一个可以使用JMeter变量和函数,以及插件函数的消息模板,消息的结构可以是任何东西,Text、JSON或XML。
例如,在上面的屏幕截图中,我们使用几个插件函数将JSON字符串作为消息传递:指定消息编号messageId、 指定标识符messageBody 和发送时间戳messageTime。
2. Pepper-Box Serialized Config
按照“Thread Group”->“Add”->“Config Elements”->“Pepper-Box Serialized Config”添加该元素:
如上图所示,它有一个键字段Message Placeholder Key和一个类名字段Class Name,用于指定Java类。要注意的是,这里的键字段一定要与后面Sampler里面的Message Placeholder Key的值一致。把带有类的jar文件必须放在lib/ext文件夹中,指定后,具有其属性的字段将显示在下面,您可以为它们指定所需的值。我们重复了来自最后一个元素的消息,但这次它将是一个Java对象。
3. Pepper Box Kafka Sampler
按照“Thread Group”->“Add”->“Sampler”->“Java Request”的方式添加Java Request,然后从下拉列表中选择com.gslab.pepper.sampler.PepperBoxKafkaSampler。
设置项的解释意义如下:
bootstrap.servers/zookeeper.servers
kafka.topic.name
- 消息发布的主题的名称。
key.serializer
- 密钥序列化的类。如果消息中没有密钥,请保持不变。
value.serializer
- 用于消息序列化的类。对于简单文本,字段保持不变。使用Pepper-Box序列化配置时,需要指定“com.gslab.Pepper.input.Serialized.ObjectSerializer”。
compression.type
- 消息压缩的一种类型(none/gzip/snappy/lz4)
batch.size
- 最大的消息大小。
linger.ms
- 消息等待时间。
buffer.memory
- 生产商的缓冲区大小。
acks
- 服务质量(-1/0/1-不保证传递/消息一定会传递/消息只传递一次)。
receive.buffer.bytes/send.buffer.bytes
- TCP发送/接收缓冲区的大小。-1-使用默认OS值。
protocol
- 加密协议(明文/SSL/SASL_明文/SASL_SSL)。
message.placeholder.key
- 在前面的元素中指定的消息键。
kerberos.auth.enabled、java.security.auth.login.config、java.security.krb5.conf、sasl.kerberos.service.name是负责身份验证的字段组。
此外,如果需要,可以在名称前使用前缀添加其他参数,例如,ssl.key.password。
4. 配置消费者Consumer
现在让我们来谈谈消费者Consumer。虽然在服务器上创建最大负载的是生产者Producer,但服务也必须传递消息。因此,我们还应该增加消费者,以更准确地再现情况。它们还可用于检查是否已发送所有消费者消息。
作为一个例子,让我们使用以下源代码并简要介绍其步骤:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s" + "\n", record.offset(), record.key(), record.value());}
}
1.执行连接配置。
2.将指定一个主题“TestTopic”并对其进行订阅。
3.在本主题的循环中接收消息并将其带到控制台。
经过一些修改后的代码将添加到JMeter中的JSR223 Sampler中。
5. 在JMeter中构建负载测试Apache Kafka场景
现在,我们已经研究了创建负载的所有必要元素,让我们尝试将几个消息发布到Kafka服务的主题。假设我们有一个资源,可以从中收集有关其活动的数据。信息将作为XML文档发送。
1.添加Pepper-Box Text Config并创建模板。消息的结构如下:消息编号messageId、UUID、从中收集统计信息的项目ID、统计信息、发送日期戳,消息模板显示如下方截图所示:
2.添加Pepper-Box Kafka Sampler。在其中,指定kafka服务中bootstrap.servers和kafka.topic.name的地址。在我们的例子中,代理的地址是localhost:2181,演示的主题是TestTopic。我们还将从上一步的template元素中指定placeholder.key。
3. 将带有消费者Consumer代码的JSR223 Sampler添加到单独的线程组中。要使其工作,您还需要一个kafka-clients-x.x.x.x.jar文件,其中包含用于使用kafka的类,您可以在kafka目录/kafka/lib中找到这个jar包。接着为了更方便地查看测试数据,修改了脚本的一部分,并将数据保存到一个文件中。此外还添加了设置使用者执行时间所必需的部分,这里设置为5秒,更改后的全部消费者代码如下:
import org.apache.kafka.clients.consumer.*;import javax.script.ScriptException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
long t = System.currentTimeMillis();
long end = t + 5000;
f = new FileOutputStream(".\\data.csv", true);
p = new PrintStream(f);
while (System.currentTimeMillis()<end) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {p.println( "offset = " + record.offset() +" value = " + record.value());}consumer.commitSync();
}
consumer.close();
p.close();
f.close();
这个jmx文件的整体结构如下所示,两个线程组同时工作,生产者开始将消息发布到指定的主题,消费者连接到主题并等待来自Kafka的消息。当消费者收到消息时,它会将消息写入文件。
6. 运行脚本并查看View Results Tree.
从上面的截图可以看到,已经发送了10条消息,你可以在jmeter的bin目录下找到“data.csv”这个文件并查看接收到的信息。这样设置完成之后,只需要调整消费者和生产者的数量来增加负荷。
我把生产者的线程调整成300个,最终的聚合报告截图如下:
值得提醒的是,Apache Kafka是为大量连接而设计的,因此您可以简单地达到网络负载生成器的容量限制。在这种情况下,JMeter保持了分布式测试的特性。以上就是如何使用JMeter加载测试Apache Kafka的全部学习记录,附上我的jmx文件地址:kafka_pepper_box.jmx_-kafka文档类资源-CSDN下载。如果pepper-box-1.0.jar包无法下载,请转至该地址进行下载:pepper-box-1.0.jar_-互联网文档类资源-CSDN下载。
参考文章:
1. 如何使用Jmeter对Kafka进行性能测试_shan286的专栏-CSDN博客_jmeter kafka
2. https://www.blazemeter.com/blog/apache-kafka-how-to-load-test-with-jmeter
Jmeter之创建Kafka生产者和消费者进行性能测试相关推荐
- 深入分析Kafka生产者和消费者
深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...
- Kafka生产者与消费者详解
什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...
- Kafka 生产者、消费者命令行操作
Kafka 生产者.消费者命令行操作 1.查看操作生产者命令参数 bin/kafka-console-producer.sh 参数 --bootstrap-server <String: ser ...
- kafka生产者和消费者端的数据不一致
撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...
- java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...
- java最简单的kafka生产者和消费者,未结合spring
目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...
- Kafka 生产者及消费者详解
一.Kafka 生产者 1.1 分区策略 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群 ...
- pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者
https://pykafka.readthedocs.io/en/latest/api/producer.html 说明文档 </div><h2 class="heade ...
- kafka生产者、消费者java示例
1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...
最新文章
- 从源码分析DEARGUI之add_drawing
- 2.STM32中对Key_GPIO_Config()函数的理解(自定义)之轮询控制按键LED
- 【CyberSecurityLearning 32】Apache配置、Apache的访问控制设定、LAMP平台的搭建
- android 单元测试 多线程,单元测试多线程Android RxJava
- Apollo进阶课程 ⑦ | 高精地图的采集与生产
- PHP5时间相差八小时问题[三种方法]
- LeetCode-MySQL196. 删除重复的电子邮箱
- 高性能队列--Disruptor
- [CTO札记]电纸书,将成为教学、阅读潮流
- echarts 引用地图的json
- 浅谈SQL注入的四种防御方法
- xvid开放源码xvidcore-1.1.3.zip在VC下成功编译的方法
- vscode中文乱码
- 怎样用Python自制好看的指数估值图
- 随机出现“No result defined for action ....Action and result input”解决
- php 月份查询生日_PHP判断日期(生日)格式是否正确合法的方法
- 《战双帕弥什》的动作打击感是怎么做出来的
- 【2021-09-22 修订】【梳理】计算机网络:自顶向下方法 第二章 应用层(docx)
- FAST迅捷路由器设置
- 赣州旅游职业学校学计算机,赣州旅游职业学校是公办的吗
热门文章
- chromedp网络监听_动态爬虫三:监听网络事件 + 监听js事件
- 皮一皮:可怜的西瓜...
- 明明有了 promise ,为啥还需要 async await ?
- 如何正确的创建和销毁Java对象
- Mysql 都会遭受哪些方面的攻击?
- 【实用】面对枯燥的源码,如何才能看得下去?
- 最好用的 IntelliJ 插件 Top 10
- java输入字符数组_JAVA中怎样把用户输入的字符串存入数组中?
- you should specify the `steps` argument
- vs2017 release模式断点调试