Flink与kafka结合使用的三个优势:

第一:kafka可以作为Flink的Source和Sink来使用;
第二:Kafka的Partition机制和Flink的并行度机制可以深度结合,从而提高数据的读取率和写入效率
第三:当Flink任务失败的时候,可以通过设置kafka的Offset来恢复应用从而重新消费数据

添加依赖:

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.11</artifactId><version>1.6.1</version></dependency>

含义:kafka的版本是0.10,scala的版本是2.11,如果你用的java语言编写,scala版本无所谓

1、kafka常用的API

2、Kafka Consumer的消费模式设置

1、setStartFromGroupOffsets
这是默认的消费策略,会从上次消费者保存的offset处继续开始消费,需要说明的是:如果任务是第一次启动,读取不到上次的offset信息,则会根据参数outo.offset
.reset的数值来消费数据
2、setStartFromEarliest
从最初的数据开始消费,忽略存储的Offset信息
3、setStartFromLatest
从最新的数据进行消费,忽略存储的Offset信息
4、setStartFromSpecificOffsets(Map<KafkaTopicPartition,Long> specificStartupOffsets)
从指定分区的具体位置开始消费

3、Kafka consumer的容错机制
当Flink checkPoint机制开启的时候,Kafka Consumer会定期把kafka的Offset信息以及其他的Operator的状态信息保存起来,进行快照存储。当Job失败重启的时候,Flink会从最近一次的CheckPoint中回复数据,重新消费Kafka当中的数据

为了使用支持容错的kafka Consumer,Kafka Consumer需要开启Checkpoint机制,可以通过下面的代码进行设置:
默认情况下,Flink的checkPiont功能是disable的,想要使用的时候需要先开启:

env.enableCheckpointing(1000);

完整参考代码:

//开启flink的checkpoint功能:每隔1000ms启动一个检查点(设置checkpoint的声明周期)
env.enableCheckpointing(1000);
//checkpoint高级选项设置
//设置checkpoint的模式为exactly-once(这也是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//确保检查点之间至少有500ms间隔(即checkpoint的最小间隔)
env.getCheckPointConfig().setMinPauseBetweenCheckpoints(500);
//确保检查必须在1min之内完成,否则就会被丢弃掉(即checkpoint的超时时间)
env.getCheckpointConfig().setCheckpointTimeout(60000);
//同一时间只允许操作一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoint(1);
//程序即使被cancel后,也会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
env.getCheckpointConfig().enableExternazedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方)
env.setStateBackend(new FsStateBackend("hdfs://s101:9000/flink/checkpoints"));

4、Kafka Consumer Offset自动提交行为的控制(到底谁决定了offset的自动提交)

kafka提交偏移量的频率offset。
kafka Consumer Offset自动提交的配置需要根据Job是否开启Checkpoint来区分
a、Checkpoint关闭时,可以通过下面2个参数进行配置:

enable.auto.commit
auto.commit.interval.ms

b、CheckPoint开启时,只有当执行Checkpoint的时候才会自动提交Offset,这样就保证了kafka的offset和checkpoint的状态偏移量保持一致,此时Kafka中的自动提交Offset机制就会被忽略。
这里说明一下:我的理解是Offset的存储在2个地方,Kafka存储一份,CheckPoint的时候在HDFS存储一份,一旦开启checkpoint机制,这俩个提交offset的时刻就会同步。

小列子:

package Flink_Kafka;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import java.util.Properties;//最简单的一个Kafka的代码
public class FlinkKafkaConsumer {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties consumerProperties = new Properties();//设置服务consumerProperties.setProperty("bootstrap.servers","s101:9092");//设置消费者组consumerProperties.setProperty("group.id","con56");//自动提交偏移量consumerProperties.setProperty("enable.auto.commit","true");consumerProperties.setProperty("auto.commit.interval.ms","2000");DataStream<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer010<>("browse_topic",new SimpleStringSchema(),consumerProperties));dataStreamSource.print();env.execute("FlinkKafkaConsumer");}
}

Flink当中使用kafka Consumer相关推荐

  1. kafka consumer assign 和 subscribe模式差异分析

    转载请注明原创地址:http://www.cnblogs.com/dongxiao-yang/p/7200971.html 最近需要研究flink-connector-kafka的消费行为,发现fli ...

  2. Flink Kafka consumer的消费策略配置

    val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello" ...

  3. 【Flink】flink并行度与kafka分区(partition)设置

    1.概述 默认: [Flink]FlinkConsumer是如何保证一个partition对应一个thread的 当分区与并行度不一样呢? 2.原理 采用取模运算:平衡 kafka partition ...

  4. Flink DataStream读写Kafka

    Flink提供了Kafka连接器,用于从或向Kafka读写数据. 本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理. 问题一: 读Kafka的方式 ## 读取一个Topic Fl ...

  5. Flink Caused by:org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException

    Flink程序从kafka中读取数据进行计算,FLink程序一启动就报以下错误,看到错误很懵逼.加班到9点没解决,第二天提前来半小时,把如下错误信息又看了一遍.具体错误如下: 错误信息1. 20/12 ...

  6. Flink(16):Flink之Connect Kafka API

    目录 ​​​​​​0. 相关文章链接 1. pom依赖 2. 参数设置 3. 参数说明 3.1. 序列化和反序列化器 3.2. 消费者起始位置 3.3. 动态分区检测 3.4. Connect Kaf ...

  7. Kafka consumer group位移0ffset重设

    本文阐述如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer group)的位移.需要特别强调的是, 这是0.11.0.0版本提供的新功能且只 ...

  8. 读Kafka Consumer源码

    最近一直在关注阿里的一个开源项目:OpenMessaging OpenMessaging, which includes the establishment of industry guideline ...

  9. Kafka设计解析(四):Kafka Consumer解析--转

    原文地址:http://www.infoq.com/cn/articles/kafka-analysis-part-4?utm_source=infoq&utm_campaign=user_p ...

最新文章

  1. TensorFlow Estimators: Managing Simplicity vs. Flexibility in High-Level Machine Learning Frameworks
  2. 网络对抗技术_实验二_网络嗅探与欺骗
  3. 树莓派:关于linux内核
  4. flink dataset api使用及原理
  5. api权限管理系统与前后端分离实践
  6. Linux内核源码分析方法
  7. mysql模糊查询与预编译_mysql预编译模糊查询恶心了我一天的时间,终于弄好了。但是还有一点不明白。如下:...
  8. 精锐教育创始人张熙辞任CEO 李晓明接任
  9. JSK-27 三值排序【贪心】
  10. 深度学习2.0-5.tensorflow的基础操作之前向传播(张量)实战
  11. ubuntu22打开utools报错 缺少libcrypto.so.1.1问题解决
  12. php公众号关注自动回复内容,微信公众号自动回复内容大全集锦
  13. 符号——Alt+数字键
  14. java控制鼠标操作一些重复的事情
  15. 3D视觉——4.手势识别(Gesture Recognition)入门——使用MediaPipe含单帧(Singel Frame)和实时视频(Real-Time Video)
  16. Labview上位机与单片机系统的开发
  17. SAM4E单片机之旅——10、UART与MCK之PLL
  18. 在Mac下安装XAMPP
  19. 7.4 反编译、篡改漏洞检测和重现
  20. python猜谜语小游戏代码_树莓派趣学实战100例--网络应用+Python编程+传感器+服务器搭建...

热门文章

  1. mysql 类型及其他
  2. 使用pdf.js在移动端预览pdf文档
  3. exif_imagetype() 函数在linux下的php中不存在
  4. Laravel Migrate
  5. Log4net系统日志
  6. pat1069. The Black Hole of Numbers (20)
  7. 未来五年程序员应当具备的十项技能
  8. java模拟病人就诊过程_new 患者在医院看病过程:先排队等候 联合开发网 - pudn.com...
  9. ue编辑器漏洞_编辑器漏洞手册
  10. python几种排序_Python实现几种排序算法