目录

1. A Brief Overview of Apache Kafka

2. Pepper-Box Serialized Config

3. Pepper Box Kafka Sampler

4. 配置消费者Consumer

5. 在JMeter中构建负载测试Apache Kafka场景

6. 运行脚本并查看View Results Tree.


最近消息队列要换成Kafka,作为一个测试人员,应工作要求,需要对Kafka进行性能测试,那么开干吧,了解它,测试它。在这里把自己学习和使用的一些经验记录在本文中,研究如何去使用Apache JMeter测试Kafka。
首先,先来了解一下什么是Kafka。

1. A Brief Overview of Apache Kafka

在一个大型的分布式系统中,通常有很多服务生成不同的事件:日志、监视数据、可疑用户操作等等。在Kafka中,这些被称为生产者Producer。另一方面,有些服务需要生成的数据,这些被称为消费者Consumer。
Kafka解决了这些服务之间的交互问题,它位于生产者和消费者之间,从生产者收集数据,将它们存储在主题的分布式存储库中,并通过订阅向每个消费者提供数据。Kafka作为一个由一个或多个服务器组成的集群启动,每个服务器都称为代理。
换句话说,Kafka是分布式数据库和消息队列的混合体。它以其特性而广为人知,被许多大公司用来处理万亿字节的信息。例如,在LinkedIn中,Apache Kafka用于传输用户活动的数据,Netflix则用于下游系统的数据收集和缓冲,如Elasticsearch、Amazon EMR、Mantis等。
让我们看看Kafka的一些特性,它们对于负载测试非常重要:

  • 通过顺序I/O的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件,Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

通常,Kafka用于处理大量数据。因此,压力测试要注意以下几个方面:

  1. 不断地将数据写入磁盘将影响服务器的容量。如果不足,将达到拒绝服务状态。
  2. 此外,sections分布和broker的数量也影响服务能力的使用。例如,代理可能根本没有足够的资源来处理数据流。因此,生产者Producer将耗尽用于存储消息的本地缓冲区,并且部分消息可能会丢失。
  3. 当使用复制功能时,一切都变得更加复杂。这是因为它的维护需要更多的资源,而代理拒绝接收消息的情况变得更加可能。
  4. 处理量如此之大的数据很容易丢失,即使大多数过程是自动化的。因此,对这些服务的测试非常重要,并且必须能够生成适当的负载。

关于Kafka的性能测试,Jmeter是有相应插件的,也就是Pepper-Box插件。我们把这个插件中的元素作为生产者Producer,它有一个比kafkameter更方便的接口来处理消息生成,然后我们自己去实现消费者Consumer。由于没有插件提供Consumer实现,我们将使用JSR223 Sampler 来实现。

利用Pepper-Box插件配置生产者Producer
要安装这个插件,您需要编译这个 源代码 或 下载jar包,然后将其放入JMeter文件夹下面的lib/ext目录下,重新启动JMeter,正确添加了jar包的话,用JMeter命令行打开会有对应的显示,如下图所示:

Pepper-Box插件有3个元素:
1.Pepper-Box PlainText Config    允许根据指定模板生成文本消息。
2.Pepper-Box Serialized Config   允许生成序列化java对象的消息。
3.Pepper-Box Kafka Sampler       设计用于发送由以前的元素构建的消息。

Pepper-Box PlainText Config
按照“Thread Group”->“Add”->“Config Elements”->“Pepper-Box PlainText Config”添加该元素:


如上图所示,元素有两个字段:
Message Placeholder Key  -  需要在Pepper-Box Kafka Sampler中指定才能使用此元素中的模板的键。
Schema Template  -  一个可以使用JMeter变量和函数,以及插件函数的消息模板,消息的结构可以是任何东西,Text、JSON或XML。
例如,在上面的屏幕截图中,我们使用几个插件函数将JSON字符串作为消息传递:指定消息编号messageId、 指定标识符messageBody 和发送时间戳messageTime。

2. Pepper-Box Serialized Config

按照“Thread Group”->“Add”->“Config Elements”->“Pepper-Box Serialized Config”添加该元素:

如上图所示,它有一个键字段Message Placeholder Key和一个类名字段Class Name,用于指定Java类。要注意的是,这里的键字段一定要与后面Sampler里面的Message Placeholder Key的值一致。把带有类的jar文件必须放在lib/ext文件夹中,指定后,具有其属性的字段将显示在下面,您可以为它们指定所需的值。我们重复了来自最后一个元素的消息,但这次它将是一个Java对象。

3. Pepper Box Kafka Sampler

按照“Thread Group”->“Add”->“Sampler”->“Java Request”的方式添加Java Request,然后从下拉列表中选择com.gslab.pepper.sampler.PepperBoxKafkaSampler。


设置项的解释意义如下:
bootstrap.servers/zookeeper.servers

kafka.topic.name

  • 消息发布的主题的名称。

key.serializer

  • 密钥序列化的类。如果消息中没有密钥,请保持不变。

value.serializer

  • 用于消息序列化的类。对于简单文本,字段保持不变。使用Pepper-Box序列化配置时,需要指定“com.gslab.Pepper.input.Serialized.ObjectSerializer”。

compression.type

  • 消息压缩的一种类型(none/gzip/snappy/lz4)

batch.size

  • 最大的消息大小。

linger.ms

  • 消息等待时间。

buffer.memory

  • 生产商的缓冲区大小。

acks

  • 服务质量(-1/0/1-不保证传递/消息一定会传递/消息只传递一次)。

receive.buffer.bytes/send.buffer.bytes

  • TCP发送/接收缓冲区的大小。-1-使用默认OS值。

protocol

  • 加密协议(明文/SSL/SASL_明文/SASL_SSL)。

message.placeholder.key

  • 在前面的元素中指定的消息键。

kerberos.auth.enabled、java.security.auth.login.config、java.security.krb5.conf、sasl.kerberos.service.name是负责身份验证的字段组。

此外,如果需要,可以在名称前使用前缀添加其他参数,例如,ssl.key.password。

4. 配置消费者Consumer

现在让我们来谈谈消费者Consumer。虽然在服务器上创建最大负载的是生产者Producer,但服务也必须传递消息。因此,我们还应该增加消费者,以更准确地再现情况。它们还可用于检查是否已发送所有消费者消息。

作为一个例子,让我们使用以下源代码并简要介绍其步骤:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s" + "\n", record.offset(), record.key(), record.value());}
}

1.执行连接配置。
2.将指定一个主题“TestTopic”并对其进行订阅。
3.在本主题的循环中接收消息并将其带到控制台。
经过一些修改后的代码将添加到JMeter中的JSR223 Sampler中。

5. 在JMeter中构建负载测试Apache Kafka场景

现在,我们已经研究了创建负载的所有必要元素,让我们尝试将几个消息发布到Kafka服务的主题。假设我们有一个资源,可以从中收集有关其活动的数据。信息将作为XML文档发送。
1.添加Pepper-Box Text Config并创建模板。消息的结构如下:消息编号messageId、UUID、从中收集统计信息的项目ID、统计信息、发送日期戳,消息模板显示如下方截图所示:


2.添加Pepper-Box Kafka Sampler。在其中,指定kafka服务中bootstrap.servers和kafka.topic.name的地址。在我们的例子中,代理的地址是localhost:2181,演示的主题是TestTopic。我们还将从上一步的template元素中指定placeholder.key。
3. 将带有消费者Consumer代码的JSR223 Sampler添加到单独的线程组中。要使其工作,您还需要一个kafka-clients-x.x.x.x.jar文件,其中包含用于使用kafka的类,您可以在kafka目录/kafka/lib中找到这个jar包。接着为了更方便地查看测试数据,修改了脚本的一部分,并将数据保存到一个文件中。此外还添加了设置使用者执行时间所必需的部分,这里设置为5秒,更改后的全部消费者代码如下:

import org.apache.kafka.clients.consumer.*;import javax.script.ScriptException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic"));
long t = System.currentTimeMillis();
long end = t + 5000;
f = new FileOutputStream(".\\data.csv", true);
p = new PrintStream(f);
while (System.currentTimeMillis()<end) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {p.println( "offset = " + record.offset() +" value = " + record.value());}consumer.commitSync();
}
consumer.close();
p.close();
f.close();

这个jmx文件的整体结构如下所示,两个线程组同时工作,生产者开始将消息发布到指定的主题,消费者连接到主题并等待来自Kafka的消息。当消费者收到消息时,它会将消息写入文件。

6. 运行脚本并查看View Results Tree.

从上面的截图可以看到,已经发送了10条消息,你可以在jmeter的bin目录下找到“data.csv”这个文件并查看接收到的信息。这样设置完成之后,只需要调整消费者和生产者的数量来增加负荷。

我把生产者的线程调整成300个,最终的聚合报告截图如下:

值得提醒的是,Apache Kafka是为大量连接而设计的,因此您可以简单地达到网络负载生成器的容量限制。在这种情况下,JMeter保持了分布式测试的特性。以上就是如何使用JMeter加载测试Apache Kafka的全部学习记录,附上我的jmx文件地址:kafka_pepper_box.jmx_-kafka文档类资源-CSDN下载。如果pepper-box-1.0.jar包无法下载,请转至该地址进行下载:pepper-box-1.0.jar_-互联网文档类资源-CSDN下载。

参考文章:

1. 如何使用Jmeter对Kafka进行性能测试_shan286的专栏-CSDN博客_jmeter kafka

2. https://www.blazemeter.com/blog/apache-kafka-how-to-load-test-with-jmeter

Jmeter之创建Kafka生产者和消费者进行性能测试相关推荐

  1. 深入分析Kafka生产者和消费者

    深入Kafka生产者和消费者 Kafka生产者 消息发送的流程 发送方式 发送并忘记 同步发送 异步发送 生产者属性配置 序列化器 分区器 自定义分区器 Kafka消费者 消费者属性配置 消费者基础概 ...

  2. Kafka生产者与消费者详解

    什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区.多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系 ...

  3. Kafka 生产者、消费者命令行操作

    Kafka 生产者.消费者命令行操作 1.查看操作生产者命令参数 bin/kafka-console-producer.sh 参数 --bootstrap-server <String: ser ...

  4. kafka生产者和消费者端的数据不一致

    撸了今年阿里.头条和美团的面试,我有一个重要发现.......>>> kafka生产者生产30条数据,而消费者却不一定消费了30条数据,经过探索发现了main线程执行完成了而kafk ...

  5. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  6. java最简单的kafka生产者和消费者,未结合spring

    目录 1 最简单的生产者和消费者 1.1 引入maven 1.2 基本的生产者和代码注释 1.3 最简单消费者 2 生产者发送消息的三种方式 2.1 直接send之后就不管了,会自动重试,可能丢失消息 ...

  7. Kafka 生产者及消费者详解

    一.Kafka 生产者 1.1 分区策略 1)分区的原因 (1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群 ...

  8. pykafka连接重要使用pykafka,kafka-python的api开发kafka生产者和消费者

    https://pykafka.readthedocs.io/en/latest/api/producer.html 说明文档 </div><h2 class="heade ...

  9. kafka生产者、消费者java示例

    1. 生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.Ke ...

最新文章

  1. 从源码分析DEARGUI之add_drawing
  2. 2.STM32中对Key_GPIO_Config()函数的理解(自定义)之轮询控制按键LED
  3. 【CyberSecurityLearning 32】Apache配置、Apache的访问控制设定、LAMP平台的搭建
  4. android 单元测试 多线程,单元测试多线程Android RxJava
  5. Apollo进阶课程 ⑦ | 高精地图的采集与生产
  6. PHP5时间相差八小时问题[三种方法]
  7. LeetCode-MySQL196. 删除重复的电子邮箱
  8. 高性能队列--Disruptor
  9. [CTO札记]电纸书,将成为教学、阅读潮流
  10. echarts 引用地图的json
  11. 浅谈SQL注入的四种防御方法
  12. xvid开放源码xvidcore-1.1.3.zip在VC下成功编译的方法
  13. vscode中文乱码
  14. 怎样用Python自制好看的指数估值图
  15. 随机出现“No result defined for action ....Action and result input”解决
  16. php 月份查询生日_PHP判断日期(生日)格式是否正确合法的方法
  17. 《战双帕弥什》的动作打击感是怎么做出来的
  18. 【2021-09-22 修订】【梳理】计算机网络:自顶向下方法 第二章 应用层(docx)
  19. FAST迅捷路由器设置
  20. 赣州旅游职业学校学计算机,赣州旅游职业学校是公办的吗

热门文章

  1. chromedp网络监听_动态爬虫三:监听网络事件 + 监听js事件
  2. 皮一皮:可怜的西瓜...
  3. 明明有了 promise ,为啥还需要 async await ?
  4. 如何正确的创建和销毁Java对象
  5. Mysql 都会遭受哪些方面的攻击?
  6. 【实用】面对枯燥的源码,如何才能看得下去?
  7. 最好用的 IntelliJ 插件 Top 10
  8. java输入字符数组_JAVA中怎样把用户输入的字符串存入数组中?
  9. you should specify the `steps` argument
  10. vs2017 release模式断点调试