CC00073.kafka——|Hadoopkafka.V58|——|kafka.v58|稳定性|事务操作|
### --- 事务操作~~~ # 在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:
~~~ 只有Producer生产消息,这种场景需要事务的介入;
~~~ 消费消息和生产消息并存,比如Consumer&Producer模式,
~~~ 这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
~~~ 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,
~~~ 而且这种场景不是事务的引入目的。
### --- 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();// 开启事务
void beginTransaction() throws ProducerFencedException;// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>offsets,String consumerGroupId) throws
ProducerFencedException;// 提交事务
void commitTransaction() throws ProducerFencedException;// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;
### --- 创建一个maven项目:demo-12-kafka-Transactionl
### --- 导入pom.xml依赖<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.2</version></dependency></dependencies>
### --- 案例1:单个Producer,使用过十五保证消息的仅一次发送package com.yanqi.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class MyTransactionalProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 提供生产者client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");// 设置事务IDconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id_1");// 需要ISR全体确认消息configs.put(ProducerConfig.ACKS_CONFIG, "all");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);// 初始化事务producer.initTransactions();try {// 开启事务producer.beginTransaction();// 发送事务消息producer.send(new ProducerRecord<>("tp_tx_01", "txkey1", "tx_msg_4"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey2", "tx_msg_5"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey3", "tx_msg_6"));int i = 1 / 0;// 提交事务producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 事务回滚producer.abortTransaction();} finally {// 关闭生产者producer.close();}}}
### --- 案例2:在消费-转换-生产模式,使用事务保证仅一次发送。package com.yanqi.kafka.demo;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class MyTransactional {public static KafkaProducer<String, String> getProducer() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 设置client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");// 设置事务idconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");// 需要所有的ISR副本确认configs.put(ProducerConfig.ACKS_CONFIG, "all");// 启用幂等性configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);return producer;}public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置消费组IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");// 不启用消费者偏移量的自动确认,也不要手动确认configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 只读取已提交的消息
// configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);return consumer;}public static void main(String[] args) {String consumerGroupId = "consumer_grp_id_101";KafkaProducer<String, String> producer = getProducer();KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);// 事务的初始化producer.initTransactions();//订阅主题consumer.subscribe(Collections.singleton("tp_tx_01"));final ConsumerRecords<String, String> records = consumer.poll(1_000);// 开启事务producer.beginTransaction();try {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (ConsumerRecord<String, String> record : records) {System.out.println(record);producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下一条要消费的消息}// 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)producer.sendOffsetsToTransaction(offsets, consumerGroupId);// int i = 1 / 0;// 提交事务producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 回滚事务producer.abortTransaction();} finally {// 关闭资源producer.close();consumer.close();}}
}
### --- 准备运行主题
~~~ # 创建主题[root@hadoop01 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka \
--create --topic tp_tx_01 --partitions 1 --replication-factor 1
### --- 创建一个消费者用于消费消息[root@hadoop01 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic tp_tx_01 --from-beginning
~~~输出参数
tx_msg_1
tx_msg_2
tx_msg_3
### --- 编译打印:MyTransactionalProducerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=52156:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-12-kafka-Transactional\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.producer.MyTransactionalProducer
~~~未输出任何参数
### --- 创建消费者[root@hadoop01 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic tp_tx_01 -isolation-level read_committed --from-beginning
~~~输出参数
~~~ # 不会输出任何参数
### --- 编译打印:MyTransactionalProducerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=49304:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-12-kafka-Transactional\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.MyTransactional
~~~不会输出任何参数
~~~ # 抛异常
CC00073.kafka——|Hadoopkafka.V58|——|kafka.v58|稳定性|事务操作|相关推荐
- CC00038.kafka——|Hadoopkafka.V23|——|kafka.v23|消费者拦截器参数配置|
一.消费者拦截器参数配置:消费者参数配置补充 配置项 说明 bootstrap.servers 建立到Kafka集群的初始连接用到的host/port列表. 客户端会使用这里指定的所有的host/po ...
- CC00042.kafka——|Hadoopkafka.V27|——|kafka.v27|主题管理.v02|
一.修改主题 ### --- 为topic_x加入segment.bytes配置[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/ ...
- CC00065.kafka——|Hadoopkafka.V50|——|kafka.v50|日志清理|
一.日志压缩策略 ### --- 概念~~~ 日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留, ~~~ 而不是基于粗粒度的基于时间的保留. ~~~ 对于具有相同的Key,而数据不同,只保 ...
- CC00060.kafka——|Hadoopkafka.V45|——|kafka.v45|日志存储概述|
一.日志存储概述 ### --- 日志存储概述~~~ Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响. ~~~ 每个主题又可以分为一个或多个分区. ~~~ 每个分区各自存在 ...
- CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|
一.消费者位移管理数据准备 ### --- 准备数据~~~ # 生成消息文件 [root@hadoop ~]# for i in `seq 60`; do echo "hello yanqi ...
- kafka版本_Apache Kafka 版本演进及特性介绍
前段时间有一个同事问到:Kafka 0.8.2 只能使用Zookeeper连接吗?虽然仍有一部分Kafka的老用户在使用 0.8.x 版本,但 Kafka 0.8.x 确实是比较老的版本了.如果不是对 ...
- Kafka(三)、Kafka架构
Kafka架构 一.Kafka 基本介绍 1.1 什么是Kafka 1.2 Kafka特性 1.3 常见应用场景 二.Kafka 系统架构 2.1 基本概念 2.2 index / timeindex ...
- Kafka教程(一)Kafka入门教程
Kafka教程(一)Kafka入门教程 1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件 ...
- 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)
最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...
最新文章
- tcc分布式事务_什么是 TCC分布式事务?
- 第二十五章补充内容 3 assert()宏
- 第二篇:Mysql---约束条件、修改表的结构、键值
- Stable Sort Aizu - ALDS1_2_C
- C++有符号和无符号数的转换
- [导入]Visual Studio 2005 Web Deployment Projects版本不同引发的问题
- python学习与数据挖掘_python机器学习与数据挖掘
- mysql外键约束脚本_使用SQL脚本创建数据库,操作主键、外键与各种约束(MS SQL Server)...
- 提升网页加载速度—预加载VS预读取
- 条款28:避免返回handles指向对象内部的成分(Avoid returning handles to objects internals)...
- 不得不学的统计学基础知识(二)
- 计算机电子管与晶体管区别,电子管、晶体管与集成电路 扫盲
- Ubuntu搭建KMS服务
- win10连接计算机,win10怎么连接局域网打印机
- 电动48V/60V自行车/摩托车/观光车电池检测设备,满足GB38031新国标测试
- RSA详解 ----- Android中常用的非对称加密算法
- 揭秘中国球员十大豪宅
- 新猿木子李:0基础学python培训教程 Python操作Excel之写入数据
- Inkscape制作LOGO——新手
- android exo解码问题,Android Exoplayer音频播放异常
热门文章
- 在Adobe Reader里添加书签功能
- Linux CentOS 7.3 1611 基础命令
- 「雕爷学编程」Arduino动手做(39)——DS18B20温度传感器
- 原生js输出html5,原生JS+HTML5实现的可调节写字板功能示例
- Python语言基础学习不错的网站
- 关于PEER - PEER毅恒挚友 - Powered by Discuz!
- 单片机入门(三)----DS1302时钟芯片 可调时钟
- Python中用PIL/Pillow裁剪图片
- 防火墙和iptables
- 今天看到“勃客郑渊洁”,是我的偶像