【调研】FlinkSql功能测试及实战演练

槐序i 2021-05-12 17:39:24 573 收藏
分类专栏: flink
版权

flink
专栏收录该内容
14 篇文章0 订阅
订阅专栏
FlinkSql功能测试及实战演练
前言:Flink在国内的占有率逐步提升的情况下,各项组件的功能与稳定性也得到逐步提升。为了解决目前已有的复杂需求,尝试研究flinksql的特性与功能,作为是否引入该组件的依据。同时尝试将现有需求通过简单demo的形式进行测试。本次测试主要集中在Kafka、mysql、Impala三个组件上,同时将结合官方文档进行:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/
1
1 前期准备
1.1 环境配置
本次研究测试需要用到以下组件:

CDH 6.3.2
Flink 1.12.2
mysql 5.7
impala 3.2.0-cdh6.3.2
kafka 2.2.1-cdh6.3.2
1
2
3
4
5
1.2 依赖关系
本次测试会将FlinkSql与kafka、mysql、impala等组件进行conn,因此需要以下依赖包:

flink-connector-kafka_2.11-1.12.2.jar
flink-connector-jdbc_2.11-1.11.2.jar
mysql-connector-java-5.1.47.jar
ImpalaJDBC4.jar
ImpalaJDBC41.jar
flink-sql-connector-kafka_2.11-1.12.2.jar
1
2
3
4
5
6
1.3 重启flink
将上述所需的jar包放入$FLINK_HOME/lib中之后(所有部署flink的服务器都需要放),重启yarn-session

yarn-session.sh --detached
sql-client.sh embedded
1
2
2 FlinkSql-kafka测试
FlinkSql-kafka相关资料:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
1
2.1 FlinkSql-kafka常规功能测试
通过FlinkSql将Kafka中的数据映射成一张表

2.1.1 创建常规topic
1、创建topic
kafka-topics --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic test01

2、模拟消费者
kafka-console-consumer --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01 --from-beginning

3、模拟生产者
kafka-console-producer --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01

4、删除topic
kafka-topics --delete --topic test01 --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181
1
2
3
4
5
6
7
8
9
10
11
2.1.2 FlinkSql建表
CREATE TABLE t1 (
name string,
age BIGINT,
isStu INT,
opt STRING,
optDate TIMESTAMP(3) METADATA FROM ‘timestamp’
) WITH (
‘connector’ = ‘kafka’, – 使用 kafka connector
‘topic’ = ‘test01’, – kafka topic
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘format’ = ‘csv’ – 数据源格式为 csv,
);
select * from t1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2.1.3 写入数据
往kafka中写入数据,同时查看flinksql中t1表的变化

zhangsan,20,1,1
lisi,18,1,2
wangwu,30,2,2
1
2
3

2.1.4 小结
通过kafka数据映射成表这个步骤,可以将数据实时的汇入表中,通过sql再进行后续操作,相对代码编写来说更为简易,同时有问题也相对好排查

2.2 FlinkSql-upsertKafka常规功能测试
upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。

如果有key则update,没有key则insert,如果value的值为空,则表示删除

2.2.1 FlinkSql建upsert表
drop table t2;
CREATE TABLE t2 (
name STRING,
age bigint,
isStu INT,
opt STRING,
optDate TIMESTAMP(3) ,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘test02’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘key.format’ = ‘csv’,
‘value.format’ = ‘csv’
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2.2.2 建立映射关系
将t1表中的数据写入到t2中

INSERT INTO t2 SELECT * FROM t1 ;
select * from t2;
1
2
结果如下:

2.2.3 更新数据
继续模拟kafka生产者,写入如下数据

zhangsan,25,1,2
risen,8,8,8
lisi,0,0,
1
2
3
结果如下:

2.2.4小结
通过如上测试,两条更新,一条插入,都已经实现了,

根据官方文档描述,指定key的情况下,当value为空则判断为删除操作

但是假如我插入一条数据到kafka,例如:

lisi,
1
只有key,没有value,t1表就会报如下错误

因为建表的时候有几个类型定义为了Int类型,这里为空它默认为是""空字符串,有点呆,推测如果是json格式这类可以指定数据类型的,才能直接使用。对于csv这种数据类型不确定的,会存在无法推断类型的情况。

鉴于此,为了探究是否真的具备删除操作,我又将上述所有表结构都进行了修改。为了试验简单,我直接修改表结构再次测试

drop TABLE t1;
CREATE TABLE t1 (
name STRING,
age STRING,
isStu STRING,
opt STRING,
optDate TIMESTAMP(3) METADATA FROM ‘timestamp’
) WITH (
‘connector’ = ‘kafka’, – 使用 kafka connector
‘topic’ = ‘test01’, – kafka topic
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘format’ = ‘csv’ – 数据源格式为 csv,
);
drop table t2;
CREATE TABLE t2 (
name STRING,
age STRING,
isStu STRING,
opt STRING,
optDate TIMESTAMP(3) ,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘test02’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘key.format’ = ‘csv’,
‘value.format’ = ‘csv’
);
INSERT INTO t2 SELECT * FROM t1 ;
select * from t2;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

依然没有在t2表中删除掉该条记录,该功能需要进一步探索,以后在跟进。

2.3 FlinkSql-upsertKafka关于kafka中数据过期测试
2.3.1 创建10分钟策略的topic
kafka-topics --create --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181 --replication-factor 3 --partitions 3 --topic test01 --config log.retention.minutes=10
1
kafka-console-producer --broker-list 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01
1
kafka-topics --delete --topic test01 --zookeeper 192.168.5.185:2181,192.168.5.165:2181,192.168.5.187:2181
1
kafka-console-consumer --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic output --from-beginning
1
kafka-topics --bootstrap-server 192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092 --topic test01 --describe
1
2.3.2 创建flinksql的表
CREATE TABLE t1 (
name string,
age BIGINT,
isStu INT,
opt STRING,
optDate TIMESTAMP(3) METADATA FROM ‘timestamp’,
WATERMARK FOR optDate as optDate - INTERVAL ‘5’ SECOND – 在ts上定义watermark,ts成为事件时间列
) WITH (
‘connector’ = ‘kafka’, – 使用 kafka connector
‘topic’ = ‘test01’, – kafka topic
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘format’ = ‘csv’ – 数据源格式为 csv,
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
CREATE TABLE t2 (
name STRING,
age bigint,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘output’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘key.format’ = ‘csv’,
‘value.format’ = ‘csv’
);
1
2
3
4
5
6
7
8
9
10
11
INSERT INTO t2
SELECT
name,
max(age)
FROM t1
GROUP BY name;
1
2
3
4
5
6
2.3.3 写入数据
zhangsan,18,1,insert
lisi,20,2,update
wangwu,30,1,delete
1
2
3
2.3.4 等待策略过期

flink映射的kafka数据因为数据删除,导致t1表里为空

但是t2是基于t1的汇总表,在t1被清空的情况下,t2依旧存在

3 FlinkSql-JDBC
FlinkSql-JDBC相关资料:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
1
3.1 FlinkSql-JDBC-Mysql常规功能测试
3.1.1 mysql建表并写入数据
create table test.test01(name varchar(10),age int, primary key (name));
INSERT INTO test.test01(name, age)VALUES(‘zhangsan’, 20);
INSERT INTO test.test01(name, age)VALUES(‘lisi’, 30);
INSERT INTO test.test01(name, age)VALUES(‘wangwu’, 18);
1
2
3
4
3.1.2 flinkSql建表
drop table mysqlTest ;
create table mysqlTest (
name string,
age int,
PRIMARY KEY (name) NOT ENFORCED
) with (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://192.168.5.187:3306/test’,
‘username’ = ‘root’,
‘password’ = ‘123456’,
‘table-name’ = ‘test01’

);
select * from mysqlTest;
1
2
3
4
5
6
7
8
9
10
11
12
13
14

3.1.3 flinksql写入/更新数据到mysql
3.1.3.1 写入
INSERT INTO mysqlTest(name, age)VALUES(‘risen’, 88);
1

在flink表与mysql表中,都多了该条记录

3.1.3.2 更新
INSERT INTO mysqlTest (name, age) VALUES(‘zhangsan’, 50);
1
flinksql

mysql

3.1.3.3 删除
官方文档对delete简单提了一下,但是在实际中并没有

JDBC连接器允许使用JDBC驱动程序从任何关系数据库读取数据或将数据写入任何关系数据库。本文档介绍了如何设置JDBC连接器以对关系数据库运行SQL查询。

如果在DDL上定义了主键,则JDBC接收器将在upsert模式下运行以与外部系统交换UPDATE / DELETE消息,否则,它将在附加模式下运行,并且不支持使用UPDATE / DELETE消息。
1
2
3
尝试删除:

DELETE FROM mysqlTest where name=‘zhangsan’;
INSERT INTO mysqlTest (name, age) VALUES(‘zhangsan’, null);
1
2

3.1.4 小结
flinkSql连接mysql,增删改查,增加与查询很容易实现,但是修改一定要在建表的时候,指定主键才可以实现upsert,删除目前好像没办法实现

3.2 FlinkSql-JDBC-Impala常规测试
3.2.1 Impala创建kudu表
drop table kudu_test.kuduTest;
CREAT TABLE kudu_test.kuduTest
(
name string,
age BIGINT,
isStu INT,
opt STRING,
PRIMARY KEY(name)
)STORED AS KUDU;
INSERT INTO kudu_test.mysqlTest(name, age,isStu,opt)VALUES(‘zhangsan’, 20,1,‘1’);
INSERT INTO kudu_test.mysqlTest(name, age,isStu,opt)VALUES(‘lisi’, 30,1,‘1’);
INSERT INTO kudu_test.mysqlTest(name, age,isStu,opt)VALUES(‘wangwu’, 18,1,‘1’);
1
2
3
4
5
6
7
8
9
10
11
12
3.2.2 flinkSql建表
drop table impalaTest ;
create table impalaTest (
name string,
age int,
PRIMARY KEY (name) NOT ENFORCED
) with (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:impala://192.168.5.185:21050/kudu_test’,
‘username’ = ‘root’,
‘password’ = ‘123456’,
‘table-name’ = ‘kuduTest’,
‘driver’=‘com.cloudera.impala.jdbc4.Driver’

);
select * from impalaTest;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

呃,不支持impala

3.2.3 小结
目前暂不支持通过JDBC连接Impala

4 总结
1、Flinksql支持kafka、mysql,且已经支持upsert功能,但是在测试delete的时候,发现都无法直接实现,但是可以通过汇总一次,在逻辑上实现。在尝试将flinksql连接impala的时候报错,目前暂不支持,但是可以考虑通过将数据写入kafka,最后impala来消费来实现。

2、在大数据场景中,每条数据都是有价值的。当某天有"统计删除了多少条数据"的需求时,物理删除掉的数据再也无法捞回,导致需求无法实现。所以建议不删除任何数据,以保留数据状态的形式,实现逻辑上的删除,即不统计当前状态为"删除"的数据。

5 实战演练
5.1 场景及需求
通过简单demo实现:

1、维度表更新

2、实时统计指标

5.1.1 源数据
姓名,年龄,身份,在校状态(1:存在:2:不存在)
name,age,identity,status
zhangsan,20,1,1
lisi,18,1,2
wangwu,30,2,2
1
2
3
4
5
5.1.2 维度表
身份维度表
dim_identity 1:学生,2:老师
1
2
5.2 流程设计

5.3 代码实现
5.3.1 创建mysql维度表并插入数据
create table test.dim_identity(name varchar(10),identity int);
INSERT INTO test.dim_identity(name, identity)VALUES(‘学生’, 1);
INSERT INTO test.dim_identity(name, identity)VALUES(‘老师’, 2);
1
2
3
5.3.2 将mysql维度表映射成FlinkSql中的表
drop table dim_identity ;
create table dim_identity (
name string,
identity int
) with (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://192.168.5.187:3306/test’,
‘username’ = ‘root’,
‘password’ = ‘123456’,
‘table-name’ = ‘dim_identity’
);
select * from dim_identity;
1
2
3
4
5
6
7
8
9
10
11
12
5.3.3 创建映射kafka的源数据表
CREATE TABLE ods_kafka (
name string,
age BIGINT,
identity INT,
status STRING,
insertDate TIMESTAMP(3) METADATA FROM ‘timestamp’
) WITH (
‘connector’ = ‘kafka’, – 使用 kafka connector
‘topic’ = ‘stuLog’, – kafka topic
‘scan.startup.mode’ = ‘earliest-offset’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘format’ = ‘csv’ – 数据源格式为 csv,
);
select * from ods_kafka;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
5.3.4 根据源数据表汇总成用户状态表
采用upsert的方式,以最新一条数据作为用户的状态

drop table tds_user_status;
CREATE TABLE tds_user_status (
name STRING,
age bigint,
identity INT,
status STRING,
insertDate TIMESTAMP(3) ,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘tdsResult’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘key.format’ = ‘csv’,
‘value.format’ = ‘csv’
);
INSERT INTO tds_user_status SELECT * FROM ods_kafka ;
select * from tds_user_status;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
5.3.5 统计指标
统计以下指标:

当前在校的老师数量
当前总共多少学生
学生占总数的比例
当前状态为在校占总数的比例
1
2
3
4
建FlinkSql表接收指标

drop table rpt_result;
CREATE TABLE rpt_result (
inStuTeatherNum int,
StudentNum int,
StudengRate FLOAT,
inStuRate FLOAT,
countDate TIMESTAMP(3) METADATA FROM ‘timestamp’,
PRIMARY KEY (countDate) NOT ENFORCED
) WITH (
‘connector’ = ‘upsert-kafka’,
‘topic’ = ‘rptResult’,
‘properties.bootstrap.servers’ = ‘192.168.5.185:9092,192.168.5.165:9092,192.168.5.187:9092’, – kafka broker 地址
‘key.format’ = ‘csv’,
‘value.format’ = ‘csv’
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
开始统计:

INSERT INTO rpt_result
SELECT
sum(case when t2.name = ‘老师’ and t1.status =1 then 1 else 0 end ) inStuTeatherNum
,sum(case when t2.name = ‘学生’ then 1 else 0 end ) StudentNum
,sum(case when t2.name = ‘学生’ then 1 else 0 end )/sum(1) StudengRate
,sum(case when t1.status = 1 then 1 else 0 end )/sum(1) inStuRate
FROM tds_user_status t1
left join dim_identity t2
on t1.identity=t2.identity
;
select * from rpt_result
————————————————
版权声明:本文为CSDN博主「槐序i」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/spark9527/article/details/116716127

调研 FlinkSql功能测试及实战演练相关推荐

  1. 解析《中国互联网软件测试行业2018年度调研报告》

    之前收到了一份TesterHome的调查问卷邀请:2018年度中国测试行业问卷调研.最近问卷的汇总结果出来了,有2000+的测试童鞋参与了问卷调查. 从幸存者偏差角度来说,无论是覆盖率还是准确性,结果 ...

  2. Twemproxy调研总结

    本文档主要针对Redis的代理程序TwemProxy的使用进行调研和总结,共分为以下几个部分: 1.TwemProxy简介:该部分主要介绍TwemProxy是什么,能干哪些事情,有什么主要特点: 2. ...

  3. 如何进行产品规划?一次实战演练给出具体步骤

    如何进行产品规划?一次实战演练给出具体步骤 平时写过各种规划,年度规划.季度规划.版本规划,各种类型都试过了,把总结的一些经验分享下,欢迎大家一起讨论交流. 如何进行产品规划,是产品经理比较重要的一项 ...

  4. 功能测试人员技能提升路线图,试从第一个脚步到年薪50W...

    测试心路历程: 测试行业在国内兴比较晚,大部分测试人员,最开始接触都是纯功能界面测试,会数据库Linux,随着工作年限,会接触到一些常用测试工具,比如抓包,接口测试等. 在一线城市做功能测试,如果定个 ...

  5. 软件项目需求调研报告模板下载_需求调研规范

    本文明确项目调研阶段的工作划分及流程,作为产品经理或者项目经理及参与项目调研的项目组成员,在调研阶段的工作指导以及相关约束条件,如何高效的进行调研.通过本文所明确的管理规则,促进医疗事业部需求调研的管 ...

  6. 用户需求调研—快速上手篇

    什么是需求   需求:是指人们在某一特定的时期内在各种可能的价格下愿意并且能够购买某个具体商品的需要.----<百度百科>   基于上述理解,在我们的日常生活中,"需求" ...

  7. 3年功能测试经验,面试想拿到15k很难吗?

    一直觉得经验多,无论在哪都能找到满意的工作,但是现实却是给我打了一个大巴掌!事后也不会给糖的那种... 个人情况 大概介绍一下个人情况,男,本科,三年多测试工作经验,一毕业因为不成熟的经验以及学历多次 ...

  8. 3年功能测试经验,面试拿到15k难吗?

    一直觉得经验多,无论在哪都能找到满意的工作,但是现实却是给我打了一个大巴掌!事后也不会给糖的那种... 个人情况 大概介绍一下个人情况,男,本科,三年多测试工作经验,一毕业因为不成熟的经验以及学历多次 ...

  9. 干货 | 五千字长文带你快速入门FlinkSQL

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

最新文章

  1. SQL:安装多个实例,修改实例端口号,和IP加端口号连接实例
  2. Springboot+Thymeleaf+layui框架的配置与使用
  3. python在线课程价格-杭州python课程价格
  4. 关于extjs中动态添加TabPanel的tab项并以iframe显示的整理(转)
  5. Java --- 常用API
  6. 关于初始化C++类成员
  7. java 设计模式 示例_Java中的访问者设计模式–示例教程
  8. python进行数据查询_如何进行python数据库查询?(实例解析)
  9. 用java程序设计一个快递_Java编程语言的优点快递
  10. 责任链模式 php,每天一个设计模式之(9)-----责任链模式
  11. GNS3下载安装和使用、本地主机虚拟网卡消失解决方案以及环回网卡添加与测试
  12. windows7远程桌面设置
  13. sm2和sm4加密算法浅析
  14. 文件后缀名修改重命名,两种更改的小技巧
  15. django基础知识总结
  16. 燃烧的远征java(二)-开发环境:eclipse 的使用技巧收集
  17. [编程题]山寨金闪闪 (面试题)
  18. 电视软件测试项目怎么做,LED_TV液晶电视软件测试用例.doc
  19. 转载——如何让自己像打王者荣耀一样发了疯、拼了命、石乐志的学习?
  20. 典型相关分析(Canonical Correlation Analysis, CCA)

热门文章

  1. 计算机怎么进入待机模式,电脑怎么进入待机模式
  2. 【开发】后端框架——Mybatis
  3. mxgraph初体验
  4. 网易互娱2022校园招聘在线笔试-游戏研发工程师(第一批)
  5. IDEA起步(一) - 工程结构
  6. Spoon Kettle 输入之获取文件名(Get file names)
  7. 【四二学堂】标准GPS坐标,转换成百度坐标
  8. 摩托罗拉Android系统,摩托罗拉官方Android 4.0系统正式到来
  9. CSS单行、多行文本溢出隐藏
  10. 2018 Unite大会——《使用UPA工具优化项目》演讲实录