本文主要介绍 HBase 和 Flink SQL 的结合使用。HBase 作为 Google 发表 Big Table 论文的开源实现版本,是一种分布式列式存储的数据库,构建在 HDFS 之上的 NoSQL 数据库,非常适合大规模实时查询,因此 HBase 在实时计算领域使用非常广泛。可以实时写 HBase,也可以利用 buckload 一把把离线 Job 生成 HFile Load 到HBase 表中。而当下 Flink SQL 的火热程度不用多说,Flink SQL 也为 HBase 提供了 connector,因此 HBase 与 Flink SQL 的结合非常有必要实践实践。

当然,本文假设用户有一定的 HBase 知识基础,不会详细去介绍 HBase 的架构和原理,本文着重介绍 HBase 和 Flink 在实际场景中的结合使用。主要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的结果写到 HBase 表,供其他用户查询的场景。因此,本文介绍的内容如下所示:

  • HBase 环境准备

  • 数据准备

  • HBase 作为维度表进行 temporal table join的场景

  • Flink SQL 做计算写 HBase 的场景

  • 总结

01 HBase 环境准备

由于没有测试的 HBase 环境以及为了避免污染线上 Hbase 环境。因此,自己 build一个 Hbase docker image(大家可以 docker pull guxinglei/myhbase 拉到本地),是基于官方干净的 ubuntu imgae 之上安装了 Hbase 2.2.0 版本以及 JDK1.8 版本。

  • 启动容器,暴露 Hbase web UI 端口以及内置 zk 端口,方便我们从 web 页面看信息以及创建 Flink Hbase table 需要 zk 的链接信息。

docker run -it --network=host -p 2181:2181 -p 60011:60011 docker.io/guxinglei/myhbase:latest bash

  • 进入容器,启动 HBase 集群,以及启动 rest server,后续方便我们用 REST API 来读取 Flink SQL 写进 HBase 的数据。

# 启动hbase 集群bin/start-hbase.sh# 后台启动restServerbin/hbase-daemon.sh start rest -p 8000

02 数据准备

由于 HBase 环境是自己临时搞的单机服务,里面没有数据,需要往里面写点数据供后续示例用。在 Flink SQL 实战系列第二篇中介绍了如何注册 Flink Mysql table,我们可以将广告位表抽取到 HBase 表中,用来做维度表,进行 temporal table join。因此,我们需要在 HBase 中创建一张表,同时还需要创建 Flink HBase table, 这两张表通过 Flink SQL 的 HBase connector 关联起来。

  • 在容器中启动 HBase shell,创建一张名为 dim_hbase 的 HBase 表,建表语句如下所示:

# 在hbase shell创建 hbase表hbase(main):002:0> create 'dim_hbase','cf'Created table dim_hbaseTook 1.3120 seconds=> Hbase::Table - dim_hbase

  • 在 Flink 中创建 Flink HBase table,建表语句如下所示:

# 注册 Flink Hbase tableDROP TABLE IF EXISTS flink_rtdw.demo.hbase_dim_table;CREATE TABLE flink_rtdw.demo.hbase_dim_table (  rowkey STRING,  cf ROW < adspace_name STRING >,  PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4','table-name' = 'dim_hbase','sink.buffer-flush.max-rows' = '1000','zookeeper.quorum' = 'localhost:2181');
  • Flink MySQL table 和 Flink HBase table 已经创建好了,就可以写抽取数据到HBase 的 SQL job 了,SQL 语句以及 job 状态如下所示:

# 抽取Mysql数据到Hbase表中insert into  hbase_dim_tableselectCAST (ID as VARCHAR),ROW(name)from  mysql_dim_table;

03 HBase 作为维表与 Kafka 

做 temporal join 的场景

在 Flink SQL join 中,维度表的 join 一定绕不开的,比如订单金额 join 汇率表,点击流 join 广告位的明细表等等,使用场景非常广泛。那么作为分布式数据库的 HBase 比 MySQL 作为维度表用作维度表 join 更有优势。在 Flink SQL 实战系列第二篇中,我们注册了广告的点击流,将 Kafka topic 注册 Flink Kafka Table,同时也介绍了 temporal table join 在 Flink SQL 中的使用;那么本节中将会介绍 HBase 作为维度表来使用,上面小节中已经将数据抽取到 Hbase 中了,我们直接写 temporal table join 计算逻辑即可。

  • 作为广告点击流的 Flink Kafa table 与 作为广告位的 Flink HBase table 通过广告位 Id 进行 temporal table join,输出广告位 ID 和广告位中文名字,SQL join  逻辑如下所示:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,       hbase_dim_table.cf.adspace_name as publisher_adspace_namefrom adsdw_dwd_max_click_mobileappleft join hbase_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTimeon cast(adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as string) = hbase_dim_table.rowkey;
  • temporal table join job 提交 Flink 集群上的状态以及 join 结果如下所示:

04 计算结果 sink 到 HBase 作为结果的场景

上面小节中,HBase 作为维度表用作 temporal table join 是非常常见的场景,实际上 HBase 作为存储计算结果也是非常常见的场景,毕竟 Hbase 作为分布式数据库,底层存储是拥有多副本机制的 HDFS,维护简单,扩容方便, 实时查询快,而且提供各种客户端方便下游使用存储在 HBase 中的数据。那么本小节就介绍 Flink SQL 将计算结果写到 HBase,并且通过 REST API 查询计算结果的场景。

  • 进入容器中,在 HBase 中新建一张 HBase 表,一个 column family 就满足需求,建表语句如下所示:

# 注册hbase sink tablecreate 'dwa_hbase_click_report','cf'

  • 建立好 HBase 表之后,我们需要在 Flink SQL 创建一张 Flink HBase table,这个时候我们需要明确 cf 这个 column famaly 下面 column 字段,在 Flink SQL实战第二篇中,已经注册好了作为点击流的 Flink Kafka table,因此本节中,将会计算点击流的 uv 和点击数,因此两个 column 分别为 uv 和 click_count,建表语句如下所示:

# 注册 Flink Hbase tableDROP TABLE IF EXISTS flink_rtdw.demo.dwa_hbase_click_report;CREATE TABLE flink_rtdw.demo.dwa_hbase_click_report (  rowkey STRING,  cf ROW < uv BIGINT, click_count BIGINT >,  PRIMARY KEY (rowkey) NOT ENFORCED) WITH ('connector' = 'hbase-1.4','table-name' = 'dwa_hbase_click_report','sink.buffer-flush.max-rows' = '1000','zookeeper.quorum' = 'hostname:2181');

  • 前面点击流的 Flink Kafka table 和存储计算结果的 HBase table 和 Flink HBase table 已经准备了,我们将做一个1分钟的翻转窗口计算 uv 和点击数,并且将计算结果写到 HBase 中。对 HBase 了解的人应该知道,rowkey 的设计对  hbase regoin 的分布有着非常重要的影响,基于此我们的 rowkey 是使用 Flink SQL 内置的 reverse 函数进行广告位 Id 进行反转和窗口启始时间做 concat,因此,SQL 逻辑语句如下所示:

INSERT INTO dwa_hbase_click_reportSELECTCONCAT(REVERSE(CAST(publisher_adspace_adspaceId AS STRING)) ,'_',CAST((UNIX_TIMESTAMP(DATE_FORMAT(TUMBLE_START(ets, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss')) * 1000) AS STRING)  ) as rowkey, ROW(COUNT(DISTINCT audience_mvid) , COUNT(audience_behavior_click_creative_impressionId)) as cfFROM  adsdw_dwd_max_click_mobileappWHERE publisher_adspace_adspaceId IS NOT NULL AND audience_mvid IS NOT NULL AND audience_behavior_click_creative_impressionId IS NOT NULLGROUP BY  TUMBLE(ets, INTERVAL '1' MINUTE),  publisher_adspace_adspaceId;

  • SQL job 提交之后的状态以及结果 check 如下所示:

上述 SQL job 已经成功的将结算结果写到 HBase 中了。对于线上的 HBase 服务来讲,很多同事不一定有 HBase 客户端的权限,从而也不能通过 HBase shell 读取数据;另外作为线上报表服务显然不可能通过 HBase shell 来通过查询数据。因此,在实时报表场景中,数据开发工程师将数据写入 HBase, 前端工程师通过 REST API 来读取数据。前面我们已经启动了 HBase rest server 进程,我们可以通 rest 服务提供读取 HBase 里面的数据。

  • 我们先 get 一条刚刚写到 HBase 中的数据看看,如下所示:

  • 下面我们开始通过 REST API 来查询 HBase 中的数据,第一步,执行如下语句拿到 scannerId;首先需要将要查询的 rowkey 进行 base64 编码才能使用,后面需要将结果进行 base64 解码

rowkey base64 编码前:0122612_1606295280000

base64 编码之后:MDEyMjYxMl8xNjA2Mjk1MjgwMDAw

curl -vi -X PUT \         -H "Accept: text/xml" \         -H "Content-Type: text/xml" \         -d '<Scanner startRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw" endRow="MDEyMjYxMl8xNjA2Mjk1MjgwMDAw"></Scanner>' \"http://hostname:8000/dwa_hbase_click_report/scanner"

  • 第二步,执行如下语句根据上条语句返回的 scannerID 查询数据,可以看到返回的结果:

curl -vi -X GET \         -H "Accept: application/json" \"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

  • 第三步,查询完毕之后,执行如下语句删除该 scannerId:

curl -vi -X DELETE \         -H "Accept: text/xml" \"http://hostname:8000/dwa_hbase_click_report/scanner/16063768141736ac0a8b5"

五. 总结

在本篇文章中,我们介绍了 HBase 和 Flink SQL 的结合使用比较广泛两种的场景:作为维度表用以及存储计算结果;同时使用 REST API 对 HBase 中的数据进行查询,对于查询用户来说,避免直接暴露 HBase 的 zk,同时将 rest server 和 HBase 集群解耦。

作者简介

余敖,360 数据开发高级工程师,目前专注于基于 Flink 的实时数仓建设与平台化工作。对 Flink、Kafka、Hive、Spark 等进行数据 ETL 和数仓开发有丰富的经验。


  Flink Forward Asia 2020  

大会议程发布

Flink Forward Asia 2020 在线峰会重磅开启!12月13-15日,全球 38+ 一线厂商,70+ 优质议题,与您探讨新型数字化技术下的未来趋势!大会议程已正式上线,点击文末「阅读原文」即可免费预约~

(点击可了解更多大会详情)

▼ 关注 Flink 技术社区,获取更多技术干货 ▼

戳我,去 Flink Forward Asia 2020!

Flink SQL 实战:HBase 的结合应用相关推荐

  1. flink sql实战案例

    目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...

  2. Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码

    作者 | 机智的王知无 转载自大数据技术与架构(ID: import_bigdata) 一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门 ...

  3. Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)

    概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识   來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...

  4. Flink 最锋利的武器:Flink SQL 入门和实战带你了解NBA球星数据

    一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言. 自 2015 年开始,阿里巴巴开始调研 ...

  5. Flink 最锋利的武器:Flink SQL 入门和实战

    学习路径:<2021年最新从零到大数据专家学习路径指南> 面      试:<2021年最新版大数据面试题全面开启更新> [注意]:Flink1.9版本后的Flink SQL使 ...

  6. 个推基于Flink SQL建设实时数仓实践

    作为一家数据智能企业,个推在服务垂直行业客户的过程中,会涉及到很多数据实时计算和分析的场景,比如在服务开发者时,需要对App消息推送的下发数.到达数.打开率等后效数据进行实时统计:在服务政府单位时,需 ...

  7. 实战 | flink sql 实时 TopN

    实战 | flink sql 实时 TopN 1.背景篇 2.难点剖析篇-此类指标建设.保障的难点 2.1.数据建设 2.2.数据保障 2.3.数据服务保障 3.数据建设篇-具体实现方案详述 3.1. ...

  8. 京东:Flink SQL 优化实战

    简介:本文着重从 shuffle.join 方式的选择.对象重用.UDF 重用等方面介绍了京东在 Flink SQL 任务方面做的优化措施. 本文作者为京东算法服务部的张颖和段学浩,并由 Apache ...

  9. Flink应用实战案例50篇(一)- Flink SQL 在京东的优化实战

    一.背景 目前,京东搜索推荐的数据处理流程如上图所示.可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm. 这就造成了以 ...

  10. 【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤

    简介: 以测试集群版本为例(EMR-4.4.1)-- Flink SQL Client 集成 Hive 使用文档 作者:林志成,阿里云EMR产品团队技术支持,拥有多年开源大数据经验 1.以测试集群版本 ...

最新文章

  1. android:layout_gravity和android:gravity的区别
  2. 全球与中国压延铜箔市场发展调研及投资前景战略建议报告2022-2028年版
  3. Java私有方法解释_java接口中 定义 private 私有方法
  4. MyBatis 实际使用案例-Mapper.xml 映射配置文件【重点】
  5. 关于一些电脑使用的小技巧
  6. 洛谷P2114 [NOI2014]起床困难综合症
  7. foreman架构的引入3-安装Foreman1.5.3架构(all-in-one)
  8. 【Android】ArcFaceDemo
  9. Android 详解自定义View抽奖转盘
  10. (六)洞悉linux下的Netfilteriptables:如何理解连接跟踪机制?(2)
  11. Android4,flutter插件引用第三方库
  12. 免费可商用图片素材网站,建议收藏
  13. 弹性地基梁板实用计算_YJK软件前处理之计算参数的设置(上篇)
  14. 【golang】go app 优雅关机 Graceful Shutdown How?
  15. pyton 内置模块
  16. 预祝大家平安夜圣诞节快乐
  17. 大疆aeb连拍_给无人机新手的3个航拍技巧,入门机也能拍大片!
  18. 【全文】狼叔:如何正确的学习Node.js
  19. 长铗:信息与计算力的大爆炸可能会导致超级智能
  20. 职场七年之痒,如何抉择

热门文章

  1. lol韩服游戏内设置_LOL手游:灵药战队有多强?辅助国际服第六,AD堪比UZI
  2. 非线性优化_曲线拟合_Ceres_最小二乘法示例
  3. SLAM++:面向对象的同时定位与建图系统(2013-CVPR)
  4. OpenCV_(Corner Detect with Morphology) 基于形态学滤波的角点检测
  5. C++调用SSD caffe模型进行物体检测-Opencv3.4.3
  6. vue项目中配置favicon图标
  7. Python学习 5day__基础知识
  8. libgdx for eclipse开发环境搭建
  9. Unity带参数的协程
  10. 软件工程-东北师大站-第六次作业PSP