##############################################实验目的和环境###############################################

本文是为了在最新的版本上复现[1],环境如下:

组件 版本
Zookeeper 3.6.0
Flink 1.12
Mysql 8.0.22-0ubuntu0.20.04.2
Kafka 2.5.0

[1]的作者Jark 采用了Java代码的形式,这篇博客对其流程进行了等效简化,

采用Flink SQL Client上纯DDL+SQL 的形式,全篇无一行Java代码.

注意关闭防火墙

service firewalld stop

启动zookeeper,hadoop(这个应该没用,但是平时习惯了,也启动吧),flink,mysql,kafka

138210 StandaloneSessionClusterEntrypoint
24593 ResourceManager
115362 QuorumPeerMain
24006 DataNode
124727 Kafka
138491 TaskManagerRunner
138603 Jps
23817 NameNode
24315 SecondaryNameNode
79162 SqlClient
24798 NodeManager

############################################################################################################

文件作用解析:

Jark给的文件

文件的作用

需要修改的地方

kafka-common.sh

kafka生产端输入数据

brokers/ids/1

localhost改成自己的节点域名

run.sh 用来在 Web UI 中看到拓扑
source-generator.sh

创建topic

往kafka里面填充数据

--broker-list后面改掉

topic改成自己需要的

pom.xml 依赖文件 版本号根据自己需要修改即可
kafka-consumer.sh kafka消费端  
env.sh 环境变量设置 FLINK_DIR
KAFKA_DIR
src/main/resources/user_behavior.log 数据来源
src/main/resources/q1.sql

source table定义

sink table定义

connector.properties.0.value

connector.properties.1.value

connector.url

connector.username

connector.password

*.java  

操作步骤:

操作命令 作用

一些准备工作

mysql> create database `flink-test`;(因为这里有横杠,所以需要使用``包起来)

datagrip建表语句

create table pvuv_sink
(
    dt char(30) not null,
    pv bigint null,
    uv bigint null,
    constraint pvuv_sink_pk
        primary key (dt)
);

注意,dt不要使用varchar,否则会导致无法设定为primary key

在mysql中建立sink表格
intellij运行SqlSubmit

生成flink-sql-submit.jar

被下面的source-generator.sh调用

./source-generator.sh 创建kafka的topic,然后往里面填充数据
./run.sh q1  提交成功后,可以在 Web UI 中看到拓扑

该实验的kafka主题是user_behavior

############################################################################################################

kafka常用操作如下:

操作 命令 备注
查看topic $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想删除topic,可以是:

user_behavior这个 topic发送 json 消息 $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior

这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致

[2]中的报错还可能是某个节点的kafka挂掉导致的.

可能碰到[3]

注意关闭防火墙

使用kafka自带消费端测试下消费 $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic user_behavior

如果kafka自带消费者测试有问题,那么就不用继续往下面做了,

此时如果使用Flink SQL Client来消费也必然会出现问题

清除topic中所有数据[6](因为,万一你输错了呢?对吧) $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic user_behavior

需要$KAFKA/config/server.properties设置

delete.topic.enable=true

传入kafka的user_behavior的数据举例如下(完整数据集):

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

############################################################################################################

下面时对q1.sql中的DDL/SQL的简要介绍

DDL/SQL语句 作用

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'Desktop:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'Desktop:9091',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

接收数据源头

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://Desktop:3306/flink-test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'appleyuchi',
    'connector.password' = 'appleyuchi',
    'connector.write.flush.max-rows' = '1'
);

数据存储目标
INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

对source的数据的处理,

指定要存入的sink

网页可能会污染上述DDL导致各种报错,以下面为准:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK读写各种数据源/flink-sql-submit/src/main/resources/q1.sql

整个实验操作步骤流程其实就是一句话:

kafka往user_behavior这个topic填入数据以后

启动Flink SQL Client

$FLINK_HOME/bin/sql-client.sh embedded

然后上面2句DDL+1句SQL全部复制到Flink SQL Client按下回车,

就会自动生成任务提交到Flink集群,实验结束.

最终实验效果如下:

一点题外话:

整个实验运作起来后,硬盘磁头一直在响,所以一旦看到mysql中有数据sink以后,

立刻关掉flink集群,不然实在太伤硬盘了.

Reference:

[1]Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQ

[2]Flink 1.10 SQL 读写Kafka

Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)相关推荐

  1. flink1.12.0学习笔记第2篇-流批一体API

    flink1.12.0学习笔记第 2 篇-流批一体API flink1.12.0学习笔记第1篇-部署与入门 flink1.12.0学习笔记第2篇-流批一体API flink1.12.0学习笔记第3篇- ...

  2. flink1.12.7+hudi 问题总结

    版本:CDH-6.3.2, flink-1.12.7 ,hudi -0.9.0/0.10.0 1.CDH安装flink,需要自己制作parcel,制作过程略; 2.hudi可以自己编译::https: ...

  3. flink1.12.0学习笔记第1篇-部署与入门

    flink1.12.0学习笔记第 1 篇-部署与入门 flink1.12.0学习笔记第1篇-部署与入门 flink1.12.0学习笔记第2篇-流批一体API flink1.12.0学习笔记第3篇-高级 ...

  4. Flink1.12 - 概述、安装部署及快速入门

    1. Flink概述 1.1 Flink官方介绍 flink官网地址 1.2 Flink组件栈  一个计算框架要有长远的发展,必须打造一个完整的 Stack.只有上层有了具体的应用,并能很好的发挥计算 ...

  5. flink1.12.2+hudi0.9.0测试

    1.环境准备 1.1.flink1.12.2 1.1.1 编译包下载:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/fl ...

  6. 1.安装flink-1.12.2

    FLINK on YARN模式 解压安装包: tar -zvxf flink-1.12.2-bin-scala_2.11.tgz /opt/ 修改yarn配置,设置application master ...

  7. 【Flink】flink-1.12 通过 -t 指定模式后无法指定yarn参数

    1.概述 转载:[Flink 纠错]flink-1.12 通过 -t 指定模式后无法指定yarn参数 1.1 问题描述 我们使用flink 1.12提交任务到yarn时,遇到个比较奇怪的问题,我们的提 ...

  8. Flink1.12.0简单实现wordcount

    文章目录 前言 一.Flink1.12.0简单实现wordcount 二.使用步骤 1.引入pom.xml 2.主类 3.运行结果 总结 前言 Flink1.12.0简单实现wordcount 一.F ...

  9. power BI 中x轴日期值显示英文改为数值形式

    在Power BI中操作图表时 , 将日期值放到X轴上, 显示的坐标值形式总事英文月份 这里出一个解决办法 首先我们的源数据时这个样子 我们将数据可视化后 , 是这个样子的 可以看到 X轴显示的都是英 ...

最新文章

  1. 580显卡驱动_AMD6000系显卡终于来了!3A平台神秘加成?
  2. 在vmware esx平台创建windows 2003 server群集时无法找到共享磁盘的解决方法
  3. feedback from waic
  4. 27行代码AC_How Many Tables HDU - 1213(并查集讲解)
  5. C#各个版本中的新增特性详解
  6. Pytorch-张量的创建与使用方法
  7. golang 内存分析/动态追踪
  8. [LeetCode] 707.设计链表
  9. Windows动态链接库使用详解
  10. 寒武纪 android实习
  11. 数据科学与大数据技术专业 —— 云计算●虚拟化 课程 期末复习卷及其简答(2)
  12. 音视频开发:大华摄像头配置RTSP与RTMP地址访问视频画面
  13. 如何让网页界面变暗色
  14. 技术工艺 | FPC和PCB有哪些区别?
  15. Java知识点太多?北大教授点破Java学习秘籍
  16. TSL2561传感器介绍
  17. Three.js学习09
  18. 怎么计算机械连接的工程量,结合GTJ2018,正确计算工程量
  19. Python作业【六】(语言练习题,稍有难度)
  20. 平板触控笔有哪些用途?适合ipad画画的电容笔推荐

热门文章

  1. autpmapper映射忽略某个属性
  2. java反射使用及性能比较
  3. SpringBoot使用velocity模板引擎
  4. GRUB 启动 WIN PE 镜像(ISO)
  5. 电脑科学性计算机怎么用,怎么使用科学计算器59 000×(1+r)-2
  6. 主产品清单位于oracle,OPatch failed with error code 73(OracleHomeInventory gets null oracleHomeInfo)...
  7. 如何把一些不同类型的数据混合存入一片内存中_如何从技术上增强以太坊的隐私性?...
  8. 激活层是每一层都有吗_89小户型复式这样装,每一层都设计得很棒,完工后秒变小区样板间,邻居前来取经...
  9. Centos 7或者说linux 怎么一直运行一个项目,自己退出终端也在运行
  10. Tensorflow Summary: 查看Tensorflow Model pb格式模型的信息