源码地址:

https://github.com/pony-maggie/springboot-kafka-demo


本地kafka和zk环境

我们需要在本地启动一个单机版的kafka和zookeeper环境。kafka的安装包自带zookeeper,直接启动即可,这个详细过程不是本文的重点,不详细说了。

我的本地环境配置如下:

  • win10系统
  • kafka_2.12-1.1.1
  • zookeeper-3.4.9
  • spring boot 2.1.6.RELEASE

启动zk,端口是2181

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

启动kafka,端口是

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-server-start.bat .\config\server.properties

记得查看启动日志确认启动成功才行。

用kafka自带的工具创建一个topic试试:

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test20190713
Created topic "test20190713".

可以看到创建成功了。然后我们查询下kafka的topic,

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
test20190713

然后我们可以用kafka自带的生产者和消费者工具进行测试,进一步验证本地环境。

首先分别启动生产者和消费者,

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20190713
>this is a test
>
C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test20190713
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

在消费者的窗口输入消息,很快消费者窗口就会显示出该消息了。或者消费者启动也可以用下面的方式:

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test20190713

原因可以参考:

Kafka中的broker-list,bootstrap-server以及zookeeper

下面两个如何配置

创建demo项目工程

依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

配置

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== 生产者配置=======================spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#===============消费者配置=======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

先来解释下这几个配置,

  • bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
  • batch-size:当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
  • retries:若设置大于0的值,客户端会将发送失败的记录重新发送
  • buffer-memory:Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
  • key-serializer:关键字的序列化类
  • value-serializer:值的序列化类

到这里配置就可以结束了,目前spring-kafka已经和spring boot无缝对接,可以自动加载配置文件进行配置,我们不需要再单独定义配置类。

测试代码

我们先定义一个消息实体,方便消费者和生产者共享。

@Data
public class Message {private Long id;    //idprivate String msg; //消息private Date sendTime;  //时间戳}

然后是生产者,

@Component
@Slf4j
public class KafkaSender {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private Gson gson = new GsonBuilder().create();//发送消息方法public void send() {Message message = new Message();message.setId(System.currentTimeMillis());message.setMsg(UUID.randomUUID().toString());message.setSendTime(new Date());log.info("+++++++++++++++++++++  message = {}", gson.toJson(message));kafkaTemplate.send("test", gson.toJson(message));}
}

代码很简单,不做过多解释。

然后是消费者,

@Component
@Slf4j
public class KafkaReceiver {@KafkaListener(topics = {"test"})public void listen(ConsumerRecord<?, ?> record) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {Object message = kafkaMessage.get();log.info("----------------- record =" + record);log.info("------------------ message =" + message);}}
}

只需要在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。

kafka的消息再接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。

测试

启动springboot项目,通过日志可以看出消息的收发都是正常的。

2019-07-29 15:02:28.812  INFO 13468 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-07-29 15:02:28.812  INFO 13468 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-07-29 15:02:28.819  INFO 13468 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: eGkIiJuNTHGwNkZy31j2NQ2019-07-29 15:02:31.831  INFO 13468 --- [           main] c.p.github.kafka.producer.KafkaSender    : +++++++++++++++++++++  message = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:31.839  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ----------------- record =ConsumerRecord(topic = test, partition = 0, offset = 1, CreateTime = 1564383751831, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"})
2019-07-29 15:02:31.839  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ------------------ message ={"id":1564383751831,"msg":"7c4f3344-d366-453f-ba12-4ca091171636","sendTime":"Jul 29, 2019 3:02:31 PM"}
2019-07-29 15:02:34.833  INFO 13468 --- [           main] c.p.github.kafka.producer.KafkaSender    : +++++++++++++++++++++  message = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}
2019-07-29 15:02:34.848  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ----------------- record =ConsumerRecord(topic = test, partition = 0, offset = 2, CreateTime = 1564383754834, serialized key size = -1, serialized value size = 102, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"})
2019-07-29 15:02:34.849  INFO 13468 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ------------------ message ={"id":1564383754833,"msg":"7d515786-09f4-41f0-b512-99e3489c1d82","sendTime":"Jul 29, 2019 3:02:34 PM"}

我们在代码里创建了一个名为 “test” 的topic,可以通过命令查询下:

C:\kafka\kafka_2.12-1.1.1
λ .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
__consumer_offsets
test
test20190713

其它说明

如果启动的时候报错,需要考虑springboot和spring-kafka兼容性问题。比如一开始我启动的时候就报错:

Error creating bean with name 'org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration'

后来把spring-kafka的版本升级下就好了。具体的版本对应关系可以看下官方的说明:

https://spring.io/projects/spring-kafka

参考:

http://kafka.apachecn.org/documentation.html

http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/


欢迎大家关注我的公众号

kafka系列之springboot集成示例相关推荐

  1. 三、美团大众点评CAT监控系列文章---Springboot集成CAT并实现邮件告警

    文章目录 一.配置过程 二.相关的参考文档 三.系列文章 一.配置过程 登录自己的cat服务端 默认登录用户名和密码都是 catadmin添加一个监控规则: 如下面的监控规则就是说,如果在一分钟以内访 ...

  2. 深入理解Redis系列之SpringBoot集成Redis

    SpringBoot环境 快速搭建一个SpringBoot工程 进入 https://start.spring.io 网站, 使用该网站初始化一个SpringBoot工程 添加相关依赖 因为使用spr ...

  3. SpringBoot集成Kafka

    SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...

  4. kafka(组件分析 整合springboot集成 实战)

    kafka 组件 搭建 springboot集成 实战 kafka 组件 搭建 springboot集成 实战 1.应用场景 1.1 kafka场景 1.2 kafka特性 1.3 消息对比 1.4 ...

  5. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  6. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  7. SpringBoot集成kafka全面实战

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...

  8. dubbo web工程示例_dubbo实战之二:与SpringBoot集成

    欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类和汇总,及配套源码,涉及Java.Docker.Kubernetes.DevO ...

  9. php 配置文件加密工具类,SpringBoot集成Jasypt安全框架以及配置文件内容加密(代码示例)...

    本篇文章给大家带来的内容是关于SpringBoot集成Jasypt安全框架以及配置文件内容加密(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. 我们在SpringBoot项 ...

最新文章

  1. oracle何时使用to_date函数
  2. 性能优化之数据库优化
  3. javascript事件循环机制EventLoop
  4. 提高效率的几个软件和快捷键
  5. 【译】canvas笔触魔法师
  6. vb初学者编程代码大全_VB编程应该如何学习?
  7. I2C总线之(一)---概述
  8. Hadoop+Spark 集群部署
  9. 3年后准确率仍达97%:利用谷歌语音转文本 API 绕过reCAPTCHA
  10. CCF NOI1020 数字识别
  11. Ajax学习笔记-错误的处理-7
  12. [源码解读]position_estimator_inav_main解读(如何启动光流)
  13. mysql 最大建议行数2000w,靠谱吗?
  14. 数字图像处理 - 实验作业一 - Python
  15. 作为审稿人,你什么情况下会选择拒稿?
  16. 生物信息学python脚本_使用Python/Biopython/Clustalw的生物信息学脚本,使用stdout迭代蛋白质目录...
  17. 笔记本 - 常用快捷键 word 笔记
  18. R配对样本t检验(PAIRED T-TEST​​​​​​​)
  19. ab压测_上传图片进行压测
  20. OpenCVSharp 相机棋盘格校正

热门文章

  1. 网址、域名的区别(二)
  2. R语言绘制圈图、环形热图可视化基因组实战:展示基因数据比较
  3. 简单的梯形规划算法实践
  4. cesium-04 primitives.update
  5. SW6206和SW6208支持22.5W输出的全协议移动电源方案
  6. GDAL通过地理坐标获取所在位置高程值(数据源为DEM)
  7. ASO关键词覆盖原理及搜索排名规则
  8. Python小技巧:提取docx文件中表格
  9. 比智商和情商更重要的品质——Grit
  10. aspose转pdf横版_Aspose实现文件转PDF在线预览及水印功能