###################################################################################################################

目的

本文是对参考文献[1]在高版本上的的复现

###################################################################################################################

环境与配置

组件 版本
Flink 1.12
Hive 3.1.2
mysql 8.0.22-0ubuntu0.20.04.2
Zookeeper 3.6.0
Hadoop 3.1.2
Ubuntu 20.04

###################################################################################################################

步驟

service firewalld stop(关闭防火墙)

啓動hadoop

離開安全模式

啓動zookeeper與kafka集羣

启动flink集群

 该实验不需要额外的.yaml文件的配置,采用的是DDL方式

操作 命令 备注
查看topic $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想删除topic,可以是:

my_topic 这个 topic发送 json 消息 $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic my_topic

这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致

[2]中的报错还可能是某个节点的kafka挂掉导致的.

可能碰到[3]

注意关闭防火墙

使用kafka自带消费端测试下消费 $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic my_topic

如果kafka自带消费者测试有问题,那么就不用继续往下面做了,

此时如果使用Flink SQL Client来消费也必然会出现问题

清除topic中所有数据[6]

(因为,万一你输错了呢?对吧)

$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic my_topic

需要$KAFKA/config/server.properties设置

delete.topic.enable=true

kafka生產端輸入的數據:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

###################################################################################################################################################

SQL Client+DDL方式-实验结果

  DDL/SQL 实验效果
建立表(对接kafka) CREATE TABLE user_log1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'my_topic',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = 'Desktop:2181,Laptop:2181,Laptop:2183',
    'connector.properties.bootstrap.servers' = 'Desktop:9091',
    'format.type' = 'json'
);
流计算 select item_id,count(*) from user_log1 group by item_id;

###################################################################################################################################################

Maven工程中嵌入DDL方式

代碼方式參考[2],自己運行通過的代碼如下:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK读写各种数据源/Java/src/main/java/KafkaFlinkDDL.java

Reference:

[1]Flink通过SQLClinet创建kafka源表并进行实时计算

[2]Flink通过SQLClinet/Java代码创建kafka源表,指定Offset消费,并进行实时计算,最后sink到mysql表中

Flink SQL Client读Kafka+流计算(DDL方式+代碼嵌入DDL/SQL方式)相关推荐

  1. Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)

    概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识   來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...

  2. Flink(二十三)—— 流计算框架 Flink 与 Storm 的性能对比

    1. 背景 Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架.其中 Apache Storm(以下简称"Storm")在美团点评实 ...

  3. 基于Apache Flink的爱奇艺实时计算平台建设实践

    导读:随着大数据的快速发展,行业大数据服务越来越重要.同时,对大数据实时计算的要求也越来越高.今天会和大家分享下爱奇艺基于Apache Flink的实时计算平台建设实践. 今天的介绍会围绕下面三点展开 ...

  4. Flink流计算引擎

    伴随着海量增长的数据,数字化时代的未来感扑面而至.不论是结绳记事的小数据时代,还是我们正在经历的大数据时代,计算的边界正在被无限拓宽,而数据的价值再也难以被计算.时下,谈及大数据,不得不提到热门的下一 ...

  5. 11 Confluent_Kafka权威指南 第十一章:流计算

    文章目录 CHAPTER 10 Stream Processing 流式计算 What Is Stream Processing? 流处理是什么 Stream-Processing Concepts ...

  6. 特来电监控引擎流计算应用实践

    随着云计算的深入落地,大数据技术有了坚实的底层支撑,不断向前发展并日趋成熟,无论是传统企业还是互联网公司,都不再满足于离线批处理计算,而是更倾向于应用实时流计算,要想在残酷的企业竞争中立于不败之地,企 ...

  7. 解读 2018:13 家开源框架谁能统一流计算?

    2018 年接近尾声,I我策划了"解读 2018"年终技术盘点系列文章,希望能够给读者清晰地梳理出重要技术领域在这一年来的发展和变化.本文是实时流计算 2018 年终盘点,作者对实 ...

  8. 解读2018:13家开源框架谁能统一流计算?

    2018年接近尾声,InfoQ策划了"解读 2018"年终技术盘点系列文章,希望能够给读者清晰地梳理出重要技术领域在这一年来的发展和变化.本文是实时流计算2018年终盘点,作者对实 ...

  9. 大数据实时流计算详解

    开篇词-攻克实时流计算难点,掌握大数据未来 我曾任职于华为 2012 实验室高斯部门,负责实时分析型内存数据库 RTANA.华为公有云 RDS 服务的研发工作.目前,我专注于移动反欺诈解决方案的研发. ...

最新文章

  1. WinForm开发框架资料积累
  2. Java 强引用与软引用以及弱引用,虚引用
  3. 华大 MCU 之三 时钟控制器(CMU)配置记录
  4. 【项目记录】用vue-h5写可前后端分离和控制计时的物联网移动端app
  5. Sql添加Oracle数据库的表空间和用户
  6. 动态代理Java实现
  7. 公司java框架让程序员变笨_框架会使程序员变笨吗?
  8. Codeforces 963B Destruction of a Tree 【贪心】
  9. SpringBoot实战教程(6)| 整合Druid
  10. 马化腾朋友圈晒微信支付分:835;爱奇艺回应用户隐私话题;Firefox 77.0 发布| 极客头条...
  11. SVN 与 CVS 在【版本管理】上的区别~
  12. Python对图像进行二维Gabor滤波加速
  13. 苹方字体 for linux,使用macOS苹方替换Windows 10微软雅黑
  14. 给小学生上计算机语言课,[程序设计]为了下一代,大家讨论一下小学生该学什么编程语言。...
  15. NKOJ 4234 三角分形
  16. HDU 6318 Swaps and inversions
  17. 2018.06~7 阅读随笔
  18. 【HCIE备考笔记】TAC报告总结
  19. 使用隐马尔科夫模型实现分词
  20. Spark GraphX下强连通子图和社团发现算法在1T TPC-DS数据集下执行方法、优化和性能估算

热门文章

  1. Linux常用命令: zip、unzip 压缩和解压缩命令
  2. android,面向对象
  3. python学习之路-第七天-python面向对象编程简介
  4. Java 字符串操作的总结1(转载)
  5. 《Java程序员面试宝典》读书笔记1
  6. objective-c 编程总结(第六篇)运行时操作 - 方法交换
  7. AS3 in FlashDevelop
  8. apue对java_[apue] 一个快速确定新系统上各类限制值的工具
  9. mysql同步表到本地_sql 同步远程数据库(表)到本地
  10. react hook问题讲解