kafka系列之springboot集成示例
源码地址:
https://github.com/pony-maggie/springboot-kafka-demo
本地kafka和zk环境
我们需要在本地启动一个单机版的kafka和zookeeper环境。kafka的安装包自带zookeeper,直接启动即可,这个详细过程不是本文的重点,不详细说了。
我的本地环境配置如下:
- win10系统
- kafka_2.12-1.1.1
- zookeeper-3.4.9
- spring boot 2.1.6.RELEASE
启动zk,端口是2181
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
启动kafka,端口是
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-server-start.bat .\config\server.properties
记得查看启动日志确认启动成功才行。
用kafka自带的工具创建一个topic试试:
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test20190713
Created topic "test20190713".
可以看到创建成功了。然后我们查询下kafka的topic,
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
test20190713
然后我们可以用kafka自带的生产者和消费者工具进行测试,进一步验证本地环境。
首先分别启动生产者和消费者,
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20190713
>this is a test
>
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test20190713
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
在消费者的窗口输入消息,很快消费者窗口就会显示出该消息了。或者消费者启动也可以用下面的方式:
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test20190713
原因可以参考:
Kafka中的broker-list,bootstrap-server以及zookeeper
下面两个如何配置
创建demo项目工程
依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
配置
#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== 生产者配置=======================spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#===============消费者配置=======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
先来解释下这几个配置,
- bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
- batch-size:当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
- retries:若设置大于0的值,客户端会将发送失败的记录重新发送
- buffer-memory:Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
- key-serializer:关键字的序列化类
- value-serializer:值的序列化类
到这里配置就可以结束了,目前spring-kafka已经和spring boot无缝对接,可以自动加载配置文件进行配置,我们不需要再单独定义配置类。
测试代码
我们先定义一个消息实体,方便消费者和生产者共享。
@Data
public class Message {private Long id; //idprivate String msg; //消息private Date sendTime; //时间戳}
然后是生产者,
@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));kafkaTemplate.send("test", gson.toJson(message));}
}
代码很简单,不做过多解释。
然后是消费者,
@Component
@Slf4j
public class KafkaReceiver {@KafkaListener(topics = {"test"})public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =" + record);log.info("------------------ message =" + message);}}
}
只需要在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。
kafka的消息再接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。
测试
启动springboot项目,通过日志可以看出消息的收发都是正常的。
2019-07-29 15:02:28.812 INFO 13468 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1
2019-07-29 15:02:28.812 INFO 13468 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5
2019-07-29 15:02:28.819 INFO 13468 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: eGkIiJuNTHGwNkZy31j2NQ2019-07-29 15:02:31.831 INFO 13468 --- [ main] c.p.github.kafka.producer.KafkaSender : +++++++++++++++++++++ message = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:31.839 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ----------------- record =ConsumerRecord(topic = test, partition = 0, offset = 1, CreateTime = 1564383751831, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"})
2019-07-29 15:02:31.839 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ------------------ message ={"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:34.833 INFO 13468 --- [ main] c.p.github.kafka.producer.KafkaSender : +++++++++++++++++++++ message = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
2019-07-29 15:02:34.848 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ----------------- record =ConsumerRecord(topic = test, partition = 0, offset = 2, CreateTime = 1564383754834, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"})
2019-07-29 15:02:34.849 INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver : ------------------ message ={"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
我们在代码里创建了一个名为 “test” 的topic,可以通过命令查询下:
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
test
test20190713
其它说明
如果启动的时候报错,需要考虑springboot和spring-kafka兼容性问题。比如一开始我启动的时候就报错:
Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration'
后来把spring-kafka的版本升级下就好了。具体的版本对应关系可以看下官方的说明:
https://spring.io/projects/spring-kafka
参考:
http://kafka.apachecn.org/documentation.html
http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/
欢迎大家关注我的公众号
kafka系列之springboot集成示例相关推荐
- 三、美团大众点评CAT监控系列文章---Springboot集成CAT并实现邮件告警
文章目录 一.配置过程 二.相关的参考文档 三.系列文章 一.配置过程 登录自己的cat服务端 默认登录用户名和密码都是 catadmin添加一个监控规则: 如下面的监控规则就是说,如果在一分钟以内访 ...
- 深入理解Redis系列之SpringBoot集成Redis
SpringBoot环境 快速搭建一个SpringBoot工程 进入 https://start.spring.io 网站, 使用该网站初始化一个SpringBoot工程 添加相关依赖 因为使用spr ...
- SpringBoot集成Kafka
SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...
- kafka(组件分析 整合springboot集成 实战)
kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...
- 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- SpringBoot集成kafka全面实战
本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...
- dubbo web工程示例_dubbo实战之二:与SpringBoot集成
欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类和汇总,及配套源码,涉及Java.Docker.Kubernetes.DevO ...
- php 配置文件加密工具类,SpringBoot集成Jasypt安全框架以及配置文件内容加密(代码示例)...
本篇文章给大家带来的内容是关于SpringBoot集成Jasypt安全框架以及配置文件内容加密(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 我们在SpringBoot项 ...
最新文章
- oracle何时使用to_date函数
- 性能优化之数据库优化
- javascript事件循环机制EventLoop
- 提高效率的几个软件和快捷键
- 【译】canvas笔触魔法师
- vb初学者编程代码大全_VB编程应该如何学习?
- I2C总线之(一)---概述
- Hadoop+Spark 集群部署
- 3年后准确率仍达97%:利用谷歌语音转文本 API 绕过reCAPTCHA
- CCF NOI1020 数字识别
- Ajax学习笔记-错误的处理-7
- [源码解读]position_estimator_inav_main解读(如何启动光流)
- mysql 最大建议行数2000w,靠谱吗?
- 数字图像处理 - 实验作业一 - Python
- 作为审稿人,你什么情况下会选择拒稿?
- 生物信息学python脚本_使用Python/Biopython/Clustalw的生物信息学脚本,使用stdout迭代蛋白质目录...
- 笔记本 - 常用快捷键 word 笔记
- R配对样本t检验(PAIRED T-TEST​​​​​​​)
- ab压测_上传图片进行压测
- OpenCVSharp 相机棋盘格校正