实验目的

Kafka的数据能让Flink SQL Client读取到

本文是对[1]的详细记载

具体操作步骤

①啓動hadoop集羣,離開安全模式

②各个节点都关闭防火墙:

service firewalld status(查看防火墙状态)

service firewalld stop(关闭防火墙)

各个节点分别启动zookeeper

③启动kafka集群

startkafka

startkafka2

startkafka3

flink-connector-kafka_2.12-1.12.0.jar

flink-json-1.12.0.jar

flink-jdbc_2.12-1.10.2.jar

放入$FLINK_HOME/lib中

启动flink集群

操作 命令 备注
查看topic $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想删除topic,可以是:

往 order_sql 这个 topic发送 json 消息 $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic order_sql

这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致

[2]中的报错还可能是某个节点的kafka挂掉导致的.

可能碰到[3]

注意关闭防火墙

使用kafka自带消费端测试下消费 $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic order_sql

如果kafka自带消费者测试有问题,那么就不用继续往下面做了,

此时如果使用Flink SQL Client来消费也必然会出现问题

清除topic中所有数据[6](因为,万一你输错了呢?对吧) $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic order_sql

需要$KAFKA/config/server.properties设置

delete.topic.enable=true

下面的需要手动输入(发送json消息,注意下面的信息千万不要一次性复制全部内容,必须一条一条手动拷贝)

{"order_id": "1","shop_id": "AF18","member_id": "3410211","trade_amt": "100.00","pay_time": "1556420980000"}
{"order_id": "2","shop_id": "AF20","member_id": "3410213","trade_amt": "130.00","pay_time": "1556421040000"}
{"order_id": "3","shop_id": "AF18","member_id": "3410212","trade_amt": "120.00","pay_time": "1556421100000"}
{"order_id": "4","shop_id": "AF19","member_id": "3410212","trade_amt": "100.00","pay_time": "1556421120000"}
{"order_id": "5","shop_id": "AF18","member_id": "3410211","trade_amt": "150.00","pay_time": "1556421480000"}
{"order_id": "6","shop_id": "AF18","member_id": "3410211","trade_amt": "110.00","pay_time": "1556421510000"}
{"order_id": "7","shop_id": "AF19","member_id": "3410213","trade_amt": "110.00","pay_time": "1556421570000"}
{"order_id": "8","shop_id": "AF20","member_id": "3410211","trade_amt": "100.00","pay_time": "1556421630000"}
{"order_id": "9","shop_id": "AF17","member_id": "3410212","trade_amt": "110.00","pay_time": "1556421655000"}

$FLINK_HOME/bin/sql-client.sh embedded -d $FLINK_HOME/conf/sql.my.yaml -l sql-libs/

注意这个实验涉及到两个.yaml文件,修改如下:

$FLINK_HOME/bin/sql-client.sh embedded -d $FLINK_HOME/conf/sql.my.yaml -l /home/appleyuchi/bigdata/flink-1.12/lib

⑦FLINK SQL执行

具体操作 具体FLINK SQL
显示orders内容 select * from orders;
1分钟固定窗口计算 SELECT
  shop_id
  , TUMBLE_START(payment_time, INTERVAL '1' MINUTE) AS tumble_start
  , TUMBLE_END(payment_time, INTERVAL '1' MINUTE)   AS tumble_end
  , sum(trade_amt)                             AS amt
FROM orders
GROUP BY shop_id, TUMBLE(payment_time, INTERVAL '1' MINUTE);

--------------------------------------------------------------------------------------------------------实验效果截图--------------------------------------------------------------------------------------------------------------------------------------------------------------------

解决方案:

$FLINK_HOME/conf/flink-conf.yaml中修改为:

classloader.resolve-order: parent-first

尽量集群中的每个节点都要修改

继续往下,碰到问题:

问了花名雪尽

根据提示:

放弃采用.yaml的方式,改用DDL方式(下面的直接拷贝到Flink SQL Client中然后按下回车即可):

CREATE TABLE orders (order_id BIGINT, member_id BIGINT, trade_amt DOUBLE, pay_time BIGINT,ts AS TO_TIMESTAMP(FROM_UNIXTIME(pay_time/ 1000, 'yyyy-MM-dd HH:mm:ss')), -- 定义事件时间WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- 在ts上定义5 秒延迟的 watermark
) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'order_sql','connector.startup-mode' = 'earliest-offset','connector.properties.group.id' = 'testGroup','connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183','connector.properties.bootstrap.servers' = 'Desktop:9091','format.type' = 'json'
);

操作如下:

实验效果如下:

----------------------------------------------------------------------------附录(其他可能用到的常规操作)-----------------------------------------------------------------------------------------------------------------------------------------

删除topic:

$KAFKA/bin/kafka-topics.sh --delete --zookeeper Desktop:2181 --topic order_sql

或者:

deleteall /brokers/topics/order_sql
deleteall /config/topics/order_sql
deleteall /admin/delete_topics/order_sql

根据官方文档[4]用的是0.11的kafka,可能是版本存在不兼容的问题

Reference:

[1]Flink SQL-Client 的使用

[2]Kafka连接服务器出现:Connection to node 1 (localhost/127.0.0.1:9092) could not be established.

[3]kafka出现Unable to read additional data from server sessionid 0x0, likely server has closed socket

[4]SQL Client

[5]Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

[6]Is there a way to delete all the data from a topic or delete the topic before every run?

flink sql client讀取kafka數據的timestamp(DDL方式)相关推荐

  1. Flink SQL Client讀取csv中的數據(轉載+總結)

    根據官方文檔[2] Flink SQL啓動方式 啓動命令 (1)starting an embedded standalone process $FLINK_HOME/bin/sql-client.s ...

  2. Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)

    概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识   來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...

  3. Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)

    #################################################################################################### ...

  4. Flink SQL Client实现CDC实验

    概述 本文主要是對[7]中內容的復現 环境 组件 版本 Flink(HA) 1.12 Zookeeper 3.6.0 flink-sql-connector-mysql-cdc 1.1.1 Mysql ...

  5. Flink SQL Client注册SCALA UDF完整流程

    UDF的完整maven工程與SQL https://github.com/appleyuchi/Flink_SQL_Client_UDF 完整操作步骤 ①mvn scala:compile packa ...

  6. Flink SQL Client注册JAVA UDF完整流程

    概述 听大佬说[1]里面有flink sql client注册udf的方法 去看了一眼,全是文字,闹心,索性琢磨了一下,记录下来. UDF的完整maven工程 https://github.com/a ...

  7. Flink SQL Client方言切换与datagen->Hive(DDL形式+streaming形式)

    概述 本文是对[1]的完整复现,补充了[1]中缺失的大量细节. 切换方言 切换目标 Flink SQL Client命令 切换为hive SET table.sql-dialect=hive; 切换为 ...

  8. flink sql client读取hive时卡住

    问题复现如下: 查看$FLINK_HOME/log/flink-appleyuchi-sql-client-Desktop.log 2020-12-23 11:48:56,811 INFO  org. ...

  9. 【阿里云EMR实战篇】以EMR测试集群版本为例,详解 Flink SQL Client 集成 Hive 使用步骤

    简介: 以测试集群版本为例(EMR-4.4.1)-- Flink SQL Client 集成 Hive 使用文档 作者:林志成,阿里云EMR产品团队技术支持,拥有多年开源大数据经验 1.以测试集群版本 ...

最新文章

  1. Python之路--前端知识--JavaScript
  2. 实现线程栈初始化(RTT)
  3. 计算机事业单位专技岗考什么区别,事业单位管理岗和专技岗的区别(从待遇等角度)...
  4. leetcode18. 四数之和(双指针)
  5. redis系列:通过队列案例学习list命令 1
  6. template might not exist or might not be accessible by any of the configured
  7. 【工具大道】UML的点点滴滴
  8. PHP execl导出/展示
  9. 2019.8.8 2048小游戏
  10. shell脚本样本_Shell脚本
  11. vb6如何判断文件是否存在_使用boost.filesystem检查文件是否存在的正确姿势
  12. 限制页面被pc端访问
  13. [转]Android--多线程之Handler
  14. String.fromCharCode()函数
  15. 泰山JDK8升级u302,找到了更好的整合mips办法
  16. 美团/饿了么外卖红包小程序源码
  17. 小幅震荡市场下的期权投资策略举例
  18. 乐动ld06激光雷达sdk改bug记录分享
  19. 时尚回馈:店铺以帮助飓风桑迪赈灾基金
  20. vs code快速复制一行代码 快捷键

热门文章

  1. css垂直居中技巧总结
  2. 洛谷 P1019 单词接龙
  3. [转] 深入浅出 妙用Javascript中apply、call、bind
  4. PIL中分离通道发生“AttributeError: 'NoneType' object has no attribute 'bands'”
  5. 单例模式的七种写法(转)
  6. V4L2学习 二 ----视频打开与保存简单流程
  7. .net core 不启用 https_.NET 应用如何优雅的做功能开关(Feature Flag)
  8. 对dropout的理解详细版
  9. actions相互调用并且存在顺序
  10. 使用vue-router设置每个页面的title