数据流往往需要访问外部的数据源来丰富自己的信息,比如通过record中的ip地址查询ip数据库maxmind的GeoIP2 Databases得到ip对应的城市名称,城市经纬度,将这些作为新的字段添加到原来的record中。这就涉及到本篇的主题:维表关联。

网上关于flink中维表关联的博文很多,本文我想谈一谈个人对不同方案的理解和尝试后发现的一些问题。如果想要比较全面地了解维表关联的各个解决方案,建议阅读参考文献前两篇。

技术选型

维表关联方案主要有以下几种

  • 实时数据库查找关联,又叫热存储维表
  • 预加载维表关联
  • 广播维表关联
  • 维表变更日志关联,最常见的就是Temporal table function join

这几种方案各有优劣,没有最好的方案,只有最适合的方案。

所谓实时数据库查找就是Flink中的算子保持与数据库的连接,每来一条record就提取关键字,直接查找外部的数据库。这个方案最致命的问题在于这种实时访问外部数据库进行查询的方式是很影响作业性能的,对数据库的负载很大,导致吞吐量很难提上去。而且大数据的流量一般都很大,频繁访问数据库导致产线上的数据库挂掉那就是重大的生产故障。当然,针对这个问题也有一些解决方案,比如同步查找可以替换为异步查找,还可以使用缓存使得热点数据直接在内存中就能找到不用访问外部数据库。Anyway,带来的性能提升效果有限,这种方案主要还是适用于流量不大的场景。

预加载维表关联就是在任务启动的时候就把维表加载在内存中,查找的时候直接在内存中找就可以了。这个方案查找的性能是最高的,毕竟直接在内存中查找。但它也有一些局限性,一是占用更多的内存资源,如果维表非常大(比如大于TM内存),就不可取;二是维表很难实时更新,尽管可以设置定时器定时刷新维表,但是如果维表更新的太频繁性能消耗就太大了。总的来说,这种方案适合维表不是非常大,维表更新也不是很频繁的场景。(该方案实现简单,性能高,也是我最终选择的方案)

前面两种方案都属于数据流与静态的表之间的关联,而后面两种方案则是数据流与数据流之间的关联。所谓广播维表就是将维表转化为广播流从Source广播到下游的算子中,然后作为广播态保存到State Backend中,可以是内存,也可以是rocksdb。将广播态保存到rocksdb中每次读取状态都涉及到序列化和反序列化,对性能是有一定影响的。将广播态以MapState的形式保存在heap中和预加载维表关联就比较类似。

我这边将预加载维表关联和广播维表这两个方案做一个对比:

  1. 都可以将完整的维表保存在内存中,维表查询性能较高。但是广播维表需要将维表从上游广播到下游,涉及到不同节点的数据传输(网络传输,序列化和反序列化等),会带来额外的性能损失。但是作为广播态保存,不同的slots可以共享广播态,每个TM只需要保存一份维表,而不是每个slots保存一份,内存的利用率更高。
  2. 广播维表需要把维表转化为数据流。好处是维表的实时性更高,方便实时更新。不好的地方是通常是把维表存储在Kafka中,考虑到实时性,实现上更复杂。也可以自定义source定时把最新的维表转化为数据流,和预加载维表的定时刷新方案一样,但这样维表更新就有延迟。
  3. 维表广播只能是数据流和一条广播流的join,不可以数据流和多条广播流join。预加载维表方案同一个算子可以预加载多个维表,维表广播的方案就需要把多个维表转化为同一个数据流进行广播,然后保存在不同名字的广播态中。实现起来比较复杂,另外就是代码聚合程度太高,很不优雅。

总的来说,广播维表方案维表的实时性高,数据查询性能高,资源利用率也高,属于比较全面的一个方案。缺点主要在于实现上较为复杂,而且也要求维表不能太大。

最后提一下维表变更日志关联,主要是Temporal table function join。目前Datastream API不支持,需要写Table API/Sql。这一块我没有做太多研究,就此不表。

代码示例

实时查询外部数据库

使用cache来减轻访问压力

package join;import com.google.common.cache.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;public class JoinDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n").map(p -> {//输入格式为:user,1000,分别是用户名称和城市编号String[] list = p.split(",");return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));}).returns(new TypeHint<Tuple2<String, Integer>>() {});DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());result.print();env.execute("joinDemo1");}static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {LoadingCache<Integer, String> dim;@Overridepublic void open(Configuration parameters) throws Exception {//使用google LoadingCache来进行缓存dim = CacheBuilder.newBuilder()//最多缓存个数,超过了就根据最近最少使用算法来移除缓存.maximumSize(1000)//在更新后的指定时间后就回收.expireAfterWrite(10, TimeUnit.MINUTES)//指定移除通知.removalListener(new RemovalListener<Integer, String>() {@Overridepublic void onRemoval(RemovalNotification<Integer, String> removalNotification) {System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());}}).build(//指定加载缓存的逻辑new CacheLoader<Integer, String>() {@Overridepublic String load(Integer cityId) throws Exception {String cityName = readFromHbase(cityId);return cityName;}});}private String readFromHbase(Integer cityId) {//读取hbase//这里写死,模拟从hbase读取数据Map<Integer, String> temp = new HashMap<>();temp.put(1001, "beijing");temp.put(1002, "shanghai");temp.put(1003, "wuhan");temp.put(1004, "changsha");String cityName = "";if (temp.containsKey(cityId)) {cityName = temp.get(cityId);}return cityName;}@Overridepublic Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {//在map方法中进行主流和维表的关联String cityName = "";if (dim.get(value.f1) != null) {cityName = dim.get(value.f1);}return new Tuple3<>(value.f0, value.f1, cityName);}}
}

使用异步IO来提高访问吞吐量

package join;import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class JoinDemo3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n").map(p -> {//输入格式为:user,1000,分别是用户名称和城市编号String[] list = p.split(",");return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));}).returns(new TypeHint<Tuple2<String, Integer>>() {});DataStream<Tuple3<String,Integer, String>> orderedResult = AsyncDataStream//保证顺序:异步返回的结果保证顺序,超时时间1秒,最大容量2,超出容量触发反压.orderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2).setParallelism(1);DataStream<Tuple3<String,Integer, String>> unorderedResult = AsyncDataStream//允许乱序:异步返回的结果允许乱序,超时时间1秒,最大容量2,超出容量触发反压.unorderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2).setParallelism(1);orderedResult.print();unorderedResult.print();env.execute("joinDemo");}//定义个类,继承RichAsyncFunction,实现异步查询存储在mysql里的维表//输入用户名、城市ID,返回 Tuple3<用户名、城市ID,城市名称>static class JoinDemo3AyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {// 链接private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";private static String username = "root";private static String password = "123";private static String driverName = "com.mysql.jdbc.Driver";java.sql.Connection conn;PreparedStatement ps;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);Class.forName(driverName);conn = DriverManager.getConnection(jdbcUrl, username, password);ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");}@Overridepublic void close() throws Exception {super.close();conn.close();}//异步查询方法@Overridepublic void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {// 使用 city id 查询ps.setInt(1, input.f1);ResultSet rs = ps.executeQuery();String cityName = null;if (rs.next()) {cityName = rs.getString(1);}List list = new ArrayList<Tuple2<Integer, String>>();list.add(new Tuple3<>(input.f0,input.f1, cityName));resultFuture.complete(list);}//超时处理@Overridepublic void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {List list = new ArrayList<Tuple2<Integer, String>>();list.add(new Tuple3<>(input.f0,input.f1, ""));resultFuture.complete(list);}}
}

预加载维表+定时刷新

我的维表是以文件的形式保存在本地磁盘中的。

如果是保存在外部数据库可参考参考文献4

 public static class MyMapFunction extends RichMapFunction<String,String>{private transient HashMap<String, String> hashMap;private HashMap<String,String> readTxtFile(String filePath) {HashMap<String,String> hashMap = new HashMap<>();File file = new File(filePath);try {if (file.isFile() && file.exists()) { //判断文件是否存在InputStreamReader read = new InputStreamReader(new FileInputStream(file), "UTF-8");//考虑到编码格式BufferedReader bufferedReader = new BufferedReader(read);// String lineTxt = null;while (bufferedReader.readLine() != null) {String lineTxt = bufferedReader.readLine();String[] str = lineTxt.split(",");if (str.length == 2) {hashMap.put(str[0], str[1]);}}read.close();} else {System.out.println("找不到指定的文件");}} catch (Exception e) {System.out.println("读取文件内容出错");e.printStackTrace();}return hashMap;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);String filePath = "input/data100.txt";ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();timer.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {hashMap = readTxtFile(filePath);}}, 0, 5, TimeUnit.SECONDS);}@Overridepublic String map(String s) throws Exception {return hashMap.size() + "";}}

广播态

package join;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 这个例子是从socket中读取的流,数据为用户名称和城市id,维表是城市id、城市名称,* 主流和维表关联,得到用户名称、城市id、城市名称* 这个例子采用 Flink 广播流的方式来做为维度**/
public class JoinDemo4 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//定义主流DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n").map(p -> {//输入格式为:user,1000,分别是用户名称和城市编号String[] list = p.split(",");return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));}).returns(new TypeHint<Tuple2<String, Integer>>() {});//定义城市流DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n").map(p -> {//输入格式为:城市ID,城市名称String[] list = p.split(",");return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);}).returns(new TypeHint<Tuple2<Integer, String>>() {});//将城市流定义为广播流final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor("broad1", Integer.class, String.class);BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);DataStream result = textStream.connect(broadcastStream).process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {//处理非广播流,关联维度@Overridepublic void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);String cityName = "";if (state.contains(value.f1)) {cityName = state.get(value.f1);}out.collect(new Tuple3<>(value.f0, value.f1, cityName));}@Overridepublic void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {System.out.println("收到广播数据:" + value);ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);}});result.print();env.execute("joinDemo");}
}

参考文献

  1. 实时数仓之Flink维表关联难点解决方案
  2. Flink重点难点:维表关联理论和Join实战
  3. Flink State 误用之痛,你中招了吗?
  4. Flink 维表关联之全量预加载+定时刷新

【Flink系列】开发篇:1. Flink维表关联方案相关推荐

  1. sql 忽略大小写_Flink使用Calcite解析Sql做维表关联(一)

    点击箭头处"蓝色字",关注我们哦!! 维表关联是离线计算或者实时计算里面常见的一种处理逻辑,常常用于字段补齐.规则过滤等,一般情况下维表数据放在MySql等数据库里面,对于离线计算 ...

  2. flink维表关联系列之Redis维表关联:实时查询

    点击上方蓝 字关注~ 在做维表关联如果要求低延时,即维表数据的变更能够被立刻感知到,所以就要求在查询时没有缓存策略,直接查询数据库维表信息. 本篇以实时查询redis为例,要求redis 客户端支持异 ...

  3. Flink实时数据处理实践经验(Flink去重、维表关联、定时器、双流join)

    Flink实时数据处理实践经验 文章目录 Flink实时数据处理实践经验 1. 数据输入与预处理 2. 实时数据处理 3. 实时数仓架构 4. 优化方案 Java.大数据开发学习要点(持续更新中-) ...

  4. 小白学习Flink系列--第一篇(知识图谱)

    小白学习Flink系列–第一篇(知识图谱) 如何学习Flink? ​ 对于一门计算机技术来说,如何快速学习上手呢?具体的逻辑是什么呢?我认为有以下几条 了解技术的应用场景 技术的基本概念,如何使用,以 ...

  5. Flink系列之:基于Flink CDC2.0实现海量数据的实时同步和转换

    Flink系列之:基于Flink CDC2.0实现海量数据的实时同步和转换 一.CDC技术 二.Flink CDC技术 三.传统数据集成方案的痛点 1.传统数据入仓架构1.0 2.传统数据入仓架构2. ...

  6. 实时数仓-维表维护方案

    实时数仓-维表维护方案 维表维护方案 阶段一 阶段二 技术细节代码解析 cdc数据upsert到kafka kafka数据sink hbase 1.创建upsert kafka table 2.在hb ...

  7. 95-134-114-源码-维表-Hbase维表关联:LRU策略

    1 .世界 2.概述 ​ LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据. ​ ...

  8. Flink SQL 如何避免 JDBC Connector 维表出现 Finished 状态

    背景 JDBC Connector 使得关系型数据库( Mysql.PostgreSQL)可以作为 Flink 主流的维表,如下图: 但如果使用不当会出现 JDBC Connector Source ...

  9. Flink系列文档-(YY03)-Flink编程基础API-Source

    1 Flink编程入口 首先获取flink编程的核心入口对象   /*** 获取批处理入口对象*/// 1) 普通的批处理对象ExecutionEnvironment environment1 = E ...

最新文章

  1. SAP中的KANBAN
  2. 大规模分布式消息中间件考虑点
  3. Web应用虚拟目录的映射的几种方式
  4. Redhat7.2上编译Linux内核源码
  5. python同步锁和互斥锁的区别_Python实现的多线程同步与互斥锁功能示例
  6. flink sql client讀取kafka數據的timestamp(DDL方式)
  7. GetDisplayName 获取枚举的显示值
  8. 仿Drinkspiration App的menu
  9. Valid Number 1
  10. 浅谈API测试与UI Auomation一点心得
  11. layui的table常用方法
  12. maple 2022
  13. Python编写微信打飞机小游戏(四)
  14. bin文件用cad打开_bin文件怎么打开?实测可靠方法
  15. 谷歌flash无法输入中文
  16. 机器学习十大经典算法入门
  17. STM32F4 ETH-Lwip以太网通信
  18. Linux 内核配置项详解 myimx6
  19. 【倾斜摄影】——三维建模软件ContextCapture 空三质量报告详细解读
  20. 项目1:基于Java API文档制作的搜索引擎

热门文章

  1. Lecture 2 DFT STFT Heisenberg's uncertainty principle Spectral Estimation
  2. 虹科HK-NT 50网关让扫码器轻松连接工业PLC
  3. 深度学习之神经网络传递流程
  4. BUUCTF[GXYCTF2019]BabySQli
  5. 如何停止无意义的内耗
  6. 2018-2021国内网站数量直线下降,2021年仅剩422万个
  7. 【Python文本处理】基于运动路线记录GPX的文件解析,及对经纬度坐标的数学模型运动速度求解
  8. 安装numpy, pandas, scipy 和matplotlib
  9. 【SPIE独立出版 | Ei检索 】第二届物联网与机器学习国际学术会议征稿中!
  10. 推荐一门边开车边赚钱的小生意跟游戏推广有关