文章目录

  • 一文带你入门flink sql
    • 写在前面
    • 环境准备
  • 正文
  • 遇到的一些问题
    • 错误一
    • 错误二
    • 错误三

一文带你入门flink sql

写在前面

本次实战主要是通过Flink SQL Client消费kafka的实时消息,再用各种SQL操作对数据进行查询统计。

环境准备

具体的环境安装过程就不在这里写了,网上很多资料,大家自己查阅按照就好了。我说下我本地的环境:

  • flink 1.12.4
  • mysql 8.0.25
  • kafka 2.8.0

另外就是,本次示例需要用到以下几个jar包:

flink-sql-connector-kafka_2.11-1.12.4.jar
flink-connector-jdbc_2.11-1.12.4.jar
mysql-connector-java-5.1.48.jar

把他们拷贝到flink安装目录lib目录下。

flink输出的结果,会落到一张mysql的表,也就是我们的sink表,这个表要提前建好。

CREATE TABLE `pvuv_sink` (`dt` varchar(100) DEFAULT NULL,`pv` bigint DEFAULT NULL,`uv` bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3

三个字段分别表示时间,pv值和uv值。

正文

先启动flink以及flink sql的客户端。

$ ./bin/start-cluster.sh
$ .bin/sql-client.sh embedded

这样就开启了一个sql client的客户端。

接着在客户端执行下面这段sql,这相当于启动了一个source table进行监听我们的输入数据流。

CREATE TABLE user_log (user_id VARCHAR,item_id VARCHAR,category_id VARCHAR,behavior VARCHAR,ts TIMESTAMP(3)) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user',  -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息'connector.properties.0.value' = 'localhost:2181','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'localhost:9092','update-mode' = 'append','format.type' = 'json',  -- 数据源格式为 json'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则);

执行成功的话,会返回:

[INFO] Table has been created

解释下这段sql,flink会帮我们创建一张表,这个表的数据来源于kafka的消息,对应的topic是user,数据的格式是json。其它的信息都好理解,不做过多解释了。执行成功后,就开启监听了。

我们可以select下,看看表的情况:

因为还没有输入数据,所以表是空的。

然后执行sink sql,也就是输出数据的表,这个表前面我们提前建好了,在flink sql这里配置下:

CREATE TABLE pvuv_sink (dt VARCHAR,pv BIGINT,uv BIGINT
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://localhost:3306/flink-test','connector.table' = 'pvuv_sink','connector.username' = 'root','connector.password' = '11111111','connector.write.flush.max-rows' = '1'
);

然后编写计算逻辑,逻辑比较简单,统计每个小时的pv和uv。

INSERT INTO pvuv_sink(dt, pv, uv)
SELECTDATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,COUNT(*) AS pv,COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

执行后,flink就会启动一个job在后台执行。

我们可以通过

http://localhost:8081/#/overview

这个地址看到任务的详细情况。

然后我们在本地启动一个kafka的服务,然后再启动一个producer模拟发送数据。

kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper

/usr/local/Cellar/kafka/2.8.0/bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka

/usr/local/Cellar/kafka/2.8.0/bin/kafka-server-start /usr/local/etc/kafka/server.properties &

查看启动是否成功

创建topic,注意和上面source table的配置保持一致。

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user

启动一个控制台的生产者,

kafka-console-producer --broker-list localhost:9092 --topic user

发送两条消息试试:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

去mysql看下pvuv_sink表,发现已经有数据了。

遇到的一些问题

在运行flink sql的时候踩过一些坑,这里列举下帮大家避坑。

错误一

java.lang.NoSuchMethodError: 'boolean org.apache.flink.table.api.TableColumn.isGenerated()'

这个是因为flink-jdbc的版本搞错了导致的。

错误二

Flink SQL> INSERT INTO pvuv_sink(dt, pv, uv)
> SELECT
>   DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
>   COUNT(*) AS pv,
>   COUNT(DISTINCT user_id) AS uv
> FROM user_log
> GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

这个是因为我一开始用错了lib,应该是

flink-sql-connector-kafka_2.11-1.12.4.jar

而不是

flink-connector-kafka_2.12-1.12.4.jar

错误三

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode (org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode and org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode are in unnamed module of loader 'app')

参考

  • https://blog.csdn.net/boling_cavalry/article/details/106038219
  • https://issues.apache.org/jira/browse/FLINK-19995

一文带你入门flink sql相关推荐

  1. java etl工具_一文带你入门ETL工具-datax的简单使用

    什么是ETL? ETL负责将分布的.异构数据源中的数据如关系数据.平面数据文件等抽取到临时中间层后进行清洗.转换.集成,最后加载到数据仓库或数据集市中,成为联机分析处理.数据挖掘的基础. ETL是数据 ...

  2. 一文带你入门go语言

    一文带你入门go语言 go/golang是一门google开发的编程语言,其代码简洁易读,天生支持并发,完美契合当今互联网生态. 目前Go语言已经⼴泛应用于人工智能.云计算开发.容器虚拟化.⼤数据开发 ...

  3. 一文带你入门 SQL

      同学们可以结合本人的另外一篇博客,结合着使用~   使用 SQL 实战处理2020国赛C题数据 1. SQL 简介 ​ SQL (Structured Query Language:结构化查询语言 ...

  4. sql查询每科成绩的最高分_数据分析SQL查询:一文带你入门到掌握

    [背景介绍] 在一家知名电商企业的BI部门实习四个多月,岗位为数据分析.日常工作中打交道最多的就是SQL和EXCEL,在实习之前SQL技能只会简单的增删改查语句,第一周实习经理甩了一份业务常见绩效取数 ...

  5. 一文带你了解 Flink Forward 柏林站全部重点内容

    10 月 7 日 - 9 日,随着 70 周年国庆活动的顺利闭幕,Flink Forward 也照例在他们的发源地柏林举办了第五届大会.虽然还没有拿到具体的数据,不过从培训门票已经在会前销售一空的这样 ...

  6. 【MDX】一文带你搞懂SQL Server Analysis Services 的安装和使用

    目录 Step 1: Install developer and management tools 安装 new stand-alone SQL Server installation or add ...

  7. 一文带你入门Redis

    文章目录 1 课程安排 2 课程目标 3 redis介绍 3.1 什么是NoSQL 3.2 redis历史发展 3.3 什么是redis 3.4 redis的应用场景 4 测试环境 4.1 虚拟机 4 ...

  8. 一文带你入门图论和网络分析(附Python代码)

    作者:Srivatsa 翻译:和中华 校对:丁楠雅 本文约6300字,建议阅读20+分钟. 本文从图的概念以及历史讲起,并介绍了一些必备的术语,随后引入了networkx库,并以一个航班信息数据集为例 ...

  9. 超硬核 | 一文带你入门用户画像

    本文已收录github:https://github.com/BigDataScholar/TheKingOfBigData,里面有大数据高频考点,Java一线大厂面试题资源,上百本免费电子书籍,作者 ...

最新文章

  1. oracle创建表空间及用户,Oracle创建表空间和用户
  2. 理解 Delphi 的类(十) - 深入方法[17] - 提前声明
  3. java 获取系统时间 8小时 jre_Java获取时间与系统时间相差8小时终极解决方案
  4. vnc连接服务器怎么配置文件,vnc服务器和客户端怎么配置文件
  5. java pdf无法加载_java - 试图使用iText7合并来合并pdf,但是当我打开最终的合并pdf时,它说无法加载pdf文档 - SO中文参考 - www.soinside.com...
  6. android 动态创建view,react-native动态创建Android View 无效果
  7. JavaWeb——响应编码与请求编码
  8. Opencv之读取yuv420P
  9. 02 - Tomcat配置
  10. Mac OSX用终端检测文件的sha1值
  11. 数据库原理及应用教程 第四版|微课版答案 陈志泊主编
  12. MS-DOS虚拟机安装
  13. Hibernate 枚举类型@Enumerated(EnumType.STRING)的应用
  14. 为什么空集是集合的子集_空集是任何集合的子集对吗
  15. 【013】故宫博物院数字文物库-让文物随时可赏
  16. html响应式布局手机屏幕导航条,美图响应式布局导航条效果
  17. CSAPP-Lab03 Attack Lab 详细解析
  18. C++最小/最大(min;max;minmax;min_element;max_element;minmax_element)
  19. EXCEL中怎样提取部分特定的文本?
  20. 办公大师系列经典丛书 诚聘译者

热门文章

  1. 嵌入式软件测试ETest在机电综合管理系统中的应用
  2. 计算机管理系统在护理管理中的应用,信息管理系统在护理管理中的应用_39健康网...
  3. 支付宝对刷脸支付精心布局步步推进
  4. MyBatis主键回填和自定义主键
  5. JavaScript学习笔记(四)---闭包、递归、柯里化函数、继承、深浅拷贝、设计模式
  6. Netty入门-第一话【持续更新】
  7. 一号店(html)注册页面
  8. 07 flowable DMN结合bpmn简化流程
  9. NUC972裸机调试步骤
  10. 【智慧园区】利用BIM运营中国第一高楼,精细化管理更“现代”