编写测试用例,将消息发布到Kafka中,自定义消费者消费消息。

1. pom依赖

<dependencies><!-- kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><!-- 工具类 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-io</artifactId><version>1.3.2</version></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>RELEASE</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

注意:

客户端版本与服务端不一致时,会出现生产者发送消息,客户端始终消费不到情况。

本机服务端使用的是:kafka_2.13-3.0.0.tgz,所以客户端使用3.0.0版本保持一致即可。


2. 生产消息到kafka中

参考

可以参考以下方式来编写第一个Kafka示例程序

参考以下文档:kafka 2.4.0 API

流程:

  1. 创建一个生产者对象KafkaProducer
  2. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
  3. 再调用一个Future.get()方法等待响应
  4. 关闭生产者

定义常量

package com.magic.kafka;public interface KafkaConstants {String BROKER_LIST = "192.168.0.213:9092";String CLIENT_ID = "client1";String GROUP_ID_CONFIG = "consumerGroup1";String TEST_TOPIC = "test-topic";
}

生产者

package com.magic.kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** 生产者程序*/
public class ProducerCreator {/*** 构造一个生产者*/private KafkaProducer<String, String> createProducer() {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);properties.put(ProducerConfig.CLIENT_ID_CONFIG, com.magic.kafka.KafkaConstants.CLIENT_ID);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return new KafkaProducer<>(properties);}/*** 发布消息*/public void send(String msg) {KafkaProducer<String, String> producer = createProducer();try {for (int i = 0; i < 100; i++) {//send messageFuture<RecordMetadata> future = producer.send(new ProducerRecord<>(KafkaConstants.TEST_TOPIC,UUID.randomUUID().toString().replace("-", ""), "[" + i + "] " + msg));//等待响应RecordMetadata metadata = future.get();String topic = metadata.topic();int partition = metadata.partition();long offset = metadata.offset();System.out.println("Send a message, value: ' [" + i + "] " + msg + " ', topic: " + topic + ", partition:" + partition + ", offset " + offset);}} catch (ExecutionException | InterruptedException e) {System.out.println("Error in sending record");e.printStackTrace();}producer.close();}public static void main(String[] args) {new ProducerCreator().send("hello, world!");}
}

3. 从Topic中消费消息

参考

可以参考官网API文档编写用例:

kafka 2.4.0 API

流程:

  1. 创建Kafka消费者
  2. 订阅要消费的主题
  3. 使用一个while循环,不断从Kafka的topic中拉取消息
  4. 将将记录(record)的offset、key、value都打印出来

消费者

package com.magic.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者程序*/
public class ConsumerCreator {/*** 构造一个消费者*/public KafkaConsumer<String, String> createConsumer() {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());return new KafkaConsumer<>(properties);}/*** 消费消息*/public void consume() throws InterruptedException {KafkaConsumer<String, String>  consumer = createConsumer();//订阅主题并消费消息consumer.subscribe(Collections.singletonList(KafkaConstants.TEST_TOPIC));// 循环消费消息while (true) {// Kafka的消费者一次拉取一批的数据ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));// 5.将将记录(record)的offset、key、value都打印出来for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {String topic = consumerRecord.topic();// offset:这条消息处于Kafka分区中的哪个位置long offset = consumerRecord.offset();int partition = consumerRecord.partition();// key/valueString key = consumerRecord.key();String value = consumerRecord.value();System.out.println("topic: " + topic + ", offset:" + offset + ", partition: "+ partition  + ", key:" + key + ", value: ' " + value + " '");}}}public static void main(String[] args) {try {new ConsumerCreator().consume();} catch (InterruptedException e) {e.printStackTrace();}}
}

4. 测试

1-先启动消费者测试类

2-启动生产者测试类

3-此时消费者测试类将消费到消息

10分钟体验一把Kafka[测试用例]相关推荐

  1. kafka创建topic_Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?...

    Guide哥答应大家的 Kafka系列的第3篇原创文章,写的非常详细,没有接触过 Kafka 的朋友应该都可以看懂,觉得不错的话一定要点亮你们的在看!在看就是对Guide 哥最大的鼓励! 为了保证内容 ...

  2. 华为充电的效果_充电60秒吃鸡10分钟 华为Mate30 Pro 快充体验

    今天重构想象的华为Mate30系列在国内正式发布,除了保持Mate系列强劲的拍照性能传统外,其在5G.屏幕.快充等方面对苹果的打击更是针针见血,让一直以来以领头羊形象示人的苹果不免有些自惭形秽. 同为 ...

  3. Vue3.0 10分钟上手体验-Vite

    1.Vite 简单介绍 Vite 是由 Vue 作者尤雨溪开发的一套一种新的.更快地 web 开发工具,它具快速的冷启动.即时的模块热更新.真正的按需编译几个特点. 作者曾在微博上发言:Vite,一个 ...

  4. 中专学校计算机科目试讲稿,中专10分钟试讲教案模板

    第1篇:10分钟试讲教案 10分钟试讲教案 [篇1:10分钟试讲教案] 相关推荐 | | | [10分钟试讲]考生常见问题: 一.知识点太多,不知道十分钟课堂可以讲些什么内容 很多考生在十分钟的课堂里 ...

  5. SQL人的优势:实战大数据开发10分钟入门

    金色的九月,即将开启收获的篇章. 一早醒来,魔都湛蓝的天空,暑气未消的阳光,一扫前几日狂风暴雨的阴霾.品着自己煮的咖啡,吃上一口朱家角寄来的苏荷月饼,人生真赞! 这个礼拜将32G内存和M.2 SSD都 ...

  6. 花10分钟看一看少走30年弯路

    花10分钟看一看少走30年弯路 HP大中华区总裁孙振耀退休感言 : 如果这篇文章没有分享给你,那是我的错. 如果这篇文章分享给你了,你却没有读,继续走弯路的你不要怪我. 如果你看了这篇文章,只读了一半 ...

  7. pulsar 容量_[Pulsar系列] 10分钟学会Pulsar消息系统概念

    Apache Pulsar Pulsar是一个支持多租户的.高性能的服务与服务之间消息通讯的解决方案,最初由雅虎开发,现在由Apache软件基金会管理. Pulsar在Yahoo的生产环境运行了三年多 ...

  8. es6 ... 添加属性_如何在10分钟内免费将HTTPS添加到您的网站,以及为什么您现在不止需要这样做......

    es6 ... 添加属性 by Ayo Isaiah 通过Ayo Isaiah 如何在10分钟内免费将HTTPS添加到您的网站,以及为什么现在比以往更需要这样做 (How to add HTTPS t ...

  9. 10分钟出一个块的BCH,可以作为日常支付?

    中本聪创建比特币的初衷是希望其成为全球支付的货币,作为继承中本聪衣钵的比特币现金更是在这方面不断的努力.比特币平均10分钟出一个块,交易确认的时间也是10分钟左右.10分钟的支付确认时间对于日常支付来 ...

  10. mysql connection闪退重连_玩家排位巅峰赛开局闪退,重连失败,10分钟后一个提示让他懵了...

    #游戏圈中的春节# 王者荣耀排位赛460是经常的事情,不过闪退还是比较少见的,玩家排位巅峰赛开局闪退,重新登录游戏之后,重连一直失败,10分钟后一个提示让他懵了. 460的情况大家都经历过,这是很多原 ...

最新文章

  1. Mybatis二级缓存原理
  2. 揭秘:支付宝小程序 V8 Worker 技术演进
  3. qt绘制一圈圆_Qt绘制圆形,矩形等图形   绘制同心圆
  4. 《你必须知道的.NET》,评价和推荐
  5. java http编码_java httprequest编码/解码
  6. matlab在傅里叶里的应用,MATLAB在傅里叶变换中的应用
  7. typedef的4种常见用法
  8. Java注解:@IntDef 替换 Emum
  9. linux gcc/g++编译参数 -l(大写i)-L(大写l) -l(小写l)
  10. IP摄像头实现远程目标检测(rtsp)
  11. 服务器外置硬盘的分区格式,MacBook下移动硬盘分区配置几种格式解决方案
  12. [BJTU]C语言期中考试总结
  13. linux 进程共享内存同步,Linux使用共享内存通信的进程同步退出问题
  14. python3 常用模块_python3-常用模块之re
  15. 上亿海量数据处理方法
  16. 最流行的布局方案 Flex 弹性盒布局详解
  17. padding样式属性
  18. 「动画演示」勾股定理的证明
  19. OkHttp面试之--HttpEngine中的readResponse流程简介
  20. 经典语录:也许一个人在真正无可奈何的时候除了微笑也只好微笑了

热门文章

  1. mybatis批量插入和批量更新
  2. Tomcat下ajax请求路径总结
  3. Mybatis注解: SQL语句映射@Select @Insert @Updata @Delete @SelectKey
  4. Spring Cloud Alibaba Sentinel之持久化篇
  5. MyCat分片规则之取模分片
  6. Spring缓存切面源码解析
  7. js面向对象练习(二):JS面向对象的思路(canvas)写躁动的小球
  8. vim粘贴代码格式变乱
  9. Hibernate(1)
  10. 思达报表工具Style Report基础教程—通过镜像,子表和联合将逗号分隔的字段内容处理成多行数据...