大家好,我是烤鸭:

今天分享下 springboot 整合 kafka。

1.  环境参数:

windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + springboot 2.3.0

2.  下载安装zookeeper + kafka

zookeeper:

https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

复制 zoo_sample.cfg ,改名为 zoo.cfg,增加日志路径:

dataDir=D:\xxx\env\apache-zookeeper-3.5.6-bin\data
dataLogDir=D:\xxx\env\apache-zookeeper-3.5.6-bin\log

启动zk,zkServer.cmd

kafka:

https://kafka.apache.org/downloads

找 Binary downloads 下载
https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz

修改 config/server.properties,由于zk用的默认端口 2181,所以不需要改

log.dirs=D:\\xxx\\env\\kafka\\logs

启动kafka

 D:\xxx\env\kafka\bin\windows\kafka-server-start.bat D:\xxx\env\kafka\config\server.properties

3.  springboot 接入

pom.xml

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.0.RELEASE</version><scope>compile</scope></dependency></dependencies>

application.yml

spring:kafka:# 指定kafka server的地址,集群配多个,中间,逗号隔开bootstrap-servers: 127.0.0.1:9092# 生产者producer:# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。retries: 0# 每次批量发送消息的数量,produce积累到一定数据,一次发送batch-size: 16384# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger.ms: 1# 消费者consumer:enable-auto-commit: falseauto-commit-interval: 100mskey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000group-id: group
server:port: 8081

KafkaDemoController.java

package com.mys.mys.demo.kafka.web;import com.mys.mys.demo.kafka.service.KafkaSendService;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;@RestController
public class KafkaDemoController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@AutowiredKafkaSendService kafkaSendService;@GetMapping("/message/send")public boolean send(@RequestParam String message) {//默认自动创建,消费者端 allow.auto.create.topics = true//createTopic();kafkaTemplate.send("testTopic-xxx15", message);return true;}//同步@GetMapping("/message/sendSync")public boolean sendSync(@RequestParam String message){kafkaSendService.sendSync("synctopic",message);return  true;}//异步示例@GetMapping("/message/sendAnsyc")public boolean sendAnsys(@RequestParam String message){kafkaSendService.sendAnsyc("ansyctopic",message);return  true;}/*** @Author* @Description 创建主题* @Date 2020/5/23 19:03* @Param []* @return void**/private void createTopic() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");KafkaAdmin admin = new KafkaAdmin(configs);NewTopic newTopic = new NewTopic("testTopic-xxx15",1,(short)1);AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties());adminClient.createTopics(Arrays.asList(newTopic));}
}

KafkaSendService.java

package com.mys.mys.demo.kafka.service;import com.mys.mys.demo.kafka.handler.KafkaSendResultHandler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;@Service
public class KafkaSendService {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;@Autowiredprivate KafkaSendResultHandler producerListener;/*** 异步示例* */public void sendAnsyc(final String topic,final String message){//统一监听处理kafkaTemplate.setProducerListener(producerListener);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic,message);//具体业务的写自己的监听逻辑future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功:" + result);}@Overridepublic void onFailure(Throwable ex) {System.out.println("发送消息失败:"+ ex.getMessage());}});}/*** 同步示例* */public void sendSync(final String topic,final String message){ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);try {kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);System.out.println("发送成功");}catch (ExecutionException e) {System.out.println("发送消息失败:"+ e.getMessage());}catch (TimeoutException | InterruptedException e) {System.out.println("发送消息失败:"+ e.getMessage());}}
}

CustomerListener.java

package com.mys.mys.demo.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class CustomerListener {@KafkaListener(topics="testTopic")public void onMessage(String message){System.out.println("消费="+message);}@KafkaListener(topics="testTopic-xxx14")public void onMessage1(String message){System.out.println("消费="+message);}@KafkaListener(topics="testTopic-xxx15")public void onMessage15(String message){System.out.println("消费="+message);}
}

KafkaSendResultHandler.java(用于接收异步的返回值)

package com.mys.mys.demo.kafka.handler;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;@Component
public class KafkaSendResultHandler implements ProducerListener {private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {log.info("Message send success : " + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, Exception exception) {log.info("Message send error : " + producerRecord.toString());}
}

4.  效果和部分源码分析

看一下项目启动的日志,消费者监听到的分区和队列名称。另外如果kafka没有这个队列,在调用send方法时自动创建,看以下这个配置。
auto.create.topics.enable ,默认为 true。

访问路径:http://localhost:8081/message/send?message=1234
输出结果。

可以看下 ProducerRecord 这个类,方法先不贴了,看这几个属性。

public class ProducerRecord<K, V> {//队列名称private final String topic;//分区名称,如果没有指定,会按照key的hash值分配。如果key也没有,按照循环的方式分配。private final Integer partition;//请求头,用来存放k、v以外的信息,默认是只读的private final Headers headers;//key-valueprivate final K key;private final V value;//时间戳,如果不传,默认按服务器时间来private final Long timestamp;
}

再看下 Producer,重点看下 send方法,kafka支持同步或异步接收消息发送的结果,实现都是靠Future,只是异步的时候future执行了回调方法,支持拦截器方式。

/*** The interface for the {@link KafkaProducer}* @see KafkaProducer* @see MockProducer*/
public interface Producer<K, V> extends Closeable {/*** See {@link KafkaProducer#send(ProducerRecord)}*/Future<RecordMetadata> send(ProducerRecord<K, V> record);/*** See {@link KafkaProducer#send(ProducerRecord, Callback)}*/Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

更详细的看这篇文章说的很好:

https://www.cnblogs.com/dingwpmz/p/12153036.html

简单总结一下:
Producer的send方法并不会直接像broker发送数据,而是计算消息长度是否超限,是否开启事务,如果当前缓存区已写满或创建了一个新的缓存区,则唤醒 Sender(消息发送线程),将缓存区中的消息发送到 broker 服务器,以队列的形式(每个topic+每个partition维护一个双端队列),即 ArrayDeque,内部存放的元素为 ProducerBatch,即代表一个批次,即 Kafka 消息发送是按批发送的。

springboot 整合 kafka demo 顺便看一下源码相关推荐

  1. 一小时学会使用Springboot整合沙箱环境支付宝支付(附源码)

    0.前言 文章需求: 对于学生来说,目前网上确实没有比较统一而且质量好的支付教程.因为支付对个人开发者尤其是学生来说不太友好.因此,自己折腾两天,算是整理了一篇关于支付宝沙箱支付的文章. 那么为什么不 ...

  2. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  3. Kafka精品教学(入门,安装,Springboot整合Kafka)

    ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除. 要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Messag ...

  4. springboot 整合kafka 实现生产,消费数据

    一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...

  5. Kafka原理以及SpringBoot整合Kafka

    1.Kafka原理 1. brokers有多个broker组成,broker是指Kafka服务器(192.168.223.140就是其中的一个broker),上面三台Kafka服务器组成了Kafka集 ...

  6. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  7. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

  8. SpringBoot整合kafka实战之带回调的生产者

    本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...

  9. 面试有没有看过spring源码_如何看Spring源码、Java每日六道面试分享,打卡第二天...

    原标题:如何看Spring源码.Java每日六道面试分享,打卡第二天 想要深入的熟悉了解Spring源码,我觉得第一步就是要有一个能跑起来的极尽简单的框架,下面我就教大家搭建一个最简单的Spring框 ...

最新文章

  1. Java基础20:Java8新特性终极指南
  2. 地图下面的标尺是什么意思_房屋产权70年产权吧,下面的使用年限是什么意思?...
  3. 作《互联网时代的软件革命--SaaS架构设计》上市了
  4. Windows上卸载SqlServer数据库
  5. Lucene搜索引擎例子demo
  6. 通过反射获取私有构造方法并使用
  7. WebRTC 成为 W3C 与 IETF 正式标准
  8. Linux下apache和fcgi的关系,Linux下编译安装Apache httpd 2.4
  9. 关于js的冒泡--新手踩坑案例
  10. HackerRank [Algo] Matrix Rotation
  11. 【题解】PTA-Python题库 浙大版《Python 程序设计》题目集题解索引
  12. xp服务器远程连接设置方法,Windows XP远程桌面连接设置图解教程
  13. 2年前端 杭州 面试 集合 面经 前端
  14. Android 版本对应 Version Code
  15. 【图像分割】基于收缩系数的粒子群混合引力搜索算法多级图像阈值分割算法研究附matlab代码
  16. 想在美国开餐厅?想招人?顶级餐饮经营锦囊,300个餐饮人等你来撩!
  17. 用python做一个文本翻译器,自动将中文翻译成英文,超方便的
  18. zuul两大作用_springCloud学习- 路由网关(zuul)
  19. nodeJs各个版本下载
  20. 硬盘检测工具HARD DISK SENTINEL PRO(硬盘哨兵)v5.70便携版

热门文章

  1. [html] img中的src加载失败时如何用默认图片来替换呢?
  2. 工作307:uni-富文本的实现逻辑跳转
  3. 前端学习(2568):使用高级特性provide和inject
  4. 工作30:加入git版本库
  5. 前端学习(2307):react之props和state
  6. 前端学习(2116):为什么组件data必须是函数
  7. 前端学习(1675):前端系列实战课程之无缝滚动思路
  8. 前端学习(1524):ES6模块导入和导出
  9. 前端学习(926):淘宝flexiblejs源码分析之核心原理
  10. 如何给定两个gps坐标 算出航向角_机器人开发如何配置ROS中的TF变换关系?