ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除。

要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

消息队列是一种应用间的异步协作机制,同时消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

kafka

Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。

1)解耦

在使用kafka之前,是应用A直接调用B的接口,流程是:A执行-->A调用B-->B执行完成-->A执行完成。这种做法就很耦合,如果B应用出错了,则A也不能顺利执行。

使用kafka之后呢,AB就被解耦了,A只管将消息放进kafka里面,B从kafka拿到消息就好了,即使B应用挂掉,A也能正常返回。

2)异步处理

异步处理替代了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可。

3)流量消峰

当高流量的时候,例如秒杀商品的时候,会有大量的请求对服务器进行访问,容易造成服务器的压力剧增,甚至是挂掉,使用kafka之后能对流量进行控制。

kafka架构的基本概念

要了解一门技术,一定要知道它的架构和一些相关的概念。

Kafka的业务流程,总的来说分为三部分:生产者、Kafka服务器、消费者,分别对应图上的左边、中间、和右边。

这里我们主要介绍中间部分的一些概念。

1)Topic主题

首先要了解,Kafka处理消息是按照topic进行分类的,发送消息时需要指定topic,消费者通过订阅topic来消费消息 ,一个topic可被多个消费者订阅,这样可以更快的消费消息,例如一个系统需要发送的信息有:订单信息,用户信息。在Kafka中对应的就是两个topic,topic-user和topic-order。

对应图上,红色和橙色同为一个主题A(只是不同分区),绿色和蓝色分别是topicB和topicC。

2)Partition分区

一个topic是由多个partition组成,默认有一个分区。partition是一个不可修改的消息序列,消息被存储在Partition中,所以Kafka 采用了 topic-partition-message 三级结构来处理消息。

分区的好处就是,如果合理设置好分区,那么同一个主题的消息会均匀的分布在各个分区当中,保证了负载均衡和水平拓展。例如图上TopicA主题,有两个分区,消息量多的时候,会均匀的分布,让系统更好的运行。

举个例子,在高速路上面有三个车道,就理解为三个分区,车子就是消息。一个车道的车多了,就走另外两个车道。就像一个分区的消息多了,就去另外一个分区。

3)Broker

我们都知道Kafka的设计就是集群的方式,所以在实际业务中可能有多个Kafka的服务器,你可以将一个Broker想象成一台机器。假如我在电脑上有三个虚拟机,分别安装了Kafka,并且构成一个集群。那么就对应有三个Broker。

4)Leader 和 Follower

分区是有副本的,分为leder副本和follower副本,这些副本的内容是相同的。但是生产者和消费者只会跟leader进行写入和读取操作,follower只是被动的向leader同步数据以达到数据备份的目的。

简单来说,就是为了保证数据的可靠性,假如Leader的服务器挂掉了,Kafka 会从剩余的副本中选举出新的 leader 续提供服务。因此某个分区的leader和follow副本不会放在同一台服务器上面。

对应图上就是,TopicA的partition0分区有三个副本,其中在Broker1上面的是Leader副本,然后Broker0和Borker2上面分别是其对应的Follower副本。如果Broker1挂掉,就会从另外两个副本中选取一个新的Leader,保证服务的正常运行。

Kafka的持久化

Kafka的数据会儿被持久化到磁盘,但是每次写入的时候是写在操作系统的页缓存(page cache)中,然后交由操作系统再将数据写入到磁盘中,相当于是专业的事交(写入磁盘)给专业的人(操作系统)来做。并且写入的时候是采用追加写入的。

Kafka是先把数据写入 操作系统的页缓存,那么Kafka在读数据的时候也会先从页缓存读取,然后把消息发送到网络的Socket,这个过程是就是使用的是sendfile零拷贝技术。其数据的读取速度是非常快的。

优点是:

1)系统的页缓存是在内存中分配的,所以Kafka写入消息的速度是非常快的。

2)Kafka 不必直接与底层的文件系统打交道。所有烦琐的 IO操作都交由操作系统来处理,增加了写入效率。

3)由于是采用append追加方式写入,所以避免了磁盘的随机操作,磁盘如果按照顺序读写,效率是不输内存的随机读写的,所以速度非常快。

4)使用sendfile为代表的零拷贝技术,加强网络间的数据传输效率。

kafka的安装和入门

下载

官方下载地址:https://kafka.apache.org/downloads

我这里下载的是3.2.0的

注意:kafka是使用Scala开发,所以版本号是由 Scala的版本号和Kafka版本号组成的,如:kafka_2.12-3.2.0 , 2.12是scala版本, 3.2.0是kafka版本

下载后解压,目录结构如下

1)bin:

kafka的执行脚本 ,其中包括启动kafka的脚本,启动zookeeper的脚本 ,bin/windows 目录中的脚本是针对windows平台。

2)config:

配置文件目录 ,包括server.properties :kafka的配置 ;zookeeper.properties :zookeeper的配置, producer.properties:生产者的配置 ; consumer.properties 消费者的配置等等。

3)libs:依赖的三方jar包

4)logs:运行一次Kafka后自动生成,是内置zookeeper做记录的地方

配置文件

1)zookeeper的配置,zookeeper.properties作为zookeeper的配置文件

  • dataDir : 数据存储目录

  • clientPort :端口

2)Kafka配置,server.properties作为kafka的配置文件,我们关注下面几个配置,你也可以根据情况进行修改

  • broker.id =0 : 如果是做个多个kafka主机集群,那么brocker.id不能重复,0 ;1 ;2 增长

  • zookeeper.connect : zookeeper的地址 ,如果有多个zk就用逗号隔开配置多个地址

  • num.partions = 1 : 默认partions 数量默认为1

  • log.dirs : 日志目录,不建议放到tmp临时目录,一定要修改,如log.dirs=d:/kafka-logs

启动

(一)启动Kafka内置的zookeeper,进入bin/windows目录,打开cmd窗口,执行如下命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

注意:如果在Linux启动,执行bin目录下的启动脚本即可

zookeeper-server-start.sh ../config/zookeeper.properties

(二)启动Kafka,还是在/bin/windows目录打开cmd窗口,执行如下命令

kafka-server-start.bat ../../config/server.properties

不出意外你已经成功启动了kafka

TOPIC的操作

kafka-topics.bat :是供针对topic的操作脚本,可对topic进行增删改查

在/bin/windows目录下使用cmd输入kafka-topics.bat可以查看到kafka-topics.bat的帮助说明

几个核心的相关参数

参数名称

解释

alter 用于修改主题,包括分区数及主题的配置
config <键值对> 创建或修改主题时,用于设置主题级别的参数
create 创建主题
delete 删除主题
delete-config <配置名称> 删除主题级别被覆盖的配置
describe 查看主题的详细信息
help 打印帮助信息
if-exists 修改或删除主题时使用,只有当主题存在时才会执行操作
if-not-exists 创建主题时使用,只有主题不存在时才会执行动作
list 列出所有可用的主题
partitions <分区数> 创建主题或增加分区时指定分区数
replica-assignment <分配方案> 手工指定分区副本分配方案
replication-factor <副本数> 创建主题时指定副本因子
topic <主题名称> 指定主题名称
zookeeper 指定连接的zookeeper地址信息(必填项)

上面命令参数的使用方法:在/bin/windows目录下面使用cmd窗口,输入kafka.bat 后面接参数(后面会介绍)

创建TOPIC

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic myTopic

--create : 表示要创建一个topic

--topic :表示要创建topic 名字为 topic-hello

--bootstrap-server localhost:9092 : 表示连接到kafka服务

上面命令会创建一个名字为  myTopic的主题,默认的分区数量为1,可以通过--partitions 1 参数指定分区数量 ,可以通过 --replication-factor 1 指定副本数量

查看TOPIC

使用 --list 命令查看所有topic

kafka-topics.bat --bootstrap-server localhost:9092 --list

修改分区

使用 --alert 命令修改topic

kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic myTopic --partitions 2

将myTopic主题的分区修改为2

删除TOPIC

使用 --delete 命令删除一个TOIC ,语法如下:

kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic myTopic

生产者发送消息

Kafka 默认提供了脚本工具 kafka-console-producer.bat 可以不断地接收标准输入并将它们发送到 Kafka 的某个 topic用户在控制台终端下启动该命令,输入一行文本数据,然后该脚本将该行文本封装成Kafka 消息发送给指定的 topic .为了使用该脚本工具发送消息,用户需要再打开 个新的终端,执行下列命令:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic myTopic

上面命令往 topic-hello 这个主题中发送了两行消息 , 如果需要结束生产者客户端的话使用 ctrl+c

消费者消费消息

同样Kafka提供了针对于消费者的脚本, kafka-console-consumer.bat 可以方便的从Kafka中订阅和消费消息,我们打开新的终端执行命令:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic myTopic --from-beginning

上面命令是通过客户端:消费topic-hello主题中的消息, --from-beginning 表示获取历史数据,即使消费者宕机了也可以拿到曾经生产者发送的消息 。需要结束消费者者客户端的话使用 ctrl+c

如果是在linux环境中那么就使用 bin目录下的对应脚本即可, 另外 kafka-console-consumer 和 kafka-console-producer 还有很多参数可以指定,如果不加任何参数直接执行他们可以打印各自的帮助文档。

Springboot整合kafka

在Springboot程序运行期间,请确保kafka服务器正常运行。

引入依赖包

在使用springboot的情况下,直接引入下面这个依赖包,则可以使用kafka的相关操作

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

设置配置文件yml

spring:kafka:bootstrap-servers: localhost:9092producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
​consumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)
​# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建一个主题

我们在启动类上面,定义一个常量,用来表示主题的名称

public static final String TOPIC_NAME = "topic-test";

创建一个TopicConf类,使用@Configuration注解标记

@Configuration
public class TopicConf {
​@Beanpublic NewTopic topicHello(){//构造一个名字为topic-test的主题return TopicBuilder.name(KafkaApplication.TOPIC_NAME).build();}
​
}

生产者-异步消息

首先创建一个HelloProducer类,自动注入KafkaTemplate(和redisTemplate类似),然后使用KafkaTemplate的send方法。

@RestController
public class HelloProducer {
​@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate;
​@GetMapping("/send/y/{msg}")public String sendMessage(@PathVariable("msg") String msg) {kafkaTemplate.send(KafkaApplication.TOPIC_NAME, msg);return "发送成功";}
}

生产者-同步消息

因为send方法默认采用异步发送,如果需要同步获取发送结果,则调用get方法。

@RestController
public class HelloProducer {
​@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate;
​@GetMapping("/send/t/{msg}")public String sendMsg(@PathVariable("msg")String msg) throws ExecutionException, InterruptedException, TimeoutException {ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(KafkaApplication.TOPIC_NAME, msg);SendResult<Object, Object> result = future.get(3, TimeUnit.SECONDS);System.out.println("发送的消息:"+result.getProducerRecord().value());return "发送成功";}
}

需要特别注意的是: future.get()方法会阻塞,他会一直尝试获取发送结果,如果Kafka迟迟没有返回发送结果那么程序会阻塞到这里。所以这种发送方式是同步的。

当然如果你的消息不重要允许丢失你也可以直接执行 : kafkaTemplate.send ,不调用get()方法获取发送结果,程序就不会阻塞,当然你也就不知道消息到底有没有发送成功。

消费者

Kafka提供了 @KafkaListener注释来接收消息,用法比较简单,我们演示注解方式如下:

@Component
public class HelloConsumer {@KafkaListener(topics = KafkaApplication.TOPIC_NAME)public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();System.out.println("message = " + msg);}}
}

或者,直接接受消息

@Component public class HelloConsumer {public class HelloConsumer {@KafkaListener(topics = KafkaApplication.TOPIC_NAME)public void handler(String message){System.out.println("message = " + message);}
}
​
}

推荐使用第一种方式

不出意外已经能够正常的接受消息了

生产者-消费者测试

使用api工具(例如:postman)发送请求,查看控制台的输出

测试链接:http://localhost:8080/send/y/123456

控制台输出是:

测试链接:http://localhost:8080/send/t/hhhhh

控制台输出的是:

另外在kafka的消费者控制台也能看到这两个消息

同步和异步测试

异步只要执行到KafkaTemplate的send方法就能返回结果,无需关心后面的情况,而同步需要等待get方法返回结果

测试步骤:1)启动Springboot之后,然后暂时关闭kafka的服务。2)通过postman发送异步请求和同步请求

异步的:调异步发送的(默认发送接口),请求立刻返回。

同步的:调同步发送:请求被阻断,一直等待,超时后返回错误

注册监听

注册监听的主要作用是用来监听异步发送消息是否发送成功的,因为在采用异步发送消息的时候,不会等待结果返回,就会继续执行后续的代码,所以可以通过注册监听的方式来获得是否发送成功。

1)首先创建一个配置类,用@Configuration注解

2)然后使用Springboot自带的注解@PostConstruct,在构造函数之后完成监听器的初始化

3)重写监听到事件(消息是否发送成功)的方法

public class KafkaListenerConf {
​@AutowiredKafkaTemplate<Object,Object> kafkaTemplate;
​@PostConstructprivate void listener() {kafkaTemplate.setProducerListener(new ProducerListener<Object, Object>() {@Overridepublic void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {System.out.println("成功,message="+ producerRecord.value());}
​@Overridepublic void onError(ProducerRecord<Object, Object> producerRecord, Exception exception) {System.out.println("失败,message="+ producerRecord.value());}});}
​
}

测试URL和结果:http://localhost:8080/send/y/eee

Springboot整合Kafka进阶

前面将了Springboot和Kafka的基本整合,实现了基本的发送消息和接受消息,下面将介绍一些其他的用法

1.序列化

序列化器讲解

我们之前的序列化器是使用Kafka自带的:org.apache.kafka.common.serialization.StringSerializer(有点类似于redis自带的序列化器一样,这里不了解的就不深入讲了)

除了String对应的序列化器之外,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等,这些序列化器都实现了一个父接口:org.apache.kafka.common.serialization.Serializer,下面图片来自源码。

下面图片是实现Serializer接口的子类

正常情况下,这些序列化器已经能够满足绝大多数的业务需求了,但是不排除有个别业务需要进行自己的序列化,所以这里我们仿造源码进行自定义序列化器。

自定义序列化器

自定义序列化器(反序列化器)的步骤是:

1)首先自己创建一个类MySerializer进行序列化(或者MyDeserializer进行反序列化)

2)然后跟源码一样,实现org.apache.kafka.common.serialization.Serializer接口(org.apache.kafka.common.serialization.Deserializer)

3)重写方法serialize()或deserialize()

下面是序列化的类

import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.serialization.Serializer;
​
import java.nio.charset.StandardCharsets;
​
public class MySerializer implements Serializer {
​//重写序列化的方法,就是在发送消息的时候,进行序列化@Overridepublic byte[] serialize(String topic, Object data) {//将数据变成JSON格式的字符串String jsonString = JSON.toJSONString(data);//返回字节数组return jsonString.getBytes(StandardCharsets.UTF_8);}
}

下面是反序列化的类

import com.alibaba.fastjson.JSON;
import org.apache.kafka.common.serialization.Deserializer;
​
import java.io.UnsupportedEncodingException;
​
public class MyDeserializer implements Deserializer {@Overridepublic Object deserialize(String topic, byte[] data) {try {//将字节数据转换为字符串String s = new String(data,"utf-8");//通过JSON解析字符串,并返回相应的对象return JSON.parse(s);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}
}

4)在配置文件中,使用自己的序列化器和反序列化器

再次测试发送消息,成功

2.分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。一个消息根据key不可能去不同的分区,也算是一种负载均衡的感觉。

发送消息时通常情况有下面4种分区策略:

  1. 指定分区号,直接将数据发送到指定的分区里面去

  2. 没有指定分区号,给定数据的key值,通过key取上hashCode进行分区(不重要的ps:类似HashMap存数据的时候)

  3. 既没有给定分区号,也没有给定key值,直接轮循进行分区(默认

  4. 自定义分区,你想怎么做就怎么做

测试Kafka自带的分区规则

@RestController
public class HelloProducer {
​@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate;
​//指定分区发送//不管你key是什么,到同一个分区@GetMapping("/send/setPartition/{key}")public void setPartition(@PathVariable("key") String key) {String msg = key + "—指定0号分区";//第二个参数是分区号,这里指定的是0号分区kafkaTemplate.send(KafkaApplication.TOPIC_NAME, 0, key, msg);}
​//指定key发送,不指定分区//根据key做hash,相同的key到同一个分区@GetMapping("/send/setKey/{key}")public void setKey(@PathVariable("key") String key) {String msg = key + "—不指定分区";kafkaTemplate.send(KafkaApplication.TOPIC_NAME, key,msg);}
​
}

对应的消费者代码是

//注解指定消费的分区是0
@KafkaListener(topics = KafkaApplication.TOPIC_NAME,topicPattern = "0")
public void test(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();System.out.println("分区是0,消息是:"+msg);}
}
//注解指定消费的分区是1
@KafkaListener(topics = KafkaApplication.TOPIC_NAME,topicPattern = "1")
public void test1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();System.out.println("分区是1,消息是:"+msg);}
}

1)我们通过postman对setKey发送请求

不指定分区,指定key策略,将通过hash进行分区发送,进行测试

发送的链接是:http://localhost:8080/send/setKey/21 和 http://localhost:8080/send/setKey/12

2)我们通过postman对setPartition发送请求

指定分区,进行测试:不管我们发送什么链接始终在指定的分区

自定义分区规则

我们可以通过一些操作自定义怎么分区,自定义分区的规则。

1)首先是创建类,并且实现Partitioner接口,重写方法编写自己的分区规则

重写partition()方法,这里设置的是只有key值为123的时候进去0区,其余都进入1区

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {System.out.println("进入了自定义分区...");//如果key的值是123则进入0分区,其余进入1分区if ("123".equals(key)){return 0;}return 1;}
​@Overridepublic void close() {
​}
​@Overridepublic void configure(Map<String, ?> configs) {
​}
}

2)通过自定义KafkaTemplate的参数,让自定义分区生效

已经编写了分区的规则,那么怎么让它生效呢?我们之前使用的KafkaTemplate都是没有自己传参的,现在我们通过自己传入参数来更改默认的分区规则。(这里并没有直接使用@bean的方式显示的注入,因为这样会覆盖原有的KafkaTemplate)

@Configuration
public class MyPartitionTemplate {
​
​@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;KafkaTemplate kafkaTemplate;
​@PostConstructpublic void setKafkaTemplate() {Map<Object, Object> props = new HashMap<>();
​//定义server的地址信息props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//定义key的序列化器props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//定义value的序列化器props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//注意分区器在这里!!!//将自己的分区规则传入进去props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
​//实例化自定义的kafkaTemplate,以使用自己的序列化方式this.kafkaTemplate = new KafkaTemplate<Object, Object>(new DefaultKafkaProducerFactory(props));}
​//通过这个方法来获得自定义的kafkaTemplate,使MyPartitioner的规则生效public KafkaTemplate getKafkaTemplate(){return kafkaTemplate;}
}

3)测试自定义分区的规则

@Autowired
MyPartitionTemplate template;
​
@GetMapping("/send/myPartitionSend/{key}")
public void MyPartition(@PathVariable("key") String key) {String msg = "key=" + key + "————自定义分区";//调用自定义的templatetemplate.getKafkaTemplate().send(KafkaApplication.TOPIC_NAME, key, msg);
}

手动提交

1)自动提交 前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset,默认单位为ms)

2)手动提交 有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。通常情况下我们需要在业务处理成功之后手动触发消息的签收,否则可能会出现:消息消费到一半消费者异常,消息并未消费成功但是消息已经自动被确认,也不会再投递给消费者,也就导致消息丢失了。

第一步:添加kafka配置,把 spring.kafka.listener.ack-mode = manual 设置为手动

spring:kafka:listener:ack-mode: manual #手动提交信息

第二步:消费消息的时候,给 方法添加 Acknowledgment 参数用来签收消息

@KafkaListener(topics = KafkaApplication.TOPIC_NAME)
public void handler1(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack){Object message = consumerRecord.value();System.out.println("message = " + message);//这里是只有消息为test的时候,才会确认提交if ("test".equals(message)){System.out.println("确认收到消息");//确认收到消息ack.acknowledge();}else {System.out.println("模拟发生故障");}
}

注意:如果手动提交模式被打开,一定不要忘记提交。否则会造成重复消费!

Kafka精品教学(入门,安装,Springboot整合Kafka)相关推荐

  1. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  2. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  3. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  4. 搭建kafka集群并使用springboot 整合

    上一篇文章我们已经成功安装了kafka,本文讲解部署kafka集群,并使用springboot整合测试. 设置多 broker 集群 由于只有一台虚拟机,于是通过多个配置文件模拟多台broker 首先 ...

  5. springboot 整合kafka 实现生产,消费数据

    一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...

  6. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  7. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

  8. SpringBoot整合kafka实战之带回调的生产者

    本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...

  9. Kafka原理以及SpringBoot整合Kafka

    1.Kafka原理 1. brokers有多个broker组成,broker是指Kafka服务器(192.168.223.140就是其中的一个broker),上面三台Kafka服务器组成了Kafka集 ...

最新文章

  1. 快速排序的基本原理及实现
  2. spring源码分析之cache注解
  3. 页面访问的常见错误码解析
  4. Leetcode | Implement strStr()
  5. android rtsp 延时,ijkplayer 单视频流直播延迟问题解决过程
  6. tomcat配置文件context.xml和server.xml分析
  7. 深度学习(四十二)word2vec词向量学习笔记
  8. HDU 1223 还是畅通工程(最小生成树prim模板)
  9. 多点子接口的帧中继配置
  10. 猎人能单拿修理机器人图纸_南京创新周麒麟行:他们为铁路配备“体检”机器人...
  11. MATLAB数组生成、引用
  12. 如何在64位win10中装个win98虚拟机
  13. VS2015安装使用番茄助手Visual Assist
  14. 有没有什么好的生日提醒软件推荐?3款软件让你的生活更有品质
  15. 知我者谓我心忧 不知我者谓我何求
  16. Philosopher’s Walk ICPC 2017 Daejeon F dfs 分治
  17. [转贴]Excel操作技巧大全(微软Office技巧大赛获奖作品)
  18. 《IT学生解惑手册》电子版免费下载!
  19. 小米手机页面显示android,小米手机连接电脑不显示文件怎么办?
  20. 大话卫星导航中的信号处理系列文章——GPS信号L1频点的中频数据生成与验证

热门文章

  1. h5的fetch方法_Javascript window.fetch API
  2. Vue << 拦截器(interceptors ) 过滤器(filter)
  3. 输入一行字符串,假设单词之间由空格隔开,统计其中共有多少单词?
  4. 跟着王进老师学开发Python篇:基础强化案例讲解-王进-专题视频课程
  5. 02. 【Java】语言编程基础
  6. Xsehll连接Linux进入VIm后不能鼠标右键复制
  7. html 学习 常用的html标签及使用
  8. 16种CSS水平垂直居中方法
  9. md5 加盐原理和常用的加盐方法
  10. 无图形启动matlab,MATLAB可以不启动图形界面运行