FlinkKafkaProducer extends TwoPhaseCommitSinkFunction, which implements CheckpointedFunction and CheckpointListener.

TwoPhaseCommitSinkFunction implements initializeState and snapshotState from CheckpointedFunction,
and notifyCheckpointComplete from CheckpointListener.

Reference: https://cloud.tencent.com/developer/article/1583233
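
For context, the snippet below is a minimal sketch of how such an exactly-once sink is typically wired into a job. The broker address, topic name, and timeout value are placeholders, and the exact constructor overload varies slightly between Flink versions:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TwoPhaseCommitSinkFunction commits only on completed checkpoints,
        // so checkpointing must be enabled for EXACTLY_ONCE to make progress.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        // should exceed the maximum checkpoint duration, or pre-committed
        // transactions may time out before notifyCheckpointComplete commits them
        props.put("transaction.timeout.ms", "900000");

        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("demo-topic", element.getBytes(StandardCharsets.UTF_8)); // placeholder topic

        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "demo-topic",                                // placeholder default topic
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("exactly-once kafka sink demo");
    }
}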
###############################################################################
Summary: FlinkKafkaProducer && TPC (TwoPhaseCommitSinkFunction)
1. Start the transaction: initializeState -> TPC.beginTransaction() opens the transaction and initializes a KafkaProducer; the KafkaProducer creates the RecordAccumulator and starts the sender thread.
FlinkKafkaProducer.initializeState -> TPC.initializeState
Read the operator state.
If this is a restore, take the transactions that were pending commit from the state, rebuild their KafkaProducer, and re-commit them: getPendingCommitTransactions -> recoverAndCommitInternal -> recoverAndCommit { producer = initTransactionalProducer, producer.commitTransaction() }
Initialize the user context and generate the transactionalIds: initializeUserContext -> generateNewTransactionalIds -> generateIdsToUse

Flink uses a queue as the pool of transactional ids: a new transaction takes an id from the head of the queue, and when a transaction finishes its id is returned to the tail. Because every new transaction constructs a new Kafka producer, the initial size of availableTransactionalIds equals the configured Kafka producer pool size (default 5).

public Set<String> generateIdsToUse(long nextFreeTransactionalId) {
    Set<String> transactionalIds = new HashSet<>();
    for (int i = 0; i < poolSize; i++) {
        long transactionalId = nextFreeTransactionalId + subtaskIndex * poolSize + i;
        transactionalIds.add(generateTransactionalId(transactionalId));
    }
    return transactionalIds;
}

Open the new transaction and initialize the producer associated with it: beginTransactionInternal -> createTransactionalProducer

private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafkaException {
    String transactionalId = availableTransactionalIds.poll();
    if (transactionalId == null) {
        throw new FlinkKafkaException(
            FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY,
            "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
    }
    FlinkKafkaInternalProducer<byte[], byte[]> producer = initTransactionalProducer(transactionalId, true);
    producer.initTransactions();
    return producer;
}

initTransactionalProducer -> initProducer -> createProducer -> FlinkKafkaInternalProducer -> kafkaProducer = new KafkaProducer<>(properties); the KafkaProducer creates the RecordAccumulator and starts the sender thread.
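For example, with the default poolSize = 5, the subtask with subtaskIndex = 2 and nextFreeTransactionalId = 0 claims ids 10 through 14 (2 * 5 + 0 up to 2 * 5 + 4), so every subtask owns a disjoint range of transactional ids and parallel instances never collide.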

2. invoke: calls kafkaProducer.send to write the processed records to Kafka.
3. Pre-commit the Kafka transaction and open the next one: preCommit, beginTransactionInternal()
snapshotState -> preCommit: every time a checkpoint is triggered, snapshotState calls FlinkKafkaProducer.preCommit, which calls kafkaProducer.flush so that the sender finishes writing the data still buffered in the RecordAccumulator to the Kafka brokers.
Note: steps 2 and 3 already deliver the messages to Kafka, because beginTransaction has started the sender thread and anything placed in the RecordAccumulator is sent out immediately. If the Kafka consumer's isolation.level is read_uncommitted (the default), it can read these uncommitted writes, i.e. dirty reads; set it to read_committed to read only committed data, at the cost of a delay of up to the checkpoint interval (see the consumer sketch after this list).
4. Commit the Kafka transaction: when the checkpoint completes, notifyCheckpointComplete -> commit calls kafkaProducer.commitTransaction to commit the Kafka transaction.
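
A minimal consumer-side sketch of the isolation.level setting discussed above (broker address, group id, and topic are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "demo-group");                // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // read_committed hides records of open or aborted transactions; with the
        // default read_uncommitted, data that Flink has only pre-committed is
        // already visible (dirty reads).
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s -> %s%n", record.key(), record.value());
            }
        }
    }
}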

################################################################################

TwoPhaseCommitSinkFunction
1. initializeState: checkpoint state initialization (and restore)

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // when we are restoring state with pendingCommitTransactions, we don't really know whether the
    // transactions were already committed, or whether there was a failure between
    // completing the checkpoint on the master, and notifying the writer here.
    // (the common case is actually that is was already committed, the window
    // between the commit on the master and the notification here is very small)
    // it is possible to not have any transactions at all if there was a failure before
    // the first completed checkpoint, or in case of a scale-out event, where some of the
    // new task do not have and transactions assigned to check)
    // we can have more than one transaction to check in case of a scale-in event, or
    // for the reasons discussed in the 'notifyCheckpointComplete()' method.

    state = context.getOperatorStateStore().getListState(stateDescriptor);

    boolean recoveredUserContext = false;
    // when restarting after a failure
    if (context.isRestored()) {
        LOG.info("{} - restoring state", name());
        for (State<TXN, CONTEXT> operatorState : state.get()) {
            userContext = operatorState.getContext();
            List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
            List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
            for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                // If this fails to succeed eventually, there is actually data loss
                recoverAndCommitInternal(recoveredTransaction);
                handledTransactions.add(recoveredTransaction.handle);
                LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
            }

            {
                TXN transaction = operatorState.getPendingTransaction().handle;
                recoverAndAbort(transaction);
                handledTransactions.add(transaction);
                LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
            }

            if (userContext.isPresent()) {
                finishRecoveringContext(handledTransactions);
                recoveredUserContext = true;
            }
        }
    }

    // if in restore we didn't get any userContext or we are initializing from scratch
    if (!recoveredUserContext) {
        LOG.info("{} - no state to restore", name());
        userContext = initializeUserContext();
    }
    this.pendingCommitTransactions.clear();

    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
}
beginTransactionInternal ->

private TransactionHolder<TXN> beginTransactionInternal() throws Exception {
    return new TransactionHolder<>(beginTransaction(), clock.millis());
}

// start the transaction
FlinkKafkaProducer.beginTransaction(), see below.

2. snapshotState: runs on every checkpoint and pre-commits the Kafka transaction

public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // this is like the pre-commit of a 2-phase-commit transaction
    // we are ready to commit and remember the transaction
    checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

    long checkpointId = context.getCheckpointId();
    LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

    // perform the pre-commit
    preCommit(currentTransactionHolder.handle);
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

    currentTransactionHolder = beginTransactionInternal();
    LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

    state.clear();
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}
preCommit -> FlinkKafkaProducer.preCommit

3. notifyCheckpointComplete: commits the Kafka transaction when the checkpoint completes

public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    //  (1) there is exactly one transaction from the latest checkpoint that
    //      was triggered and completed. That should be the common case.
    //      Simply commit that transaction in that case.
    //
    //  (2) there are multiple pending transactions because one previous
    //      checkpoint was skipped. That is a rare case, but can happen
    //      for example when:
    //
    //        - the master cannot persist the metadata of the last
    //          checkpoint (temporary outage in the storage system) but
    //          could persist a successive checkpoint (the one notified here)
    //
    //        - other tasks could not persist their status during
    //          the previous checkpoint, but did not trigger a failure because they
    //          could hold onto their state and could successfully persist it in
    //          a successive checkpoint (the one notified here)
    //
    //      In both cases, the prior checkpoint never reach a committed state, but
    //      this checkpoint is always expected to subsume the prior one and cover all
    //      changes since the last successful one. As a consequence, we need to commit
    //      all pending transactions.
    //
    //  (3) Multiple transactions are pending, but the checkpoint complete notification
    //      relates not to the latest. That is possible, because notification messages
    //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
    //      was triggered) and because there can be concurrent overlapping checkpoints
    //      (a new one is started before the previous fully finished).
    //
    // ==> There should never be a case where we have no pending transaction here
    //

    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
    Throwable firstError = null;

    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }

        LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
            name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

        logWarningIfTimeoutAlmostReached(pendingTransaction);
        try {
            // commit the Kafka transaction
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            if (firstError == null) {
                firstError = t;
            }
        }

        LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

        pendingTransactionIterator.remove();
    }

    if (firstError != null) {
        throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
            firstError);
    }
}
commit -> FlinkKafkaProducer.commit
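
To see the same contract stripped of the Kafka details, here is a minimal, hypothetical TwoPhaseCommitSinkFunction subclass. The transaction type and all method bodies are illustrative placeholders, not Flink or Kafka code; the comments map each hook to its FlinkKafkaProducer counterpart:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class BufferingTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, BufferingTwoPhaseCommitSink.Txn, Void> {

    /** Hypothetical transaction handle: just an in-memory buffer here. */
    public static class Txn {
        final List<String> buffered = new ArrayList<>();
    }

    public BufferingTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn();                    // like createTransactionalProducer + beginTransaction
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        txn.buffered.add(value);             // like producer.send(record, callback)
    }

    @Override
    protected void preCommit(Txn txn) {
        // like producer.flush(): make everything buffered durable enough to commit later
    }

    @Override
    protected void commit(Txn txn) {
        // like producer.commitTransaction(): atomically publish txn.buffered
    }

    @Override
    protected void abort(Txn txn) {
        txn.buffered.clear();                // like producer.abortTransaction()
    }
}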

FlinkKafkaProducer

1. beginTransaction

protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
            // create the KafkaProducer
            FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
            producer.beginTransaction();
            return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
            }
            return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}
--> createTransactionalProducer -> initTransactionalProducer -> initProducer -> createProducer -> new FlinkKafkaInternalProducer -> new KafkaProducer
--> In the KafkaProducer constructor (Kafka source code):

// create the RecordAccumulator
this.accumulator = new RecordAccumulator
// create the sender and start it on the I/O thread
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

2. invoke

public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
    // at this point the record is already on its way to Kafka, because beginTransaction has started
    // the sender thread. If the Kafka consumer's isolation.level is read_uncommitted (the default),
    // it can read the data before the commit (dirty reads); set it to read_committed to read only
    // committed data, at the cost of a delay of up to the checkpoint interval.
    transaction.producer.send(record, callback);
}

3. preCommit

protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
        case AT_LEAST_ONCE:
            // flush: have the sender finish writing the data still buffered in the RecordAccumulator to the Kafka brokers
            flush(transaction);
            break;
        case NONE:
            break;
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
    checkErroneous();
}
--> flush -> transaction.producer.flush() -> kafkaProducer.flush()

/**
 * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is
 * greater than 0) and blocks on the completion of the requests associated with these records.
 */
public void flush() {
    log.trace("Flushing accumulated records in producer.");
    this.accumulator.beginFlush();
    this.sender.wakeup();
    try {
        this.accumulator.awaitFlushCompletion();
    } catch (InterruptedException e) {
        throw new InterruptException("Flush interrupted.", e);
    }
}

4. commit

protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}
commitTransaction -> kafkaProducer.commitTransaction(); recycleTransactionalProducer then returns the transactional id to the tail of the availableTransactionalIds queue.
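
For comparison, here are the same four steps expressed directly against the plain Kafka producer API (broker address, transactional id, and topic are placeholders). This is roughly the call sequence FlinkKafkaProducer drives, with flush() as the pre-commit and commitTransaction() deferred until the checkpoint completes:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RawTransactionalProduce {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("transactional.id", "demo-txn-1");       // like one id from availableTransactionalIds
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();                   // step 1: createTransactionalProducer
            producer.beginTransaction();                   //         beginTransaction
            producer.send(new ProducerRecord<>("demo-topic", "k", "v")); // step 2: invoke
            producer.flush();                              // step 3: preCommit
            producer.commitTransaction();                  // step 4: notifyCheckpointComplete -> commit
        }
    }
}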
