目录

一、背景

二、流程

三、案例

1.flink sql读取 Kafka 并写入 MySQL

source

sink

insert

2.flinksql读kafka写入kudu

source

sink

insert

四、注意点

1.断点续传

2.实时采集

3.回溯问题

一、背景
使用flink sql实时同步一下数据

二、流程
总的来说就三步

source–>>sink->>insert

三、案例
1.flink sql读取 Kafka 并写入 MySQL
source
CREATE TABLE source_table (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
‘connector.type’ = ‘kafka’, – 使用 kafka connector
‘connector.version’ = ‘universal’, – kafka 版本,universal 支持 0.11 以上的版本
‘connector.topic’ = ‘user_behavior’, – kafka topic
‘connector.startup-mode’ = ‘earliest-offset’, – 从起始 offset 开始读取
‘connector.properties.0.key’ = ‘zookeeper.connect’, – 连接信息
‘connector.properties.0.value’ = ‘localhost:2181’,
‘connector.properties.1.key’ = ‘bootstrap.servers’,
‘connector.properties.1.value’ = ‘localhost:9092’,
‘update-mode’ = ‘append’,
‘format.type’ = ‘json’, – 数据源格式为 json
‘format.derive-schema’ = ‘true’ – 从 DDL schema 确定 json 解析规则
)
sink
CREATE TABLE sink_table (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
‘connector.type’ = ‘jdbc’, – 使用 jdbc connector
‘connector.url’ = ‘jdbc:mysql://localhost:3306/flink-test’, – jdbc url
‘connector.table’ = ‘pvuv_sink’, – 表名
‘connector.username’ = ‘username’, – 用户名
‘connector.password’ = ‘password’, – 密码
‘connector.write.flush.max-rows’ = ‘1’ – 默认5000条,为了演示改为1条
)
insert
INSERT INTO sink_table
SELECT
DATE_FORMAT(ts, ‘yyyy-MM-dd HH:00’) as dt,
COUNT(*) as pv,
COUNT(DISTINCT user_id) as uv
FROM source_table
GROUP BY DATE_FORMAT(ts, ‘yyyy-MM-dd HH:00’)
2.flinksql读kafka写入kudu source
– kafka source
drop table if exists source_table;
CREATE TABLE source_table (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior INT
,ts TIMESTAMP(3)
,process_time as proctime()
, WATERMARK FOR ts AS ts
) WITH (
‘connector’ = ‘kafka’
,‘topic’ = ‘user_behavior’
,‘properties.bootstrap.servers’ = ‘venn:9092’
,‘properties.group.id’ = ‘source_table’
,‘scan.startup.mode’ = ‘group-offsets’
,‘format’ = ‘json’
);
sink
– kafka sink
drop table if exists sink_table;
CREATE TABLE sink_table (
user_id STRING
,item_id STRING
,category_id STRING
,ts TIMESTAMP(3)
) WITH (
‘connector.type’ = ‘kudu’
,‘kudu.masters’ = ‘venn:7051,venn:7151,venn:7251’
,‘kudu.table’ = ‘source_table’
,‘kudu.hash-columns’ = ‘user_id’
,‘kudu.primary-key-columns’ = ‘user_id’
,‘kudu.max-buffer-size’ = ‘5000’
,‘kudu.flush-interval’ = ‘1000’
);
insert
– insert
insert into sink_table
select user_id, item_id, category_id,ts
from source_table;
四、注意点
1.断点续传
断点续传是指数据同步任务在运行过程中因各种原因导致任务失败,不需要重头同步数据,只需要从上次失败的位置继续同步即可,类似于下载文件时因网络原因失败,不需要重新下载文件,只需要继续下载就行,可以大大节省时间和计算资源。

默认关闭,开启的话调参 isRestore:true

2.实时采集
根据数据源的数据是否实时变化可以把数据同步分为离线数据同步和实时数据同步,上面介绍的断点续传就是离线数据同步里的功能,实时采集其实就是实时数据同步,当数据源里的数据发生了增删改操作,同步任务监听到这些变化,将变化的数据实时同步到目标数据源。除了数据实时变化外,实时采集和离线数据同步的另一个区别是:实时采集任务是不会停止的,任务会一直监听数据源是否有变化。

3.回溯问题
例如mysql是事务型数据库,会update,最新的消息发过去,得回撤更新前的消息,update-和update+两条消息,数据都在state里。

简单举个例子,统计男女数量,一开始mysql里是男,然后mysql更新为女了,这时候你接收的kafka,消息都会过来,state里一开始存着男,然后把男回撤,女进来,就要删除男新增女,state一般在rocksdb里,可以设置table.exec.state.ttl 窗口时间。

相关参数
val tEnv: TableEnvironment = …
val configuration = tEnv.getConfig().getConfiguration()

configuration.setString(“table.exec.mini-batch.enabled”, “true”) // 启用
configuration.setString(“table.exec.mini-batch.allow-latency”, “5 s”) // 缓存超时时长
configuration.setString(“table.exec.mini-batch.size”, “5000”) // 缓存大小

ps:因为本人这方面不是很专业,还在学习的阶段,有问题的话大家可以多多指教哈~

文章转自:flink sql实战案例_大数据系统-答学网

作者:答学网,转载请注明原文链接:http://www.dxzl8.com/

flink sql实战案例相关推荐

  1. Flink应用实战案例50篇(一)- Flink SQL 在京东的优化实战

    一.背景 目前,京东搜索推荐的数据处理流程如上图所示.可以看到实时和离线是分开的,离线数据处理大部分用的是 Hive / Spark,实时数据处理则大部分用 Flink / Storm. 这就造成了以 ...

  2. spark—SQL实战案例

    学习内容 一.sparkSQL在IDEA的使用 1.环境配置 2.快速入门 二.sparkSQL实战案例 1.数据准备 2.案例分析 3.功能实现 4.代码实现 一.sparkSQL在IDEA的使用 ...

  3. Flink应用实战案例50篇(五)-Apache Flink 在 bilibili 的多元化探索与实践

    一.B 站实时的前世与今生 1. 生态场景辐射     说起实时计算的未来,关键词就在于数据的实效性.首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线 ...

  4. Flink从入门到精通100篇(二十二)- Flink应用实战案例:如何实现网络流控与反压机制

    目录 Flink 流处理为什么需要网络流控? Flink V1.5 版之前网络流控介绍 Flink V1.5 版之前的反压策略存在的问题 Credit的反压策略实现原理,Credit是如何解决 Fli ...

  5. Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码

    作者 | 机智的王知无 转载自大数据技术与架构(ID: import_bigdata) 一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门 ...

  6. Flink 最锋利的武器:Flink SQL 入门和实战带你了解NBA球星数据

    一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言. 自 2015 年开始,阿里巴巴开始调研 ...

  7. Flink 最锋利的武器:Flink SQL 入门和实战

    学习路径:<2021年最新从零到大数据专家学习路径指南> 面      试:<2021年最新版大数据面试题全面开启更新> [注意]:Flink1.9版本后的Flink SQL使 ...

  8. Flink入门第十二课:DataStream api/Flink sql实现每隔5分钟统计最近一小时热门商品小案例

    用到的数据文件 用到的数据文件 链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w 提取码:2hmu 1.需求 & 数据 用户行为数据不断写入 ...

  9. 实战 | flink sql 实时 TopN

    实战 | flink sql 实时 TopN 1.背景篇 2.难点剖析篇-此类指标建设.保障的难点 2.1.数据建设 2.2.数据保障 2.3.数据服务保障 3.数据建设篇-具体实现方案详述 3.1. ...

最新文章

  1. 同一个内容,对比Java、C、PHP、Python的代码量,结局意外了
  2. java 存取xml数据_JAVA读取XML文件数据
  3. 10种JavaScript开发者必备的VS Code插件
  4. Python:列表、集合等交集、并集、差集、非集简介及其代码实现之详细攻略
  5. 谈谈离散卷积和卷积神经网络
  6. java 登录牵手_Java: HttpURLConnection 模拟登录方法 (带cookie 的Post/Get)_20160908_七侠镇莫尛貝...
  7. js笔记(八)ES6
  8. 隐藏Tabview顶部上的空白区域统一去掉图片名后缀
  9. pdn阻抗测试_信号线的特征阻抗和PDN的阻抗区别
  10. 国内开发商品基金的一些设想
  11. GB28181国标流媒体服务语音对讲-前端页面采集语音调用接口示例
  12. RadomStuDemo
  13. 复古冰雪传奇H5游戏详细图文架设教程
  14. Microsoft SQL Server 2000的版本区别及选择
  15. 家庭局域网_如何用电视盒子局域网共享电脑资源,赶紧Mark!
  16. 信息论——JS散度(Jensen-Shannon)
  17. 机器学习:什么是无监督学习(Unsupervised Learning)?
  18. 昆石VOS2009/VOS3000 2.1.6.00 操作指南
  19. Quartus无法选择器件库
  20. 蓝桥ROS中使用fishros一键安装

热门文章

  1. (三)数据库笔记:SQL
  2. Redis深入理解五 :Redis主从架构、哨兵架构、高可用集群模式
  3. 计算机毕业设计选题之 旅游行程管理推荐系统
  4. [含论文+开题报告+答辩PPT+源码等]SSM校园食堂点餐系统订餐就餐餐厅(已降重)
  5. 猿如意中的【Java SE Development Kit 8】工具详情介绍
  6. linux卷组命令,vgcreate命令 – 创建卷组
  7. 数字化转型走基层:太古可口可乐终于关闭了中国的数据中心
  8. 模拟法计算大数加减乘除 (附带例题)
  9. python中一些函数使用(tile,排序,搜索,计数)
  10. UnityShdaer笔记第二课笔记-贴花