-- 1 Mysql 创建mysql 表
create table IF NOT EXISTS itcast_nev.web_chat_ems(id int auto_increment primary key comment '主键' ,create_date_time timestamp null comment '数据创建时间',session_id varchar(48) default '' not null comment 'sessionId',sid varchar(48) collate utf8_bin  default '' not null comment '访客id',create_time datetime null comment '会话创建时间',seo_source varchar(255) collate utf8_bin default '' null comment '搜索来源',seo_keywords varchar(512) collate utf8_bin default '' null comment '关键字',ip varchar(48) collate utf8_bin  default '' null comment 'IP地址',area varchar(255) collate utf8_bin default '' null comment '地域',country varchar(16) collate utf8_bin  default '' null comment '所在国家',province varchar(16) collate utf8_bin  default '' null comment '省',city varchar(255) collate utf8_bin default '' null comment '城市',origin_channel varchar(32) collate utf8_bin  default '' null comment '投放渠道',user varchar(255) collate utf8_bin default '' null comment '所属坐席',manual_time datetime null comment '人工开始时间',begin_time datetime null comment '坐席领取时间 ',end_time datetime null comment '会话结束时间',last_customer_msg_time_stamp datetime null comment '客户最后一条消息的时间',last_agent_msg_time_stamp datetime null comment '坐席最后一下回复的时间',reply_msg_count int(12) default 0  null comment '客服回复消息数',msg_count int(12) default 0  null comment '客户发送消息数',browser_name varchar(255) collate utf8_bin default '' null comment '浏览器名称',os_info varchar(255) collate utf8_bin default '' null comment '系统名称'
);-- 2 Flink sql 创建mysql-cdc表
create table tbl_web_chat_ems_mysql (id string PRIMARY KEY NOT ENFORCED,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string
)WITH('connector' = 'mysql-cdc','hostname' = 'node1.itcast.cn','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'itcast_nev','table-name' = 'web_chat_ems'
);-- 3 Flink SQL Connector Hudi创建hudi表--
create table edu_web_chat_ems_hudi (id string PRIMARY KEY NOT ENFORCED,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_web_chat_ems_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','write.tasks'= '1','write.rate.limit'= '2000', 'compaction.tasks'= '1', 'compaction.async.enabled'= 'true','compaction.trigger.strategy'= 'num_commits','compaction.delta_commits'= '1','changelog.enabled'= 'true'
);-- 插入数据
insert into edu_web_chat_ems_hudi
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_web_chat_ems_mysql;-- 4 创建hive-hudi-映射表
CREATE EXTERNAL TABLE edu_hudi.tbl_web_chat_ems (id string,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/edu_web_chat_ems_hudi' ;ALTER TABLE edu_hudi.tbl_web_chat_ems ADD IF NOT EXISTS PARTITION(day_str='2021-11-29')
location '/hudi-warehouse/edu_web_chat_ems_hudi/2021-11-29' ;SELECT COUNT(1) AS total FROM edu_hudi.tbl_web_chat_ems WHERE day_str = '2021-11-29';
SELECT id, session_id, ip, province FROM edu_hudi.tbl_web_chat_ems WHERE day_str = '2021-11-29' LIMIT 10;-- 5.6 presto保存MySQL数据库
-- presto可以操作不同数据源:将hive数据保存到mysql
presto:edu_hudi>
INSERT INTO mysql.itcast_rpt.web_pv (report_date, report_total)
SELECT day_value, COUNT(id) AS total FROM (SELECT id, format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd') AS day_valueFROM hive.edu_hudi.tbl_web_chat_ems WHERE day_str = '2021-11-29'
) GROUP BY day_value -- 实时 streaming=======================================================================================================-- 1-1. MySQL 数据库表
CREATE TABLE `itcast_rpt`.`realtime_web_pv` (`report_date` varchar(255) NOT NULL,`report_total` bigint(20) NOT NULL,PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 1-2. Flink SQL Connector Hudi
CREATE TABLE edu_web_chat_ems_hudi (id string PRIMARY KEY NOT ENFORCED,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_web_chat_ems_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','read.streaming.enabled' = 'true','read.streaming.check-interval' = '5','read.tasks' = '1'
);--  1-3.Flink SQL  保存MySQL数据库-- 创建 Flink SQL Connector MySQL
CREATE TABLE realtime_web_pv_mysql (report_date STRING,report_total BIGINT, PRIMARY KEY (report_date) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '123456','table-name' = 'realtime_web_pv'
); --1-4. Flink SQL 指标计算 INSERT INTO 插入统计结果,存储至视图ViewINSERT INTO  realtime_web_pv_mysql
SELECT day_value, COUNT(id) AS total FROM (SELECTFROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, idFROM edu_web_chat_ems_hudiWHERE part = '2021-12-28'
) GROUP BY  day_value;

Mysql+Flink CDC +Hudi+Presto+mysql相关推荐

  1. Flink系列之:Flink CDC深入了解MySQL CDC连接器

    Flink系列之:Flink CDC深入了解MySQL CDC连接器 一.增量快照特性 1.增量快照读取 2.并发读取 3.全量阶段支持 checkpoint 4.无锁算法 5.MySQL高可用性支持 ...

  2. Flink CDC 系列 | 构建 MySQL 和 Postgres 上的 Streaming ETL

    摘要:本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL. Flink-CDC 项目地址: https://github.com/ververica ...

  3. 37 手游基于 Flink CDC + Hudi 湖仓一体方案实践

    简介: 介绍了 37 手游为何选择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体方案. 本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何选择 ...

  4. mysql中cdc什么意思,MySQL的CDC源表

    本文为您介绍MySQL的CDC(Change Data Capture)源表DDL定义.WITH参数.类型映射和代码示例. 注意 本文仅适用于VVP 2.3.0及以上版本. 仅支持单并发消费CDC-M ...

  5. Flink CDC 实时同步mysql

    前言 在实际开发中,需要做数据同步的场景是非常多的,比如不同的应用之间不想直接通过RPC的方式进行数据交互,或者说下游应用需要检测来自上游应用的某些业务指标数据的变化时,这些都可以考虑使用数据同步的方 ...

  6. Flink CDC + Hudi 海量数据入湖在顺丰的实践

    摘要:本文整理自顺丰大数据研发工程师覃立辉在 5月 21 日 Flink CDC Meetup 的演讲.主要内容包括: 顺丰数据集成背景 Flink CDC 实践问题与优化 未来规划 Tips:点击「 ...

  7. Flink CDC 将MySQL的数据写入Hudi实践

    Flink CDC + Hudi实践 一.依赖关系 1.Maven依赖 2.SQL客户端JAR 二.设置MySQL服务器 1.创建MySQL用户: 2.向用户授予所需的权限: 3.最终确定用户的权限: ...

  8. Doris Connector 结合 Flink CDC 实现 MySQL 分库分表 Exactly Once精准接入

    导读:本篇文档将演示如何使用 Apache Doris Flink Connector 结合 Flink CDC 以及 Doris Stream Load 的两阶段提交,实现 MySQL 数据库分库分 ...

  9. Flink CDC (Mysql为例)

    背景 业务中经常出现一些千万乃至亿级别的大表,此时可能考虑分库分表(Sharding-JDBC.MyCat等方案),也常同步数据进入ES中:同步数据这一业务场景中,Flink CDC是一个很不错的解决 ...

最新文章

  1. 编写程序判断等腰、等边或者普通三角形
  2. mysql userstat_mysql 中记录用户登录错误日志方法小结
  3. echart旭日图_150Echarts - 旭日图(Sunburst Label Rotate)
  4. OpenInventor开发笔记:解决FaceSet的填充问题
  5. BugkuCTF-Misc:做个游戏(08067CTF)
  6. java成绩管理系统论文总结,JAVA论文成绩管理系统课程设计
  7. firefox 接受post 不完整_HTTP中GET与POST的区别,99 %的人都理解错了
  8. AI又输了!中国传奇Dota2冠军联手,OpenAI快速进化然并卵
  9. 用递归解决冒泡排序问题
  10. Java byte变成无符号整数~~~
  11. javascript 学习--javascript高级程序设计
  12. linux虚拟机安装samba服务,在虚拟机Redhat Linux下安装Samba服务器分享
  13. WIN7下安装IIS
  14. Python词频统计与杨辉三角
  15. TypeError: can only concatenate list (not “int“) to list
  16. 苹果笔记本没有计算机管理员,Mac管理员账户丢失怎么办
  17. ReportingService报表入门
  18. Crash监控神器之腾讯Bugly
  19. 计算机应用技术专业课程视频,微视频技术在计算机专业课程教学中的应用研究...
  20. 内置python数据类型_python基础之数据类型及内置方法

热门文章

  1. mysql5.7.14安装版教程_MySQL5.7.14-下载安装教程--gt;MySQL数据库语句详细教程
  2. bitcomet比特彗星教程(bt种子下载)
  3. Flarum论坛软件+宝塔 安装教程
  4. JavaScript一些常用的正则表达式
  5. 小学四年级家长计算机,小学四年级家长的寄语
  6. Vue前端项目-主页布局-左侧导航菜单(静态)
  7. JFinal Weixin 1.9 发布,微信极速 SDK
  8. 2022年,小家电向左or向右?
  9. echarts.js实现雷达图
  10. native2ascii的用法