Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)
##############################################实验目的和环境###############################################
本文是为了在最新的版本上复现[1],环境如下:
组件 | 版本 |
Zookeeper | 3.6.0 |
Flink | 1.12 |
Mysql | 8.0.22-0ubuntu0.20.04.2 |
Kafka | 2.5.0 |
[1]的作者Jark 采用了Java代码的形式,这篇博客对其流程进行了等效简化,
采用Flink SQL Client上纯DDL+SQL 的形式,全篇无一行Java代码.
注意关闭防火墙
service firewalld stop
启动zookeeper,hadoop(这个应该没用,但是平时习惯了,也启动吧),flink,mysql,kafka
138210 StandaloneSessionClusterEntrypoint
24593 ResourceManager
115362 QuorumPeerMain
24006 DataNode
124727 Kafka
138491 TaskManagerRunner
138603 Jps
23817 NameNode
24315 SecondaryNameNode
79162 SqlClient
24798 NodeManager
############################################################################################################
文件作用解析:
Jark给的文件 |
文件的作用 |
需要修改的地方 |
kafka-common.sh |
kafka生产端输入数据 |
brokers/ids/1 localhost改成自己的节点域名 |
run.sh | 用来在 Web UI 中看到拓扑 | 无 |
source-generator.sh |
创建topic 往kafka里面填充数据 |
--broker-list后面改掉 topic改成自己需要的 |
pom.xml | 依赖文件 | 版本号根据自己需要修改即可 |
kafka-consumer.sh | kafka消费端 | |
env.sh | 环境变量设置 |
FLINK_DIR KAFKA_DIR |
src/main/resources/user_behavior.log | 数据来源 | 无 |
src/main/resources/q1.sql |
source table定义 sink table定义 |
connector.properties.0.value connector.properties.1.value connector.url connector.username connector.password |
*.java | 无 |
操作步骤:
操作命令 | 作用 |
一些准备工作 mysql> create database `flink-test`;(因为这里有横杠,所以需要使用``包起来) datagrip建表语句 create table pvuv_sink 注意,dt不要使用varchar,否则会导致无法设定为primary key |
在mysql中建立sink表格 |
intellij运行SqlSubmit |
生成flink-sql-submit.jar 被下面的source-generator.sh调用 |
./source-generator.sh | 创建kafka的topic,然后往里面填充数据 |
./run.sh q1
|
提交成功后,可以在 Web UI 中看到拓扑 |
该实验的kafka主题是user_behavior
############################################################################################################
kafka常用操作如下:
操作 | 命令 | 备注 |
查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 |
如果想删除topic,可以是: |
往user_behavior这个 topic发送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior |
这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致 [2]中的报错还可能是某个节点的kafka挂掉导致的. 可能碰到[3] 注意关闭防火墙 |
使用kafka自带消费端测试下消费 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic user_behavior |
如果kafka自带消费者测试有问题,那么就不用继续往下面做了, 此时如果使用Flink SQL Client来消费也必然会出现问题 |
清除topic中所有数据[6](因为,万一你输错了呢?对吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic user_behavior |
需要$KAFKA/config/server.properties设置 delete.topic.enable=true |
传入kafka的user_behavior的数据举例如下(完整数据集):
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
############################################################################################################
下面时对q1.sql中的DDL/SQL的简要介绍
DDL/SQL语句 | 作用 |
CREATE TABLE user_log ( |
接收数据源头 |
CREATE TABLE pvuv_sink ( |
数据存储目标 |
INSERT INTO pvuv_sink SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'); |
对source的数据的处理, 指定要存入的sink |
网页可能会污染上述DDL导致各种报错,以下面为准:
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK读写各种数据源/flink-sql-submit/src/main/resources/q1.sql
整个实验操作步骤流程其实就是一句话:
kafka往user_behavior这个topic填入数据以后
启动Flink SQL Client
$FLINK_HOME/bin/sql-client.sh embedded
然后上面2句DDL+1句SQL全部复制到Flink SQL Client按下回车,
就会自动生成任务提交到Flink集群,实验结束.
最终实验效果如下:
一点题外话:
整个实验运作起来后,硬盘磁头一直在响,所以一旦看到mysql中有数据sink以后,
立刻关掉flink集群,不然实在太伤硬盘了.
Reference:
[1]Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQ
[2]Flink 1.10 SQL 读写Kafka
Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)相关推荐
- flink1.12.0学习笔记第2篇-流批一体API
flink1.12.0学习笔记第 2 篇-流批一体API flink1.12.0学习笔记第1篇-部署与入门 flink1.12.0学习笔记第2篇-流批一体API flink1.12.0学习笔记第3篇- ...
- flink1.12.7+hudi 问题总结
版本:CDH-6.3.2, flink-1.12.7 ,hudi -0.9.0/0.10.0 1.CDH安装flink,需要自己制作parcel,制作过程略; 2.hudi可以自己编译::https: ...
- flink1.12.0学习笔记第1篇-部署与入门
flink1.12.0学习笔记第 1 篇-部署与入门 flink1.12.0学习笔记第1篇-部署与入门 flink1.12.0学习笔记第2篇-流批一体API flink1.12.0学习笔记第3篇-高级 ...
- Flink1.12 - 概述、安装部署及快速入门
1. Flink概述 1.1 Flink官方介绍 flink官网地址 1.2 Flink组件栈 一个计算框架要有长远的发展,必须打造一个完整的 Stack.只有上层有了具体的应用,并能很好的发挥计算 ...
- flink1.12.2+hudi0.9.0测试
1.环境准备 1.1.flink1.12.2 1.1.1 编译包下载:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/fl ...
- 1.安装flink-1.12.2
FLINK on YARN模式 解压安装包: tar -zvxf flink-1.12.2-bin-scala_2.11.tgz /opt/ 修改yarn配置,设置application master ...
- 【Flink】flink-1.12 通过 -t 指定模式后无法指定yarn参数
1.概述 转载:[Flink 纠错]flink-1.12 通过 -t 指定模式后无法指定yarn参数 1.1 问题描述 我们使用flink 1.12提交任务到yarn时,遇到个比较奇怪的问题,我们的提 ...
- Flink1.12.0简单实现wordcount
文章目录 前言 一.Flink1.12.0简单实现wordcount 二.使用步骤 1.引入pom.xml 2.主类 3.运行结果 总结 前言 Flink1.12.0简单实现wordcount 一.F ...
- power BI 中x轴日期值显示英文改为数值形式
在Power BI中操作图表时 , 将日期值放到X轴上, 显示的坐标值形式总事英文月份 这里出一个解决办法 首先我们的源数据时这个样子 我们将数据可视化后 , 是这个样子的 可以看到 X轴显示的都是英 ...
最新文章
- 580显卡驱动_AMD6000系显卡终于来了!3A平台神秘加成?
- 在vmware esx平台创建windows 2003 server群集时无法找到共享磁盘的解决方法
- feedback from waic
- 27行代码AC_How Many Tables HDU - 1213(并查集讲解)
- C#各个版本中的新增特性详解
- Pytorch-张量的创建与使用方法
- golang 内存分析/动态追踪
- [LeetCode] 707.设计链表
- Windows动态链接库使用详解
- 寒武纪 android实习
- 数据科学与大数据技术专业 —— 云计算●虚拟化 课程 期末复习卷及其简答(2)
- 音视频开发:大华摄像头配置RTSP与RTMP地址访问视频画面
- 如何让网页界面变暗色
- 技术工艺 | FPC和PCB有哪些区别?
- Java知识点太多?北大教授点破Java学习秘籍
- TSL2561传感器介绍
- Three.js学习09
- 怎么计算机械连接的工程量,结合GTJ2018,正确计算工程量
- Python作业【六】(语言练习题,稍有难度)
- 平板触控笔有哪些用途?适合ipad画画的电容笔推荐
热门文章
- autpmapper映射忽略某个属性
- java反射使用及性能比较
- SpringBoot使用velocity模板引擎
- GRUB 启动 WIN PE 镜像(ISO)
- 电脑科学性计算机怎么用,怎么使用科学计算器59 000×(1+r)-2
- 主产品清单位于oracle,OPatch failed with error code 73(OracleHomeInventory gets null oracleHomeInfo)...
- 如何把一些不同类型的数据混合存入一片内存中_如何从技术上增强以太坊的隐私性?...
- 激活层是每一层都有吗_89小户型复式这样装,每一层都设计得很棒,完工后秒变小区样板间,邻居前来取经...
- Centos 7或者说linux 怎么一直运行一个项目,自己退出终端也在运行
- Tensorflow Summary: 查看Tensorflow Model pb格式模型的信息