1. 整合kafka

1、引入依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2、设置yml文件

spring:application:name: demokafka:bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # consumer消费者group-id: javagroup # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、启动项目

2. 消息发送

2.1 发送类型

KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法

异步发送生产者:

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}

同步发送生产者:

//测试同步发送与监听
@RestController
public class AsyncProducer {private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//同步发送@GetMapping("/kafka/sync/{msg}")public void sync(@PathVariable("msg") String msg) throws Exception {Message message = new Message();message.setMessage(msg);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));//注意,可以设置等待时间,超出后,不再等候结果SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);logger.info("send result:{}",result.getProducerRecord().value());}}

消费者:

@Component
public class KafkaConsumer {private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);//不指定group,默认取yml里配置的@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}
}

那么我们怎么看出来同步发送和异步发送的区别呢?

①首先在服务器上,将kafka暂停服务。
②在swagger发送消息

  • 调同步发送:请求被阻断,一直等待,超时后返回错误

  • 而调异步发送的(默认发送接口),请求立刻返回。

那么,异步发送的消息怎么确认发送情况呢?
我们使用注册监听
即新建一个类:KafkaListener.java

@Configuration
public class KafkaListener {private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);@AutowiredKafkaTemplate kafkaTemplate;//配置监听@PostConstructprivate void listener() {kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {@Overridepublic void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {logger.info("ok,message={}", producerRecord.value());}@Overridepublic void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {logger.error("error!message={}", producerRecord.value());}});}
}

查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener

如果是正常发送异步消息,则会获得该消息。可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息。

2.2 序列化

消费者使用:KafkaConsumer.java

@Component
public class KafkaConsumer {private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);//不指定group,默认取yml里配置的@KafkaListener(topics = {"test"})public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("message:{}", msg);}}
}

1)序列化详解

  • 前面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer
  • 基本上,可以满足绝大多数场景

2)自定义序列化
自己实现,实现对应的接口即可,有以下方法:

public interface Serializer<T> extends Closeable {default void configure(Map<String, ?> configs, Boolean isKey) {}//理论上,只实现这个即可正常运行byte[] serialize(String var1, T var2);//默认调上面的方法default byte[] serialize(String topic, Headers headers, T data) {return this.serialize(topic, data);}default void close() {}
}

我们来自己实现一个序列化器:MySerializer.java

public class MySerializer implements Serializer {@Overridepublic byte[] serialize(String s, Object o) {String json = JSON.toJSONString(o);return json.getBytes();}
}

3)解码
MyDeserializer.java,实现方式与编码器几乎一样.

public class MyDeserializer implements Deserializer {private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);@Overridepublic Object deserialize(String s, byte[] bytes) {try {String json = new String(bytes,"utf-8");return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}

4)在yaml中配置自己的编码器、解码器

再次收发,消息正常

2.3 分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定的分区里面去
  • 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
  • 既没有给定分区号,也没有给定key值,直接轮循进行分区(默认
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规则
发送者代码参考:PartitionProducer.java

//测试分区发送
@RestController
public class PartitionProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;//    指定分区发送
//    不管你key是什么,到同一个分区@GetMapping("/kafka/partitionSend/{key}")public void setPartition(@PathVariable("key") String key) {kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0号分区");}//    指定key发送,不指定分区
//    根据key做hash,相同的key到同一个分区@GetMapping("/kafka/keysend/{key}")public void setKey(@PathVariable("key") String key) {kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分区");}}

消费者代码使用:PartitionConsumer.java

@Component
public class PartitionConsumer {private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);//分区消费@KafkaListener(topics = {"test"},topicPattern = "0")public void onMessage(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=0,message:[{}]", msg);}}@KafkaListener(topics = {"test"},topicPattern = "1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("partition=1,message:[{}]", msg);}}
}

通过swagger访问setKey(也就是只给了key的方法):

可以看到key相同的被hash到了同一个分区

再访问setPartition来设置分区号0来发送:

可以看到无论key是什么,都是分区0来消费

2)自定义分区
参考代码:MyPartitioner.java , MyPartitionTemplate.java。
发送使用:MyPartitionProducer.java。

public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//        定义自己的分区策略
//                如果key以0开头,发到0号分区
//                其他都扔到1号分区String keyStr = key+"";if (keyStr.startsWith("0")){return 0;}else {return 1;}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
@Configuration
public class MyPartitionTemplate {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;KafkaTemplate kafkaTemplate;@PostConstructpublic void setKafkaTemplate() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//注意分区器在这里!!!props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));}public KafkaTemplate getKafkaTemplate(){return kafkaTemplate;}
}
//测试自定义分区发送
@RestController
public class MyPartitionProducer {@AutowiredMyPartitionTemplate template;//    使用0开头和其他任意字母开头的key发送消息
//    看控制台的输出,在哪个分区里?@GetMapping("/kafka/myPartitionSend/{key}")public void setPartition(@PathVariable("key") String key) {template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");}
}

使用swagger,发送0开头和非0开头两种key

3. 消息消费

3.1 消息组别

发送者使用:KafkaProducer.java

@RestController
public class KafkaProducer {@Resourceprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/test/{msg}")public void sendMessage(@PathVariable("msg") String msg) {Message message = new Message();message.setMessage(msg);kafkaTemplate.send("test", JSON.toJSONString(message));}
}

1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:

//测试组消费
@Component
public class GroupConsumer {private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);//组1,消费者1@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-1 , message:{}", msg);}}//组1,消费者2@KafkaListener(topics = {"test"},groupId = "group1")public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group1-2 , message:{}", msg);}}//组2,只有一个消费者@KafkaListener(topics = {"test"},groupId = "group2")public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {Optional<?> optional = Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {Object msg = optional.get();logger.info("group:group2 , message:{}", msg);}}
}

2)启动

3)通过swagger发送2条消息

  • 同一group下的两个消费者,在group1均分消息
  • group2下只有一个消费者,得到全部消息

4)消费端闲置
注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置(因为一个分区只能分配给一个消费者),浪费资源!

验证方式:
停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。
重新发送两条消息,试一试

  • group2可以消费到1、2两条消息
  • group1下有两个消费者,但是只分配给了 1 , 2这个进程被闲置

3.2 位移提交

1)自动提交
前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset,默认单位为ms)

2)手动提交
有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。
下面我们自己定义配置,覆盖上面的参数
代码参考:MyOffsetConfig.java

@Configuration
public class MyOffsetConfig {private final Logger logger = LoggerFactory.getLogger(this.getClass());@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 注意这里!!!设置手动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));// ack模式://          AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:////          RECORD//          每处理一条commit一次////          BATCH(默认)//          每次poll的时候批量提交一次,频率取决于每次poll的调用频率////          TIME//          每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)////          COUNT//          累积达到ackCount次的ack去commit////          COUNT_TIME//          ackTime或ackCount哪个条件先满足,就commit////          MANUAL//          listener负责ack,但是背后也是批量上去////          MANUAL_IMMEDIATE//          listner负责ack,每调用一次,就立即commitfactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

然后通过在消费端的Consumer来提交偏移量
MyOffsetConsumer:

@Component
public class MyOffsetConsumer {private final Logger logger = LoggerFactory.getLogger(this.getClass());@KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory")public void manualCommit(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Consumer consumer,Acknowledgment ack) {logger.info("手动提交偏移量 , partition={}, msg={}", partition, message);// 同步提交consumer.commitSync();//异步提交//consumer.commitAsync();// ack提交也可以,会按设置的ack策略走(参考MyOffsetConfig.java里的ack模式)// ack.acknowledge();}@KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory")public void noCommit(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Consumer consumer,Acknowledgment ack) {logger.info("忘记提交偏移量, partition={}, msg={}", partition, message);// 不做commit!}/*** 现实状况:* commitSync和commitAsync组合使用* <p>* 手工提交异步 consumer.commitAsync();* 手工同步提交 consumer.commitSync()* <p>* commitSync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,* commitSync()会一直重试,但是commitAsync()不会。* <p>* 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题* 因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。* 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费* 因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。*/
//   @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")public void manualOffset(@Payload String message,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,Consumer consumer,Acknowledgment ack) {try {logger.info("同步异步搭配 , partition={}, msg={}", partition, message);//先异步提交consumer.commitAsync();//继续做别的事} catch (Exception e) {System.out.println("commit failed");} finally {try {consumer.commitSync();} finally {consumer.close();}}}/*** 甚至可以手动提交,指定任意位置的偏移量* 不推荐日常使用!!!*/
//    @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")public void offset(ConsumerRecord record, Consumer consumer) {logger.info("手动指定任意偏移量, partition={}, msg={}", record.partition(), record);Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();currentOffset.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));consumer.commitSync(currentOffset);}}

3)重复消费问题
如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!

用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java 启动项目,使用swagger的KafkaProducer发送连续几条消息 留心控制台,都能消费,没问题:

但是!重启项目:

无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

再通过命令行查询偏移量

4)经验与总结
commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会。

这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。

但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!

因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
详细代码参考:MyOffsetConsumer.manualOffset()

Springboot整合kafka相关推荐

  1. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  2. springboot 整合 kafka demo 顺便看一下源码

    大家好,我是烤鸭: 今天分享下 springboot 整合 kafka. 1.  环境参数: windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + spring ...

  3. SpringBoot整合kafka之kafka分区实战

    本文来说下SpringBoot整合kafka之kafka分区实战 文章目录 准备工作 程序代码 程序测试 本文小结 准备工作 当然我们也可以不手动创建topic,在执行代码kafkaTemplate. ...

  4. SpringBoot整合kafka实战之带回调的生产者

    本文来说下SpringBoot整合kafka部分知识内容 文章目录 带回调的生产者 方式一 方式二 本文小结 带回调的生产者 前面我们说了简单的生产和消费,本文说下带回调的生产者.kafkaTempl ...

  5. springboot 整合kafka 实现生产,消费数据

    一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...

  6. Kafka精品教学(入门,安装,Springboot整合Kafka)

    ps:本文是博主结合视频和博客学习之后,自己实验总结编写的,如果侵权请联系删除. 要学习kafka首先要了解什么是消息队列,因为Kafka 是一个分布式的基于发布 / 订阅模式的消息队列(Messag ...

  7. Kafka原理以及SpringBoot整合Kafka

    1.Kafka原理 1. brokers有多个broker组成,broker是指Kafka服务器(192.168.223.140就是其中的一个broker),上面三台Kafka服务器组成了Kafka集 ...

  8. kafka 安装使用 /springboot整合kafka /消息投递机制以及存储策略 /副本处理机制

    一.背景 1.基本信息 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写.Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流 ...

  9. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  10. SpringBoot整合kafka(安装)

    项目路径:https://github.com/zhaopeng01/springboot-study/tree/master/study_14 序言 Kafka 是一种高吞吐的分布式发布订阅消息系统 ...

最新文章

  1. JVM内存管理学习总结(一)
  2. CDH5之Unexpected error.Unable to verify database connection
  3. 我与TCP连接不得不说的故事
  4. java api接口签名验证失败_cryptapi结合java进行数字签名与验证签名的困惑
  5. 使用you-get下载blbl视频
  6. php京东接口开发,技术文档
  7. mac电脑循环次数多少算新_在Mac上处理不同事务,这些软件必不可少,个个精品...
  8. 混动汽车HEV混合驱动的MPC控制
  9. 2018年中学计算机考试,重要!2018年济南市初中信息技术考试相关说明!
  10. matlab解超越函数,矩阵的超越函数Matlab提供的矩阵函数.PPT
  11. APP支付宝提现和微信提现之服务端接入
  12. 表格识别综述与相关实战
  13. CSS基础五(盒模型)
  14. 计算机换内存条解决方案
  15. GameMaker如何导入JSON文件
  16. 【c++入门(2)】贪心训练
  17. Flutter 制霸全平台?这事儿我看有戏。
  18. java将汉字转成拼音首字母大写字母_Java 将汉字转换为拼音并取首字母大写
  19. 一零二九、scalac: Token not found: C:\Users\Tuomasi\AppData\Local\JetBrains\IdeaIC2021
  20. 【Linux】概述(Unix和Linux的关系)

热门文章

  1. 阿里巴巴前端实习面经总结(可内推)
  2. 计算机丢失MSVCR71.dll处理方法
  3. win10卓越性能模式开启方法
  4. (XWZ)的python学习笔记Ⅳ——错误、调试和测试
  5. java png图片转换成jpg_Java实现将png格式图片转换成jpg格式图片的方法【测试可用】...
  6. google账号已停用(已解决)
  7. postgres 命令行建数据库表_PostgreSQL 创建表格
  8. that、this、these、those的区别
  9. c语言中until的用法,until的用法总结
  10. latex----目录格式设置