目录

至少一次(at least once)

最多一次(at most once)

精确一次(exactly once)

幂等性

幂等性作用范围

实现方法

代码

事务

事务作用范围

实现方法

代码


我们知道Kafka的消息交付可靠性保障分为 最多一次(at most once),至少一次(at least once),精确一次(exactly once)

至少一次(at least once)

什么时候Producer数据会重复发送 呢?

比如当Producer发送一条数据,当数据发送过去了,由于某种原因Broker没有反馈给Producer已经提交成功,Producer此时设置了重试机制,retries (设置方法:props.put(ProducerConfig.RETRIES_CONFIG, 5); ),则会再次发送数据,此时会导致数据重复发送

最多一次(at most once)

与at least once 相反,我们把retries 禁止,则就是最多一次,如果禁止重试,会导致数据丢失

精确一次(exactly once)

如何实现精确一次呢

Producer 有两种方法 幂等性与事务型

幂等性

幂等性作用范围

只能保证单个Producer不会产生重复数据,如果Producer重启或者多个Producer无法保证数据不重复

实现方法

设置一下配置即可

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

代码


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 幂等性生产者**      它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个 分区上不出现重复消息,它无法实现多个分区的幂等性*      它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理 解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保 证就丧失了* @author jast* @date 2020/4/19 22:38*/
public class IdempotenceProducer {private  static Producer<String, String> producer ;public IdempotenceProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);producer = new KafkaProducer<String, String>(props);}public Producer<String,String> getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {IdempotenceProducer idempotenceProducer = new IdempotenceProducer();Producer<String, String> producer = idempotenceProducer.getProducer();producer.send(new ProducerRecord<String,String>("test","1234")).get();}}

事务

事务作用范围

全部

实现方法

Producer设置//设置Producer幂等性,其他不用变化
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
//设置事务,同时也要指定幂等性,自定义id名称
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"jast-acid");-------------------------------------------------------------------Consumer设置//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());

代码

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** Kafka事务提交,保证exactly once producer* 要么全部成功,要么全部失败* @author jast* @date 2020/4/21 22:38*/
public class TransactionProducer {private  static Producer<String, String> producer ;public TransactionProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//设置事务,同时也要指定幂等性,自定义id名称props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"jast-acid");producer = new KafkaProducer<String, String>(props);}public Producer<String,String> getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {TransactionProducer transactionProducer = new TransactionProducer();Producer<String, String> producer = transactionProducer.getProducer();//初始化事务producer.initTransactions();boolean flag = true;//循环四次,最后一次我们把事务成功提交//理想结果:前三次事务提交失败//  事务消费者消费不到数据1,2,第四次可以消费到1,2,3,4;//  普通消费者可以消费到前三次的1,2 ,也可以消费到第四次1,2,3,4// 运行方法 TransactionConsumer/*** 结果如下,事务提交成功* 普通消费者消费数据->1 partition:2 offset:3080713* 事务消费者消费数据->3 partition:2 offset:3080717* 普通消费者消费数据->2 partition:1 offset:3081410* 普通消费者消费数据->1 partition:3 offset:3081465* 普通消费者消费数据->1 partition:2 offset:3080715* 普通消费者消费数据->3 partition:2 offset:3080717* 事务消费者消费数据->4 partition:1 offset:3081414* 事务消费者消费数据->2 partition:0 offset:3081470* 事务消费者消费数据->1 partition:3 offset:3081467* 普通消费者消费数据->2 partition:1 offset:3081412* 普通消费者消费数据->4 partition:1 offset:3081414* 普通消费者消费数据->2 partition:0 offset:3081468* 普通消费者消费数据->2 partition:0 offset:3081470* 普通消费者消费数据->1 partition:3 offset:3081467*/for(int i=0;i<=3;i++) {if(i==3)flag = false;try {//事务开始producer.beginTransaction();producer.send(new ProducerRecord<String, String>("test", "1")).get();producer.send(new ProducerRecord<String, String>("test", "2")).get();//手动制造异常if (flag)throw new RuntimeException("程序异常");producer.send(new ProducerRecord<String, String>("test", "3")).get();producer.send(new ProducerRecord<String, String>("test", "4")).get();//事务提交producer.commitTransaction();} catch (Exception e) {//中止事务producer.abortTransaction();e.printStackTrace();}}}
}

Consumer


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;
import wiki.hadoop.kafka.util.LogInit;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** 消费Kafka,保证事务性* @author jast* @date 2020/4/21 22:54*/
public class TransactionConsumer {/*** 事务性kafka消费* @return KafkaConsumer<String,String>* @param topic* @param max_poll_records* @param group* @return*/public KafkaConsumer<String, String> transactionConsumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props = new Properties();//-----------------------------------------------------------------------------------//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());//-----------------------------------------------------------------------------------props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest==true ? "latest" : "earliest");props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public KafkaConsumer<String, String> consumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest==true ? "latest" : "earliest");props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public static void main(String[] args) throws InterruptedException, ExecutionException {TransactionConsumer transactionConsumer = new TransactionConsumer();TransactionConsumer transactionConsumer2 = new TransactionConsumer();KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);CompletableFuture.runAsync(()->{while(true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {System.out.println("普通消费者消费数据->" + record.value() + " partition:"+record.partition()+ " offset:"+record.offset());}
//                System.out.println("普通消费者休眠1秒");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});CompletableFuture.runAsync(()->{while(true) {ConsumerRecords<String, String> records2 = consumer2.poll(1000);for (ConsumerRecord<String, String> record : records2) {System.out.println("事务消费者消费数据->" + record.value() + " partition:"+record.partition()+ " offset:"+record.offset());}
//                System.out.println("事务消费者休眠1秒");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}).get();}
}

Kafka开发指南之 如何Kafka 事务型生产者,保证生产者exactly once相关推荐

  1. Apache Kafka开发入门指南

    Apache Kafka开发入门指南 作者:chszs,转载需注明.博客主页: http://blog.csdn.net/chszs Apache Kafka可以帮助你解决在发布/订阅架构中遇到消费数 ...

  2. 【Kafka】《Kafka权威指南》入门

    发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性.数据(消息)的发送者(发布者)不会直接把消息发送给接收 ...

  3. 通俗易懂 !Kafka 开发快速入门看这篇就够了

    写在前面:我是「云祁」,一枚热爱技术.会写诗的大数据开发猿.昵称来源于王安石诗中一句 [ 云之祁祁,或雨于渊 ] ,甚是喜欢.写博客一方面是对自己学习的一点点总结及记录,另一方面则是希望能够帮助更多对 ...

  4. 送5本《Kafka权威指南》第二版

    文末送书 科学家们每一次发生分歧都是因为掌握的数据不够充分.所以,我们可以先就获取哪一类数据达成一致,只要获取了数据,问题也就迎刃而解了.要么我是对的,要么你是对的,要么我们都是错的,然后继续. -- ...

  5. Kafka实践指南:快速掌握部署使用与常用命令

    Kafka部署使用 Kafka部署使用 Kafka定义和特性 Kafka架构和组成部分 Kafka工作原理和消息传递过程 Kafka安装与配置 安装Kafka 配置Kafka集群 Kafka的主题和分 ...

  6. 如何使用Kafka可靠地发送消息-《Kafka权威指南(第二版)》阅读笔记

    可靠性是系统而不是某个独立组件的一个属性,所以,在讨论Kafka的可靠性保证时,需要从系统的整体出发.说到可靠性,那些与Kafka集成的系统与Kafka本身一样重要.正因为可靠性是系统层面的概念,所以 ...

  7. Java开发 - 消息队列之Kafka初体验

    目录 前言 Kafka 什么是Kafka Kafka软件结构 Kafka的特点 怎么启动Kafka 下载Kafka 配置Kafka Zookeeper 启动Kafka Kafka案例 添加依赖 添加配 ...

  8. Linux搭建Kafka开发环境

    Linux搭建Kafka开发环境 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它 ...

  9. 2021年大数据Kafka(十):kafka生产者数据分发策略

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...

最新文章

  1. 轻松应对Java试题,这是一份大数据分析工程师面试指南
  2. C# Dictionary.Add(key,123) 与 Dictionary[key]=123的区别
  3. Cocoa之NSWindow常用总结
  4. Migration Necessary
  5. java InputStream的使用
  6. Java——n个数的全排列
  7. Leetcode--424. 替换后的最长重复字符
  8. 异常将上下文初始化事件发送到类的侦听器实例._Java CLassLoader类加载器详解,一点课堂(多岸学院)...
  9. html5制作拼图游戏教程,用HTML5制作视频拼图的教程
  10. 计算机网络交换机组网及虚拟局域网实验报告,计算机网络实验报告材料(虚拟局域网).doc...
  11. 打开access文件 提示文件名无效_分享在PS软件打开图像时提示无效的JPEG的解决方法...
  12. Windows系统结构
  13. 计算机网络入侵参考文献,入侵计算机网络论文,关于计算机网络入侵检测相关参考文献资料-免费论文范文...
  14. 计算机音乐有哪些优势,谈谈用电脑听无损音乐的好处
  15. ubuntu 16.04安装网易云音乐,没声音?
  16. 【QNX Hypervisor 2.2 用户手册】1.5 内存
  17. 混凝土与水泥制品行业运行分析
  18. 关于POS操作的一些文章收集链接
  19. Data Binding的报错集合 例如Error 10 54 错误 程序包com kodulf recycl
  20. Nginx常用配置及代理转发

热门文章

  1. servlet如何使用session把用户的手机号修改_SpringBoot源码学习系列之嵌入式Servlet容器...
  2. 一元三次方程重根判别式_如何求一元三次方程
  3. 邢台职业技术学院计算机系宿舍,邢台职业技术学院宿舍条件怎么样 男生女生宿舍图片...
  4. wind 下装mysql,windows 下安装MySQL
  5. springcloud 网关_Spring Cloud 系列之 Netflix Zuul 服务网关(二)
  6. 【学习笔记】第一章——操作系统的概念、功能、特征、发展分类(系统调用、并发共享虚拟异步)
  7. python抽荣耀水晶_教你2种免费拿荣耀水晶的方法,获奖概率让人惊喜,一般人我不告诉他...
  8. android ui自动化框架选型,Appium UI 自动化框架之我见 (开源)
  9. linux curl 编译命令,linux 编译 curl 出错
  10. 一键装机linux_(推荐)linux用一键安装包