Flink提供了Kafka连接器,用于从或向Kafka读写数据。

本文总结Flink与Kafka集成中的问题,并对一些疑点进行总结和梳理。

问题一: 读Kafka的方式

## 读取一个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
举例: FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");## 读取多个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");# 读取多个Topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
举例:FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");# 从指定的时间戳开始消费
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)# 从指定的偏移量开始消费,可为每个分区单独设置偏移量
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)

问题二: 读Kafka与反序列化器

可通过org.apache.flink.api.common.serialization.DeserializationSchemaorg.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema,将从Kafka读取的二进制字节流反序列化成Flink内部支持的Java/Scala对象。

Flink内置支持以下2种常用反序列化器:

  1. org.apache.flink.api.common.serialization.SimpleStringSchema:反序列化成String。

  2. org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema:反序列化成jackson ObjectNode。

如果想实现Kafka复杂JSON直接转换成想要的Object,可仿照org.apache.flink.api.common.serialization.SimpleStringSchema自定义即可。主要实现deserialize反序列化方法。

问题三: 读Kafka动态发现Topic、Partition

之前使用Spark Streaming,Spark 2.2.2不支持动态发现Kafka 0.10.1中新增的Topic(基于正则指定)和Partition。当新增了Topic或Partition,需要重启Spark Streaming任务。

在Flink中, 默认支持动态发现Kafka中新增的Topic或Partition,但需要手动开启。

kafkaProperties.put("flink.partition-discovery.interval-millis","10000");flink.partition-discovery.interval-millis: 检查间隔,单位毫秒。

问题四: 读Kafka与Exactly Once语义

没有开启Checkpoint,默认自动提交Offset至外部存储(如Kafka Broker或Zookeeper),自动提交的间隔是5秒。Flink Kafka Consumer的容错依赖于自动提交的Offset。

开启Checkpoint,默认在Checkpoint完成后将存储在Checkpoint中的Offset再提交至外部存储(如Kafka Broker或0.8版本中的Zookeeper),Flink Kafka Consumer在Flink作业运行过程中的容错依赖于Checkpoint中的Offset,Flink作业恢复,则可能是从Checkpoint中的Offset恢复,也可能是从外部存储如Kafka Broker中的Offset恢复,具体取决于恢复方式。注意: 在这种方式下,Kafka Broker(或0.8中Zookeeper)存储的Offset仅用于监控消费进度。

总结,基于Kafka可重复消费的能力并结合Flink Checkpoint机制,Flink Kafka Consumer能提供Exactly-Once语义。

问题五: 写Kafka与Exactly Once语义

  • Kafka 0.8 Flink不提供Exactly-Once或At-Least-Once语义。

  • Kafka 0.9、0.10 Flink启用Checkpoint,FlinkKafkaProducer09FlinkKafkaProducer010提供At-Least-Once语义。除此之外,还需设置以下参数:

    setLogFailuresOnly(false): 若为true,Producer遇到异常时,仅记录失败时的日志,流处理程序继续。需要设置为false,当遇到异常,流处理程序失败,抛出异常恢复任务并重试发送。

    setFlushOnCheckpoint(true): Checkpoint中包含Kafka Producer Buffer中的数据,设置为true, 确保Checkpoint成功前,Buffer中的所有记录都已写入Kafka。

    retries: 重试次数,默认0,建议设置更大。

  • Kafka 0.11、1.0.0+ Flink启用Checkpoint,基于Two Phase Commit,FlinkKafkaProducer011FlinkKafkaProducer(Kafka >=1.0.0) 默认提供Exactly-Once语义。
    如需要其他语义Semantic.NONE(可能会丢或重)Semantic.AT_LEAST_ONCE(可能会重)Semantic.EXACTLY_ONCE(默认),可手动选择。

Kafka 0.10.1读数据并写入到Kafka 0.11.0.3并实现PV统计

部分依赖

<!--Kafka连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.11</artifactId><version>1.8.0</version>
</dependency><!--Kafka连接器-->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.0</version>
</dependency>

代码实现

package com.bigdata.flink;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;/*** Author: Wang Pei* Summary: 读写Kafka*/
public class ReadWriteKafka {public static void main(String[] args) throws Exception{/**解析命令行参数*/ParameterTool fromArgs = ParameterTool.fromArgs(args);ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));//checkpoint参数String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");//fromKafka参数String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");String fromKafkaAutoOffsetReset= parameterTool.getRequired("fromKafka.auto.offset.reset");String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");//toKafka参数String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");String toKafkaTopic = parameterTool.getRequired("toKafka.topic");//窗口参数long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");/**配置运行环境*///设置Local Web ServerConfiguration config = new Configuration();config.setInteger(RestOptions.PORT,8081);config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置StateBackendenv.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));//设置CheckpointCheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);/**配置数据源*/Properties kafkaProperties = new Properties();kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);kafkaProperties.put("group.id",fromKafkaGroupID);kafkaProperties.put("auto.offset.reset",fromKafkaAutoOffsetReset);FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);kafkaConsumer.setCommitOffsetsOnCheckpoints(true);DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");/**抽取转换*/SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source.map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {Tuple4<String, String, String, Integer> output = new Tuple4<>();try {JSONObject obj = JSON.parseObject(value);output.f0 = obj.getString("userID");output.f1 = obj.getString("eventTime");output.f2 = obj.getString("eventType");output.f3 = obj.getInteger("productID");} catch (Exception e) {e.printStackTrace();}return output;}).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){}).name("Map: ExtractTransform").uid("map-id");/**过滤掉异常数据*/SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap.filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value != null).name("Filter: FilterExceptionData").uid("filter-id");/**抽取时间戳并发射水印*/SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic long extractTimestamp(Tuple4<String, String, String, Integer> element) {long timestamp = 0L;try {Date date = format.parse(element.f1);timestamp = date.getTime();} catch (ParseException e) {e.printStackTrace();}return timestamp;}}).uid("watermark-id");/**窗口统计*/SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks//默认用Hash方式.keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2).window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))//在每个窗口(Window)上应用WindowFunction(CustomWindowFunction)//CustomAggFunction用于增量聚合//在每个窗口上,先进行增量聚合(CustomAggFunction),然后将增量聚合的结果作为WindowFunction(CustomWindowFunction)的输入,计算后并输出//具体: 可参考底层AggregateApplyWindowFunction的实现.aggregate(new CustomAggFunction(), new CustomWindowFunction());//aggregate.print();/**结果输出*/Properties kafkaProducerProperties = new Properties();kafkaProducerProperties.setProperty("bootstrap.servers",toKafkaBootstrapServers);kafkaProducerProperties.setProperty("transaction.timeout.ms",60000+"");FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(toKafkaTopic,new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),kafkaProducerProperties,FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);aggregate.addSink(kafkaProducer011).name("outputToKafka");env.execute();}/*** 自定义AggregateFunction* 增量聚合,这里实现累加效果*/static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>,Long,Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long accumulator1, Long accumulator2) {return accumulator1 + accumulator2;}}/*** 自定义WindowFunction* 对增量聚合的结果再做处理,并输出*/static class CustomWindowFunction implements WindowFunction<Long, String,String, TimeWindow> {@Overridepublic void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long windowStart = window.getStart();long windowEnd = window.getEnd();Long windowPV = input.iterator().next();String output=format.format(new Date(windowStart))+","+format.format(new Date(windowEnd))+","+key+","+windowPV;out.collect(output);}}}

Flink DataStream读写Kafka相关推荐

  1. Flink DataStream Connectors 之 Apache Kafka 连接器

    文章目录 依赖 Kafka Source 使用方法 Topic / Partition 订阅 消息解析 起始消费位点 有界 / 无界模式 其他属性 动态分区检查 事件时间和水印 空闲 消费位点提交 监 ...

  2. Flink(16):Flink之Connect Kafka API

    目录 ​​​​​​0. 相关文章链接 1. pom依赖 2. 参数设置 3. 参数说明 3.1. 序列化和反序列化器 3.2. 消费者起始位置 3.3. 动态分区检测 3.4. Connect Kaf ...

  3. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  4. Flink当中使用kafka Consumer

    Flink与kafka结合使用的三个优势: 第一:kafka可以作为Flink的Source和Sink来使用: 第二:Kafka的Partition机制和Flink的并行度机制可以深度结合,从而提高数 ...

  5. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  6. Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

    Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据 一.引入flink相关依赖 二.properties保存连接kafka的配置 三.构建flink实时消费环境 ...

  7. Flink的Sink_API_Demo (kafka sink 、redis sink 、es sink)

    文章目录 pom文件说明 说明 必要的前提 下面的代码,启动后手动往topic打入数据,该程序读出来,逻辑是通过split分割(盖戳),然后select分流,取applestream写入kafka指定 ...

  8. Flink DataStream iterate算子的简单使用

    Flink DataStream iterate算子的简单使用 由于DataStream程序可能永远不会完成,因此没有最大迭代次数.相反你需要指定流的哪个部分反馈到迭代,哪个部分使用split转换或转 ...

  9. 【Flink】flink并行度与kafka分区(partition)设置

    1.概述 默认: [Flink]FlinkConsumer是如何保证一个partition对应一个thread的 当分区与并行度不一样呢? 2.原理 采用取模运算:平衡 kafka partition ...

最新文章

  1. 教育部:住宿费可以退!你的学校退了吗?
  2. MERGE批量增删查改数据
  3. oracle最大空闲时间,使用Oracle PROFILE控制会话空闲时间
  4. Java交流|面试最后一问:你有什么问题想问我吗?
  5. Python zip函数 - Python零基础入门教程
  6. Spring MVC国际化(i18n)和本地化(L10n)示例
  7. Windows Vista桌面窗口管理器(3)
  8. CryptoQuant CEO:OKEx暂停提款对比特币价格影响不大
  9. coco 数据集_如何用 coco 数据集训练 Detectron2 模型?
  10. [debug] Expected to have finished reduction in the prior iteration before starting a new one.
  11. 在计算机rwn代表,基于改进和RWn-SVM的化工过程故障快速诊断.pdf
  12. D. Magic Gems(矩阵快速幂 || 无敌杜教)
  13. spring mvc controller间跳转 重定向 传参 (转)
  14. 《剑指offer》面试题4——替换空格 C++编程
  15. highchart图表 | 加基准线
  16. 月薪30K+的电子工程师应具备什么?
  17. [从零开始学习FPGA编程-8]:快速入门篇 - 总体 - FPGA功能开发详细流程与关键步骤解读
  18. find命令查找包含指定内容的文件
  19. 第十五篇:大球联赛与小球联赛
  20. 公有继承中 构造函数和析构函数的调用(包含内嵌子对象)

热门文章

  1. 江苏省计算机一级考试试题库,2016年江苏省-计算机一级考试.试题-
  2. 对东方财经个股资金流的爬取分析
  3. python安装作业
  4. 数据库范式——(1NF,2NF,3NF,BCNF,含实例分析)
  5. 如何确定一个期刊是不是EI?
  6. 我喜欢的一篇关于家庭教育的文章
  7. 为什么软件开发周期总是预估的2~3倍?
  8. 慕容垂:百万战骨风云里——激荡的鲜卑史略之一(转载)
  9. 自媒体新手怎么赚钱,搬运不是长久的出路!
  10. Randy Pausch_卡内基梅隆大学演讲--真正实现你的梦想