简介

维度表,作为数据仓库里面的概念,是维度属性的集合,比如时间维、地点维;可以是一个mysql或者cassandra,redis等存储,甚至是自己定义的一些api。
流表是kafka等流式数据。
根据流表join维表的字段去异步查询维表。

举个例子

流表:kafka id1,id2,id3三列
维表:mysql id,age,name
sql:select id1,id2,id3,age,name from kafka join mysql on id1=id;
join的结果就是: id1,id2,id3,age,name 流表的字段加上mysql维表的字段。
流表这边提供id1,给到维表,维表那边执行的sql是select * from mysql where id=id1

实战

流表:文本数据csv包含uid、phone
维表:Elasticsearch数据包含uid、username
需要把流表和维表的数据进行join,形成uid、username、phone

第一步从文本获取流数据

public class Test {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataStreamSource = env.readTextFile("/mytextFile.txt");SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> map = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple5<String, String, String, String, String> map(String s) throws Exception {String[] splits = s.split("\t");String uid = splits[0];String phone = splits[1];return new Tuple2<>(uid, phone);}});//SingleOutputStreamOperator<Tuple5<String, Set<String>, Set<String>, Set<String>, Set<String>>> renyuanku = AsyncDataStream.unorderedWait(map, new AsyncEsDataRequest(), 2, TimeUnit.SECONDS, 100);//renyuanku.writeAsText("E:/test/renyuanku.txt").setParallelism(1);env.execute("Test");}
}

异步从Elasticsearch获取数据

public class AsyncEsDataRequest extends RichAsyncFunction<Tuple2<String, String>, Tuple3<String, String, String>> {private transient RestHighLevelClient restHighLevelClient;@Overridepublic void open(Configuration parameters) throws Exception {HttpHost httpHost = new HttpHost("swarm-manager", 9200, "http");//初始化ElasticSearch-ClientrestHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHost));}@Overridepublic void close() throws Exception {restHighLevelClient.close();}@Overridepublic void asyncInvoke(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) throws Exception {search(input, resultFuture);}//异步去读Es表private void search(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) {SearchRequest searchRequest = new SearchRequest("renyuanku");String uid = input.f0;QueryBuilder builder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("uid", uid));SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();sourceBuilder.query(builder);searchRequest.source(sourceBuilder);ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {String uid = input.f1;String phone = input.f2;//成功@Overridepublic void onResponse(SearchResponse searchResponse) {SearchHit[] searchHits = searchResponse.getHits().getHits();if (searchHits.length > 0) {JSONObject jsonObject = JSONObject.parseObject(searchHits[0].getSourceAsString());String username = jsonObject.getString("username");}resultFuture.complete(Collections.singleton(Tuple5.of(uid, username, phone)));}//失败@Overridepublic void onFailure(Exception e) {System.out.println(e.getMessage());resultFuture.complete(Collections.singleton(Tuple5.of(uid, username, phone));*/}};restHighLevelClient.searchAsync(searchRequest, listener);}
}

连接这两个流,并将结果输出到文件

        SingleOutputStreamOperator<Tuple5<String, Set<String>, Set<String>, Set<String>, Set<String>>> renyuanku = AsyncDataStream.unorderedWait(map, new AsyncEsDataRequest(), 2, TimeUnit.SECONDS, 100);renyuanku.writeAsText("E:/test/renyuanku.txt").setParallelism(1);

这样就将这两个流进行合并了。

Flink异步io应用场景之流表join维表相关推荐

  1. flink 异步io使用

    [4]中提到了使用异步IO的场景 需要异步I / O操作 当与外部系统交互时(例如,当使用存储在数据库中的数据来丰富流事件时),需要注意与外部系统的通信延迟不会影响流应用程序的整体工作. 直接访问外部 ...

  2. Flink 异步IO时 java.util.concurrent.TimeoutException: Async function call has timed out.

    Flink 异步IO时 timeout报错 java.lang.Exception: An async function call terminated with an exception. Fail ...

  3. 大数据开发实战:Hive优化实战2-大表join小表优化

    4.大表join小表优化 和join相关的优化主要分为mapjoin可以解决的优化(即大表join小表)和mapjoin无法解决的优化(即大表join大表),前者相对容易解决,后者较难,比较麻烦. 首 ...

  4. MySQL小表join大表的正确使用姿势(straight_join 关键字的使用)

    网上有种说法是:由于一般是采用小表join大表的方式(可以提高效率),所以有人说将小表放在左边,让它先执行,记住,这种说法是错误的!!!有例为证: 我们看上例: film inner join fil ...

  5. 真正让你明白Hive调优系列3:笛卡尔乘积,小表join大表,Mapjoin等问题

    0.Hive中的优化分类    真正想要掌握Hive的优化,要熟悉相关的MapReduce,Yarn,hdfs底层源码,明晰Hive的底层执行流程.真正让你明白Hive调优系列,会征对下面分类逐一分析 ...

  6. 【Spark调优】大表join大表,少数key导致数据倾斜解决方案

    [Spark调优]大表join大表,少数key导致数据倾斜解决方案 参考文章: (1)[Spark调优]大表join大表,少数key导致数据倾斜解决方案 (2)https://www.cnblogs. ...

  7. 【Spark调优】小表join大表数据倾斜解决方案

    [Spark调优]小表join大表数据倾斜解决方案 参考文章: (1)[Spark调优]小表join大表数据倾斜解决方案 (2)https://www.cnblogs.com/wwcom123/p/1 ...

  8. Flink 异步IO访问外部数据(mysql篇)

    接上篇:[翻译]Flink 异步I / O访问外部数据 最近看了大佬的博客,突然想起Async I/O方式是Blink 推给社区的一大重要功能,可以使用异步的方式获取外部数据,想着自己实现以下,项目上 ...

  9. 阿里云流计算中维表join VS 流join

    最近业务上使用blink进行清洗数据,使用到了双流join和维表join,今天有同学问我流join和维表join有什么区别.在此我做个简单的说明,描述不对的地方,欢迎大家纠正,后面补充. 流式计算过程 ...

最新文章

  1. js时间格式化函数,支持Unix时间戳
  2. 小程序导航组件navigator活学活用
  3. 省二计算机二级vb程序设计题,江苏省计算机二级考试VB程序设计复习题(分类汇总)...
  4. 男朋友让我纹他的名字,但我不想纹怎么办?
  5. c#chart背景透明_C#+Layui开发后台管理系统
  6. java之备忘录模式,java设计模式之备忘录模式
  7. Matlab C-Mex Round 1
  8. 借助Ehcache缓存框架实现对页面的缓存
  9. 2022Java最新学习路线(初学者必看)
  10. Bootstrap如何设置table样式
  11. 厨神之路五--粥汤类
  12. 菜鸟教程python3 mysql_MySQL菜鸟教程
  13. java程序员一般用什么笔记本_程序员对笔记本电脑有什么要求吗?推荐下哪些牌子笔记本性价比高?...
  14. 闲居即兴 - 反卷诗篇
  15. mysql中高阶玩法系列(九)
  16. esp8266接入米家、小爱同学,附开源app控制
  17. 全光谱防蓝光护眼灯有用吗?怎么分辨是全光谱灯
  18. 微信小程序真机调试无法连接websocket解决方案
  19. 计算机网络知识英语,计算机网络基础课程中英文简介.DOC
  20. 关于虚拟现实(VR)内容开发综述

热门文章

  1. PHP下kafka的实践(已经测试)
  2. PHP的ob多级缓冲设置
  3. Git新建临时分支进行开发后合并至master
  4. Go与PHP区别:类型 引用 语法 错误 性能 应用 生态
  5. tp框架命名空间使用(namespace,use,as,\)
  6. android+5.0+ble,android5.0(Lollipop) BLE Peripheral牛刀小试(示例代码)
  7. excel导入mysql语句_求助:用SQL语句从Excel将数据导入到SQL数据库
  8. matlab 主成分 分类,matlab主成分分析
  9. matlab仿真软件 高阶调制,高阶差分幅度相移键控调制解调系统及仿真
  10. python3.7.2怎么使用win7_Win7同时安装Python2和Python3的配置