Kafka开发指南之 如何Kafka 事务型生产者,保证生产者exactly once
目录
至少一次(at least once)
最多一次(at most once)
精确一次(exactly once)
幂等性
幂等性作用范围
实现方法
代码
事务
事务作用范围
实现方法
代码
我们知道Kafka的消息交付可靠性保障分为 最多一次(at most once),至少一次(at least once),精确一次(exactly once)
至少一次(at least once)
什么时候Producer数据会重复发送 呢?
比如当Producer发送一条数据,当数据发送过去了,由于某种原因Broker没有反馈给Producer已经提交成功,Producer此时设置了重试机制,retries (设置方法:props.put(ProducerConfig.RETRIES_CONFIG, 5); ),则会再次发送数据,此时会导致数据重复发送
最多一次(at most once)
与at least once 相反,我们把retries 禁止,则就是最多一次,如果禁止重试,会导致数据丢失
精确一次(exactly once)
如何实现精确一次呢
Producer 有两种方法 幂等性与事务型
幂等性
幂等性作用范围
只能保证单个Producer不会产生重复数据,如果Producer重启或者多个Producer无法保证数据不重复
实现方法
设置一下配置即可
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)
代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 幂等性生产者** 它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个 分区上不出现重复消息,它无法实现多个分区的幂等性* 它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理 解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保 证就丧失了* @author jast* @date 2020/4/19 22:38*/
public class IdempotenceProducer {private static Producer<String, String> producer ;public IdempotenceProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);producer = new KafkaProducer<String, String>(props);}public Producer<String,String> getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {IdempotenceProducer idempotenceProducer = new IdempotenceProducer();Producer<String, String> producer = idempotenceProducer.getProducer();producer.send(new ProducerRecord<String,String>("test","1234")).get();}}
事务
事务作用范围
全部
实现方法
Producer设置//设置Producer幂等性,其他不用变化
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
//设置事务,同时也要指定幂等性,自定义id名称
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"jast-acid");-------------------------------------------------------------------Consumer设置//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
代码
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** Kafka事务提交,保证exactly once producer* 要么全部成功,要么全部失败* @author jast* @date 2020/4/21 22:38*/
public class TransactionProducer {private static Producer<String, String> producer ;public TransactionProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//设置事务,同时也要指定幂等性,自定义id名称props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"jast-acid");producer = new KafkaProducer<String, String>(props);}public Producer<String,String> getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {TransactionProducer transactionProducer = new TransactionProducer();Producer<String, String> producer = transactionProducer.getProducer();//初始化事务producer.initTransactions();boolean flag = true;//循环四次,最后一次我们把事务成功提交//理想结果:前三次事务提交失败// 事务消费者消费不到数据1,2,第四次可以消费到1,2,3,4;// 普通消费者可以消费到前三次的1,2 ,也可以消费到第四次1,2,3,4// 运行方法 TransactionConsumer/*** 结果如下,事务提交成功* 普通消费者消费数据->1 partition:2 offset:3080713* 事务消费者消费数据->3 partition:2 offset:3080717* 普通消费者消费数据->2 partition:1 offset:3081410* 普通消费者消费数据->1 partition:3 offset:3081465* 普通消费者消费数据->1 partition:2 offset:3080715* 普通消费者消费数据->3 partition:2 offset:3080717* 事务消费者消费数据->4 partition:1 offset:3081414* 事务消费者消费数据->2 partition:0 offset:3081470* 事务消费者消费数据->1 partition:3 offset:3081467* 普通消费者消费数据->2 partition:1 offset:3081412* 普通消费者消费数据->4 partition:1 offset:3081414* 普通消费者消费数据->2 partition:0 offset:3081468* 普通消费者消费数据->2 partition:0 offset:3081470* 普通消费者消费数据->1 partition:3 offset:3081467*/for(int i=0;i<=3;i++) {if(i==3)flag = false;try {//事务开始producer.beginTransaction();producer.send(new ProducerRecord<String, String>("test", "1")).get();producer.send(new ProducerRecord<String, String>("test", "2")).get();//手动制造异常if (flag)throw new RuntimeException("程序异常");producer.send(new ProducerRecord<String, String>("test", "3")).get();producer.send(new ProducerRecord<String, String>("test", "4")).get();//事务提交producer.commitTransaction();} catch (Exception e) {//中止事务producer.abortTransaction();e.printStackTrace();}}}
}
Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;
import wiki.hadoop.kafka.util.LogInit;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** 消费Kafka,保证事务性* @author jast* @date 2020/4/21 22:54*/
public class TransactionConsumer {/*** 事务性kafka消费* @return KafkaConsumer<String,String>* @param topic* @param max_poll_records* @param group* @return*/public KafkaConsumer<String, String> transactionConsumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props = new Properties();//-----------------------------------------------------------------------------------//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());//-----------------------------------------------------------------------------------props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest==true ? "latest" : "earliest");props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public KafkaConsumer<String, String> consumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest==true ? "latest" : "earliest");props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public static void main(String[] args) throws InterruptedException, ExecutionException {TransactionConsumer transactionConsumer = new TransactionConsumer();TransactionConsumer transactionConsumer2 = new TransactionConsumer();KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);CompletableFuture.runAsync(()->{while(true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {System.out.println("普通消费者消费数据->" + record.value() + " partition:"+record.partition()+ " offset:"+record.offset());}
// System.out.println("普通消费者休眠1秒");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});CompletableFuture.runAsync(()->{while(true) {ConsumerRecords<String, String> records2 = consumer2.poll(1000);for (ConsumerRecord<String, String> record : records2) {System.out.println("事务消费者消费数据->" + record.value() + " partition:"+record.partition()+ " offset:"+record.offset());}
// System.out.println("事务消费者休眠1秒");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}).get();}
}
Kafka开发指南之 如何Kafka 事务型生产者,保证生产者exactly once相关推荐
- Apache Kafka开发入门指南
Apache Kafka开发入门指南 作者:chszs,转载需注明.博客主页: http://blog.csdn.net/chszs Apache Kafka可以帮助你解决在发布/订阅架构中遇到消费数 ...
- 【Kafka】《Kafka权威指南》入门
发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性.数据(消息)的发送者(发布者)不会直接把消息发送给接收 ...
- 通俗易懂 !Kafka 开发快速入门看这篇就够了
写在前面:我是「云祁」,一枚热爱技术.会写诗的大数据开发猿.昵称来源于王安石诗中一句 [ 云之祁祁,或雨于渊 ] ,甚是喜欢.写博客一方面是对自己学习的一点点总结及记录,另一方面则是希望能够帮助更多对 ...
- 送5本《Kafka权威指南》第二版
文末送书 科学家们每一次发生分歧都是因为掌握的数据不够充分.所以,我们可以先就获取哪一类数据达成一致,只要获取了数据,问题也就迎刃而解了.要么我是对的,要么你是对的,要么我们都是错的,然后继续. -- ...
- Kafka实践指南:快速掌握部署使用与常用命令
Kafka部署使用 Kafka部署使用 Kafka定义和特性 Kafka架构和组成部分 Kafka工作原理和消息传递过程 Kafka安装与配置 安装Kafka 配置Kafka集群 Kafka的主题和分 ...
- 如何使用Kafka可靠地发送消息-《Kafka权威指南(第二版)》阅读笔记
可靠性是系统而不是某个独立组件的一个属性,所以,在讨论Kafka的可靠性保证时,需要从系统的整体出发.说到可靠性,那些与Kafka集成的系统与Kafka本身一样重要.正因为可靠性是系统层面的概念,所以 ...
- Java开发 - 消息队列之Kafka初体验
目录 前言 Kafka 什么是Kafka Kafka软件结构 Kafka的特点 怎么启动Kafka 下载Kafka 配置Kafka Zookeeper 启动Kafka Kafka案例 添加依赖 添加配 ...
- Linux搭建Kafka开发环境
Linux搭建Kafka开发环境 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它 ...
- 2021年大数据Kafka(十):kafka生产者数据分发策略
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 生产者数据分发策略 策略一:用户指定了partition 策 ...
最新文章
- 轻松应对Java试题,这是一份大数据分析工程师面试指南
- C# Dictionary.Add(key,123) 与 Dictionary[key]=123的区别
- Cocoa之NSWindow常用总结
- Migration Necessary
- java InputStream的使用
- Java——n个数的全排列
- Leetcode--424. 替换后的最长重复字符
- 异常将上下文初始化事件发送到类的侦听器实例._Java CLassLoader类加载器详解,一点课堂(多岸学院)...
- html5制作拼图游戏教程,用HTML5制作视频拼图的教程
- 计算机网络交换机组网及虚拟局域网实验报告,计算机网络实验报告材料(虚拟局域网).doc...
- 打开access文件 提示文件名无效_分享在PS软件打开图像时提示无效的JPEG的解决方法...
- Windows系统结构
- 计算机网络入侵参考文献,入侵计算机网络论文,关于计算机网络入侵检测相关参考文献资料-免费论文范文...
- 计算机音乐有哪些优势,谈谈用电脑听无损音乐的好处
- ubuntu 16.04安装网易云音乐,没声音?
- 【QNX Hypervisor 2.2 用户手册】1.5 内存
- 混凝土与水泥制品行业运行分析
- 关于POS操作的一些文章收集链接
- Data Binding的报错集合 例如Error 10 54 错误 程序包com kodulf recycl
- Nginx常用配置及代理转发
热门文章
- servlet如何使用session把用户的手机号修改_SpringBoot源码学习系列之嵌入式Servlet容器...
- 一元三次方程重根判别式_如何求一元三次方程
- 邢台职业技术学院计算机系宿舍,邢台职业技术学院宿舍条件怎么样 男生女生宿舍图片...
- wind 下装mysql,windows 下安装MySQL
- springcloud 网关_Spring Cloud 系列之 Netflix Zuul 服务网关(二)
- 【学习笔记】第一章——操作系统的概念、功能、特征、发展分类(系统调用、并发共享虚拟异步)
- python抽荣耀水晶_教你2种免费拿荣耀水晶的方法,获奖概率让人惊喜,一般人我不告诉他...
- android ui自动化框架选型,Appium UI 自动化框架之我见 (开源)
- linux curl 编译命令,linux 编译 curl 出错
- 一键装机linux_(推荐)linux用一键安装包