目录

Kafka

pom依赖

参数设置

参数说明

Kafka命令

代码实现-Kafka Consumer

代码实现-Kafka Producer

代码实现-实时ETL


Kafka

pom依赖

Flink 里已经提供了一些绑定的 Connector,例如 kafka source 和 sink,Es sink 等。读写 kafka、es、rabbitMQ 时可以直接使用相应 connector 的 api 即可,虽然该部分是 Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的 connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。

//ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

参数设置

以下参数都必须/建议设置上

1.订阅的主题

2.反序列化规则

3.消费者属性-集群地址

4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)

5.消费者属性-offset重置规则,如earliest/latest...

6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)

7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中

​​​​​​​参数说明

实际的生产环境中可能有这样一些需求,比如:

l场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。

l场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition?

针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。

l针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。

l针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的 partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。

注意:

开启 checkpoint 时 offset 是 Flink 通过状态 state 管理和恢复的,并不是从 kafka 的 offset 位置恢复。在 checkpoint 机制下,作业从最近一次checkpoint 恢复,本身是会回放部分历史数据,导致部分数据重复消费,Flink 引擎仅保证计算状态的精准一次,要想做到端到端精准一次需要依赖一些幂等的存储系统或者事务操作。

​​​​​​​Kafka命令

● 查看当前服务器中的所有topic

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181

● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka

● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181

● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka

● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka

● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning

● 修改分区

 /export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181

​​​​​​​代码实现-Kafka Consumer

package cn.it.connectors;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;/*** Author lanson* Desc* 需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount* 需要设置如下参数:* 1.订阅的主题* 2.反序列化规则* 3.消费者属性-集群地址* 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)* 5.消费者属性-offset重置规则,如earliest/latest...* 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)* 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中*/
public class ConnectorsDemo_KafkaConsumer {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceProperties props  = new Properties();props.setProperty("bootstrap.servers", "node1:9092");props.setProperty("group.id", "flink");props.setProperty("auto.offset.reset","latest");props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况props.setProperty("enable.auto.commit", "true");props.setProperty("auto.commit.interval.ms", "2000");//kafkaSource就是KafkaConsumerFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费//kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);//3.Transformation//3.1切割并记为1SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.2分组KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);//3.3聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);//4.Sinkresult.print();//5.executeenv.execute();}
}

​​​​​​​代码实现-Kafka Producer

  • 需求:

将Flink集合中的数据通过自定义Sink保存到Kafka

  • 代码实现
package cn.it.connectors;import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;/*** Author lanson* Desc* 使用自定义sink-官方提供的flink-connector-kafka_2.12-将数据保存到Kafka*/
public class ConnectorsDemo_KafkaProducer {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.SourceDataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));//3.Transformation//注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串//可以直接调用Student的toString,也可以转为JSONSingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {@Overridepublic String map(Student value) throws Exception {//String str = value.toString();String jsonStr = JSON.toJSONString(value);return jsonStr;}});//4.SinkjsonDS.print();//根据参数创建KafkaProducer/KafkaSinkProperties props = new Properties();props.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka",  new SimpleStringSchema(),  props);jsonDS.addSink(kafkaSink);//5.executeenv.execute();// /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Student {private Integer id;private String name;private Integer age;}
}

​​​​​​​代码实现-实时ETL

package cn.it.connectors;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.util.Properties;/*** Author lanson* Desc 演示Flink-Connectors-KafkaComsumer/Source + KafkaProducer/Sink*/
public class KafkaETLDemo {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.source//准备kafka连接参数Properties props  = new Properties();props.setProperty("bootstrap.servers", "node1:9092");//集群地址props.setProperty("group.id", "flink");//消费者组idprops.setProperty("auto.offset.reset","latest");//latest有offset记录从记录位置开始消费,没有记录从最新的/最后的消息开始消费 /earliest有offset记录从记录位置开始消费,没有记录从最早的/最开始的消息开始消费props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况,实现动态分区检测props.setProperty("enable.auto.commit", "true");//自动提交(提交到默认主题,后续学习了Checkpoint后随着Checkpoint存储在Checkpoint和默认主题中)props.setProperty("auto.commit.interval.ms", "2000");//自动提交的时间间隔//使用连接参数创建FlinkKafkaConsumer/kafkaSourceFlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("flink_kafka", new SimpleStringSchema(), props);//使用kafkaSourceDataStream<String> kafkaDS = env.addSource(kafkaSource);//TODO 2.transformationSingleOutputStreamOperator<String> etlDS = kafkaDS.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return value.contains("success");}});//TODO 3.sinketlDS.print();Properties props2 = new Properties();props2.setProperty("bootstrap.servers", "node1:9092");FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka2", new SimpleStringSchema(), props2);etlDS.addSink(kafkaSink);//TODO 4.executeenv.execute();}
}
//控制台生成者 ---> flink_kafka主题 --> Flink -->etl ---> flink_kafka2主题--->控制台消费者
//准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
//准备主题 /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka2
//启动控制台生产者发送数据 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 success xxx
//log:2020-10-10 fail xxx
//启动控制台消费者消费数据 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka2 --from-beginning
//启动程序FlinkKafkaConsumer
//观察控制台输出结果

2021年大数据Flink(十五):流批一体API Connectors ​​​​​​​Kafka相关推荐

  1. 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】

    导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...

  2. 大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  3. pb数据窗口怎么调用视图_大数据架构如何做到流批一体?

    阿里妹导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发 ...

  4. 2021年大数据Flink(十六):流批一体API Connectors ​​​​​​​​​​​​​​Redis

    目录 Redis API 使用RedisCommand设置数据结构类型时和redis结构对应关系 需求 代码实现 Redis API 通过flink 操作redis 其实我们可以通过传统的redis ...

  5. 2021年大数据Flink(十四):流批一体API Connectors JDBC

    目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector 代码演示 package ...

  6. 2021年大数据Flink(五):Standalone-HA高可用集群模式

    目录 Standalone-HA高可用集群模式 原理 操作 1.集群规划 2.启动ZooKeeper 3.启动HDFS 4.停止Flink集群 5.修改flink-conf.yaml 6.修改mast ...

  7. 大数据Flink(十四):流批一体API Connectors JDBC

    文章目录 Connectors JDBC 代码演示 Connectors JDBC Apache Flink 1.12 Documentation: JDBC Connector

  8. 2021年大数据Flink(十):流处理相关概念

    目录 流处理相关概念 数据的时效性 ​​​​​​​流处理和批处理 ​​​​​​​流批一体API DataStream API 支持批执行模式 API 编程模型 ​​​​​​​流处理相关概念 数据的时效 ...

  9. 2021年大数据HBase(五):HBase的相关操作JavaAPI方式

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 HBase的相关操作-JavaAPI方式 一.需求说明 ...

最新文章

  1. mysql 查询语句 参数,mysql参数化查询语句有关问题
  2. Hadoop命令手册
  3. 解决 GraphQL 的限流难题
  4. 备战数学建模国赛,快速搞定算法模型!
  5. dmn是大脑中哪个区域_DMN中的函数式编程:感觉就像再次重读我的大学课程一样...
  6. 15秋计算机基础作业3,东师15秋《计算机应用基础》在线作业3介绍.doc
  7. mysql主从1594错误_3分钟解决MySQL主从1594错误
  8. vs2012新建项目产生的问题
  9. 英雄联盟祖安服务器要维护多久,祖安玩家的春天!英雄联盟将回归队内语音,娱乐玩家遭殃了?...
  10. 笔记本独显和集显linux,笔记本电脑独显不见了的解决方法介绍
  11. 项目管理必备文档大全:这14个项目文档你还不会写?
  12. ASP.Net中控件的EnableViewState属性
  13. 《MATLAB智能算法超级学习手册》一一1.5 简单工程应用分析
  14. spurious wakeups(虚假唤醒)
  15. 阿里云对象存储OSS
  16. 松翰单片机--SN8F5702学习笔记(二)HelloWorld
  17. REST Assured 4 - 第一个GET Request
  18. php织梦cms 安装教程,Linux服务器上安装织梦CMS,linux服务器织梦cms_PHP教程
  19. 038 罗尔定理及拉格朗日定理
  20. 笔记本电脑外接显示器投屏问题

热门文章

  1. IDEA的Docker插件实战(Dockerfile篇)
  2. Go 学习笔记(66)— Go 并发同步原语(sync.Mutex、sync.RWMutex、sync.Once)
  3. 解决LC_ALL: 无法改变区域选项 (UTF-8): 没有那个文件或目录的问题
  4. pytorch: Variable detach 与 detach_
  5. LeetCode简单题之石头与宝石
  6. HiCar人-车-家全场景智慧互联
  7. LCD: 2D-3D匹配算法
  8. Python:模拟登录、点击和执行 JavaScript 语句案例
  9. 你哪来这么多事(六):职工信息查找
  10. Andriod TextView typeface