一、事务操作
### --- 事务操作~~~     # 在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:
~~~     只有Producer生产消息,这种场景需要事务的介入;
~~~     消费消息和生产消息并存,比如Consumer&Producer模式,
~~~     这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
~~~     只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,
~~~     而且这种场景不是事务的引入目的。

### --- 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();// 开启事务
void beginTransaction() throws ProducerFencedException;// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>offsets,String consumerGroupId) throws
ProducerFencedException;// 提交事务
void commitTransaction() throws ProducerFencedException;// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

二、事务操作案例:
### --- 创建一个maven项目:demo-12-kafka-Transactionl
### --- 导入pom.xml依赖<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.2</version></dependency></dependencies>

三、编程代码没实现
### --- 案例1:单个Producer,使用过十五保证消息的仅一次发送package com.yanqi.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class MyTransactionalProducer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 提供生产者client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");// 设置事务IDconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id_1");// 需要ISR全体确认消息configs.put(ProducerConfig.ACKS_CONFIG, "all");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);// 初始化事务producer.initTransactions();try {// 开启事务producer.beginTransaction();// 发送事务消息producer.send(new ProducerRecord<>("tp_tx_01", "txkey1", "tx_msg_4"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey2", "tx_msg_5"));producer.send(new ProducerRecord<>("tp_tx_01", "txkey3", "tx_msg_6"));int i = 1 / 0;// 提交事务producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 事务回滚producer.abortTransaction();} finally {// 关闭生产者producer.close();}}}

### --- 案例2:在消费-转换-生产模式,使用事务保证仅一次发送。package com.yanqi.kafka.demo;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class MyTransactional {public static KafkaProducer<String, String> getProducer() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 设置client.idconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");// 设置事务idconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");// 需要所有的ISR副本确认configs.put(ProducerConfig.ACKS_CONFIG, "all");// 启用幂等性configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);return producer;}public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置消费组IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");// 不启用消费者偏移量的自动确认,也不要手动确认configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 只读取已提交的消息
//        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);return consumer;}public static void main(String[] args) {String consumerGroupId = "consumer_grp_id_101";KafkaProducer<String, String> producer = getProducer();KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);// 事务的初始化producer.initTransactions();//订阅主题consumer.subscribe(Collections.singleton("tp_tx_01"));final ConsumerRecords<String, String> records = consumer.poll(1_000);// 开启事务producer.beginTransaction();try {Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (ConsumerRecord<String, String> record : records) {System.out.println(record);producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下一条要消费的消息}// 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)producer.sendOffsetsToTransaction(offsets, consumerGroupId);//            int i = 1 / 0;// 提交事务producer.commitTransaction();} catch (Exception e) {e.printStackTrace();// 回滚事务producer.abortTransaction();} finally {// 关闭资源producer.close();consumer.close();}}
}

四、编译打印:案例一
### --- 准备运行主题
~~~     # 创建主题[root@hadoop01 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka \
--create --topic tp_tx_01 --partitions 1 --replication-factor 1

### --- 创建一个消费者用于消费消息[root@hadoop01 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic tp_tx_01 --from-beginning
~~~输出参数
tx_msg_1
tx_msg_2
tx_msg_3

### --- 编译打印:MyTransactionalProducerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=52156:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-12-kafka-Transactional\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.producer.MyTransactionalProducer
~~~未输出任何参数

五、编译打印:案例二
### --- 创建消费者[root@hadoop01 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic tp_tx_01 -isolation-level read_committed  --from-beginning
~~~输出参数
~~~     # 不会输出任何参数

### --- 编译打印:MyTransactionalProducerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=49304:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-12-kafka-Transactional\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.MyTransactional
~~~不会输出任何参数
~~~     # 抛异常

CC00073.kafka——|Hadoopkafka.V58|——|kafka.v58|稳定性|事务操作|相关推荐

  1. CC00038.kafka——|Hadoopkafka.V23|——|kafka.v23|消费者拦截器参数配置|

    一.消费者拦截器参数配置:消费者参数配置补充 配置项 说明 bootstrap.servers 建立到Kafka集群的初始连接用到的host/port列表. 客户端会使用这里指定的所有的host/po ...

  2. CC00042.kafka——|Hadoopkafka.V27|——|kafka.v27|主题管理.v02|

    一.修改主题 ### --- 为topic_x加入segment.bytes配置[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/ ...

  3. CC00065.kafka——|Hadoopkafka.V50|——|kafka.v50|日志清理|

    一.日志压缩策略 ### --- 概念~~~ 日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留, ~~~ 而不是基于粗粒度的基于时间的保留. ~~~ 对于具有相同的Key,而数据不同,只保 ...

  4. CC00060.kafka——|Hadoopkafka.V45|——|kafka.v45|日志存储概述|

    一.日志存储概述 ### --- 日志存储概述~~~ Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响. ~~~ 每个主题又可以分为一个或多个分区. ~~~ 每个分区各自存在 ...

  5. CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|

    一.消费者位移管理数据准备 ### --- 准备数据~~~ # 生成消息文件 [root@hadoop ~]# for i in `seq 60`; do echo "hello yanqi ...

  6. kafka版本_Apache Kafka 版本演进及特性介绍

    前段时间有一个同事问到:Kafka 0.8.2 只能使用Zookeeper连接吗?虽然仍有一部分Kafka的老用户在使用 0.8.x 版本,但 Kafka 0.8.x 确实是比较老的版本了.如果不是对 ...

  7. Kafka(三)、Kafka架构

    Kafka架构 一.Kafka 基本介绍 1.1 什么是Kafka 1.2 Kafka特性 1.3 常见应用场景 二.Kafka 系统架构 2.1 基本概念 2.2 index / timeindex ...

  8. Kafka教程(一)Kafka入门教程

    Kafka教程(一)Kafka入门教程 1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件 ...

  9. 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)

    最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...

最新文章

  1. tcc分布式事务_什么是 TCC分布式事务?
  2. 第二十五章补充内容 3 assert()宏
  3. 第二篇:Mysql---约束条件、修改表的结构、键值
  4. Stable Sort Aizu - ALDS1_2_C
  5. C++有符号和无符号数的转换
  6. [导入]Visual Studio 2005 Web Deployment Projects版本不同引发的问题
  7. python学习与数据挖掘_python机器学习与数据挖掘
  8. mysql外键约束脚本_使用SQL脚本创建数据库,操作主键、外键与各种约束(MS SQL Server)...
  9. 提升网页加载速度—预加载VS预读取
  10. 条款28:避免返回handles指向对象内部的成分(Avoid returning handles to objects internals)...
  11. 不得不学的统计学基础知识(二)
  12. 计算机电子管与晶体管区别,电子管、晶体管与集成电路 扫盲
  13. Ubuntu搭建KMS服务
  14. win10连接计算机,win10怎么连接局域网打印机
  15. 电动48V/60V自行车/摩托车/观光车电池检测设备,满足GB38031新国标测试
  16. RSA详解 ----- Android中常用的非对称加密算法
  17. 揭秘中国球员十大豪宅
  18. 新猿木子李:0基础学python培训教程 Python操作Excel之写入数据
  19. Inkscape制作LOGO——新手
  20. android exo解码问题,Android Exoplayer音频播放异常

热门文章

  1. 在Adobe Reader里添加书签功能
  2. Linux CentOS 7.3 1611 基础命令
  3. 「雕爷学编程」Arduino动手做(39)——DS18B20温度传感器
  4. 原生js输出html5,原生JS+HTML5实现的可调节写字板功能示例
  5. Python语言基础学习不错的网站
  6. 关于PEER - PEER毅恒挚友 - Powered by Discuz!
  7. 单片机入门(三)----DS1302时钟芯片 可调时钟
  8. Python中用PIL/Pillow裁剪图片
  9. 防火墙和iptables
  10. 今天看到“勃客郑渊洁”,是我的偶像