1.12.Flink Kafka-Connector详解
1.12.1.Kafka Consumer消费策略设置
1.12.2.Kafka Consumer的容错
1.12.3.动态加载Topic
1.12.4.Kafka Consumers Offset 自动提交
1.12.5.Kafka Producer
1.12.6.Kafka Producer的容错-Kafka 0.9 and 0.10
1.12.7.Kafka Producer的容错-Kafka 0.11

1.12.Flink Kafka-Connector详解

Kafka中的partition机制和Flink的并行度机制深度结合。
Kafka可以作为Flink的source和sink
任务失败,通过设置kafka的offset来恢复应用

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;import java.util.Properties;/*** kafkaSink** Created by xxxx on 2020/10/09 on 2018/10/23.*/
public class StreamingKafkaSink {public static void main(String[] args) throws Exception {//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig().setCheckpointTimeout(60000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));DataStreamSource<String> text = env.socketTextStream("hadoop100", 9001, "\n");String brokerList = "hadoop110:9092";String topic = "t1";Properties prop = new Properties();prop.setProperty("bootstrap.servers",brokerList);//第一种解决方案,设置FlinkKafkaProducer011里面的事务超时时间//设置事务超时时间//prop.setProperty("transaction.timeout.ms",60000*15+"");//第二种解决方案,设置kafka的最大事务超时时间//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());//使用仅一次语义的kafkaProducerFlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);text.addSink(myProducer);env.execute("StreamingFromCollection");}
}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;import java.util.Properties;/*** kafkaSource** Created by xxxx on 2020/10/09 on 2018/10/23.*/
public class StreamingKafkaSource {public static void main(String[] args) throws Exception {//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig().setCheckpointTimeout(60000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));String topic = "t1";Properties prop = new Properties();prop.setProperty("bootstrap.servers","hadoop110:9092");prop.setProperty("group.id","con1");FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);myConsumer.setStartFromGroupOffsets();//默认消费策略DataStreamSource<String> text = env.addSource(myConsumer);text.print().setParallelism(1);env.execute("StreamingFromCollection");}
}

Scala案例:

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/
object StreamingKafkaSinkScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隐式转换import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));val text = env.socketTextStream("hadoop100",9001,'\n')val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers","hadoop110:9092")//第一种解决方案,设置FlinkKafkaProducer011里面的事务超时时间//设置事务超时时间//prop.setProperty("transaction.timeout.ms",60000*15+"");//第二种解决方案,设置kafka的最大事务超时时间//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());//使用支持仅一次语义的形式val myProducer = new FlinkKafkaProducer011[String](topic,new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)text.addSink(myProducer)env.execute("StreamingFromCollectionScala")}}
import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/
object StreamingKafkaSourceScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隐式转换import org.apache.flink.api.scala._//checkpoint配置env.enableCheckpointing(5000);env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);env.getCheckpointConfig.setCheckpointTimeout(60000);env.getCheckpointConfig.setMaxConcurrentCheckpoints(1);env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));val topic = "t1"val prop = new Properties()prop.setProperty("bootstrap.servers","hadoop110:9092")prop.setProperty("group.id","con1")val myConsumer = new FlinkKafkaConsumer011[String](topic,new SimpleStringSchema(),prop)val text = env.addSource(myConsumer)text.print()env.execute("StreamingFromCollectionScala")}}

1.12.1.Kafka Consumer消费策略设置

setStartFromGroupOffsets() 【默认消费策略】

  • 默认读取上次保存的offset信息
  • 如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据。
    setStartFromEarliest()
  • 从最早的数据开始进行消费,忽略存储的offset信息。
    setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)

1.12.2.Kafka Consumer的容错

当checkpoint机制开启的时候,Kafka Consumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。
为了能够使用支持容错的kafka Consumer,需要开启checkpoint

  • env.enableCheckpointing(5000); // 每5s checkpoint一次

1.12.3.动态加载Topic

1.12.4.Kafka Consumers Offset 自动提交

针对job是否开启checkpoint来区分
Checkpoint关闭时: 可以通过下面两个参数配置

  • enable.auto.commit
  • auto.commit.interval.ms

Checkpoint开启时:当执行checkpoint的时候才会保存offset,这样保证了kafka的offset和checkpoint的状态偏移量保持一致。

  • 可以通过这个参数设置setCommitOffsetsOnCheckpoints(boolean)
  • 这个参数默认就是true。表示在checkpoint的时候提交offset
  • 此时,kafka中的自动提交机制就会被忽略

1.12.5.Kafka Producer

1.12.6.Kafka Producer的容错-Kafka 0.9 and 0.10

如果Flink开启了checkpoint,针对FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数

  • setLogFailuresOnly(false)
  • setFlushOnCheckpoint(true)

注意:建议修改kafka 生产者的重试次数
retries【这个参数的值默认是0】

1.12.7.Kafka Producer的容错-Kafka 0.11

如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供 exactly-once的语义
但是需要选择具体的语义

  • Semantic.NONE
  • Semantic.AT_LEAST_ONCE【默认】
  • Semantic.EXACTLY_ONCE

1.12.Flink Kafka-Connector详解、Consumer消费策略设置、动态加载Topic、Consumers Offset 自动提交、Producer、容错等相关推荐

  1. Kafka配置详解-Consumer配置

    转载自:http://orchome.com/535 3.4 kafka消费者配置 在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者.新老客户端的配置如下. ...

  2. Android音频框架之一 详解audioPolicy流程及HAL驱动加载与配置

    前言 此音频架构梳理笔记.主要是因工作上需要在 Android8.1 以上版本中,增加 snd-aloop 虚拟声卡做前期准备工作, 本篇文章提纲挈领的把音频框架主线梳理清晰,通过这篇文章能够清晰如下 ...

  3. java环境变量详解---找不到或无法加载主类

    刚学java,配置好环境变量之后,在DOS下却运行java小程序却始终出现"找不到或无法加载主类"然后返回配置环境变量折腾了好久,查看书籍.网上的资料,最终OK了!安装Eclips ...

  4. 从龟速 11s 到闪电 1s,详解前端性能优化之首屏加载

    点击上方 前端瓶子君,关注公众号 回复算法,加入前端编程面试算法每日一题群 全文共6511字/词,阅读大概需要13分钟,太长不看党请直接移步

  5. WinDBG详解进程初始化dll是如何加载的

    一:背景 1.讲故事 有朋友咨询个问题,他每次在调试 WinDbg 的时候,进程初始化断点之前都会有一些 dll 加载到进程中,比如下面这样: Microsoft (R) Windows Debugg ...

  6. java 找不到环境变量_java环境变量详解---找不到或无法加载主类

    默认安装在C:\ProgramFiles\Java\jdk1.7.0目录下 环境变量配置为 PATH=.;%JAVA_HOME%\bin CLASSPATH=.;%JAVA_HOME%\lib\dt. ...

  7. Kafka 原理详解

    Kafka 原理详解 1 kakfa基础概念说明 Broker:消息服务器,就是我们部署的一个kafka服务 Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存 ...

  8. 全国计算机棋类竞赛,自主招生认可的12类主流竞赛详解

    自主招生认可的12类主流竞赛详解 2016年全国有90所自主招生院校,其中77所针对全国招生,13所针对本省招生.在如此多的自主招生院校中,大家最为感兴趣的就是,2017高校自主招生到底认可什么样的竞 ...

  9. OpenCV-Python实战(12)——一文详解AR增强现实

    OpenCV-Python实战(12)--一文详解AR增强现实 0. 前言 1. 增强现实简介 2. 基于无标记的增强现实 2.1 特征检测 2.2 特征匹配 2.3 利用特征匹配和单应性计算以查找对 ...

最新文章

  1. 数学建模学习笔记——拟合算法
  2. python开发工具管理系统_Python之软件管理
  3. Redis 新特性篇:100% 掌握多线程模型
  4. 手环升级鸿蒙设备名单,鸿蒙2.0升级名单已确认-可首批升级的42款机型推荐
  5. 形容PHP程序员的语句,形容程序员的句子
  6. android 勿扰模式代码,Android N Zen Mode (勿扰模式)设置流程
  7. java web截屏_java_WebDriver中实现对特定的Web区域截图方法,用过 WebDriver 的同学都知道,We - phpStudy...
  8. JAVA多线程与并发学习总结
  9. XML1_XML基础
  10. linux实现普通用户只允许使用部分命令
  11. VB6各类源码开源 - 开源研究系列文章
  12. 【C/C++】求解线性方程组的雅克比迭代与高斯赛德尔迭代
  13. MOV格式视频转MP4
  14. B1/B2签证的有效期——对于B1/B2签证,停留期最长不超过183天
  15. 曼哈顿距离,欧式距离,明式距离,切比雪夫距离区别
  16. MG7780打印机喷嘴堵塞
  17. scada如何用oracle数据库,SCADA系统数据库存储功能设计及应用
  18. 皮皮APP语音派对策划师:千亿娱乐社交下的百万自由职业者
  19. C语言实现进度条彩色变化
  20. Vue3 setup语法糖销毁一个或多个定时器(setTimeout或setInterval)

热门文章

  1. excel可视化图表插件_Excel新版图表插件EasyShu: 新型面积图
  2. apollo 配置中心_Apollo配置中心搭建笔记
  3. 深度学习-机器学习(神经网络 1)
  4. python 实现文本自动翻译功能
  5. wxWidgets:wxScrollBar类用法
  6. boost::stl_interfaces模块实现过滤的整数迭代器的测试程序
  7. boost::python::pointee相关的测试程序
  8. boost::lexical_cast模块将创建一个to_long_double方法,将 Boost.Variant 的值转换为long double
  9. boost::local_time模块custom_time_zone 和 posix_time_zone 的简单示例
  10. Boost:显示如何将累加器持久化到文件中