目录

  • 环境
  • 下载相关jar包
  • 启动flink集群与flink sql
  • 创建数据库并选择
  • 创建flink sql表结构
  • 查询结果

环境

flink 1.11版本
Mac系统

下载相关jar包

flink sql读取kafka需要下载相关的kafka依赖包,放到本地的lib目录下,选择下面这个:

cd /usr/local/Cellar/apache-flink/1.11.2/libexec/lib
cp ~/Downloads/flink-sql-connector-kafka_2.11-1.11.2.jar .

启动flink集群与flink sql

启动flink集群

/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh

启动flink sql

/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/sql-client.sh embedded

CLI 为维护和可视化结果提供三种模式。其中,Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。使用 CTRL+C 按键,这个会停掉作业同时停止屏幕上的打印。设置Tableau模式:

SET execution.result-mode=tableau;

创建数据库并选择

create database if not exists river_test;
use river_test;

创建flink sql表结构

CREATE TABLE kafkaTable13 (item_id BIGINT,source_type BIGINT,title STRING,white_image STRING,coupon_name STRING,schema STRING
) WITH ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'json','scan.startup.mode' = 'latest-offset'
);

相关连接选项:

Option Required Default Description
connector 必需 kafka
topic 必需 topic名
properties.bootstrap.servers 必需 kafka连接地址
properties.group.id 必需 groupid
format 必需 ‘csv’, ‘json’, ‘avro’, ‘debezium-json’ and 'canal-json’等
scan.startup.mode 非必需 group-offsets ‘earliest-offset’, ‘latest-offset’, ‘group-offsets’, ‘timestamp’ 或者 ‘specific-offsets’
scan.startup.specific-offsets 非必需 设置消费模式为:'specific-offsets’时,需写出消费offset,例:‘partition:0,offset:42;partition:1,offset:300’
scan.startup.timestamp-millis 非必需 设置消费模式为:'timestamp’时,指定时间
sink.partitioner 非必需

查询结果

select * from kafkaTable13;


通过相关flink UI界面,查看任务:

flink sql读取kafka-入门级相关推荐

  1. Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...

  2. flink DDL读取kafka数据-Scala嵌入DDL形式

    步驟: service firewalld stop(关闭防火墙) 啓動hadoop 離開安全模式 啓動zookeeper與kafka集羣 操作 命令 备注 查看topic $KAFKA/bin/ka ...

  3. goland sql 脚本运行_Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    SqlSubmit 的实现 笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句.所以笔者就只好自己写了个简单 ...

  4. flink sql 连接kafka avro序列化异常 Failed to deserialize Avro record ArrayIndexOutOfBoundsException

    在使用flinksql 解析复杂avro格式时候,遇到ArrayIndexOutOfBoundsException报错.问题的原因是所有非空字段都需要显示 not null,注意是每个层级的字段. 2 ...

  5. Flink 分别读取kafka和mysql作为source

    需求 首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理. 环境 Flink 1.8.2 实现 public static void main(String[] ar ...

  6. fink sql 读取 kafka 的数据写到 kafka

    版本: flink 1.12  平台:streamX -- source CREATE TABLE `stg_access_its_rt_kafka` (`message` STRING COMMEN ...

  7. flink sql实战案例

    目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...

  8. flink sql client讀取kafka數據的timestamp(DDL方式)

    实验目的 Kafka的数据能让Flink SQL Client读取到 本文是对[1]的详细记载 具体操作步骤 ①啓動hadoop集羣,離開安全模式 ②各个节点都关闭防火墙: service firew ...

  9. 【Flink 实战系列】Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)

    Flink SQL 同步 Kafka 数据到 HDFS(parquet + snappy) 在上一篇文章中,我们用 datastream API 实现了从 Kafka 读取数据写到 HDFS 并且用 ...

  10. Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)

    概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识   來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...

最新文章

  1. GitHub日收12000星,微软新命令行工具引爆程序员圈!
  2. Google Chrome调试js入门
  3. 图论(十)最小生成树-Prim算法
  4. 解决append的div的事件失效问题
  5. 【瞎搞】 Codeforces Round 276 DIV 2 C.Bits
  6. CUBA 7的新功能
  7. java构建工具 gradle_Java构建工具
  8. 四格漫画《MUXing》——请客记
  9. python中数据处理的格式,json.csv txt excel
  10. 不同语言实现两个变量的交换 Python之禅
  11. sql盲注 解决_解决SQL盲注和跨站脚本攻击
  12. 【有利可图网】推荐!设计师必备配色宝典!
  13. rhadoop之mapreduce函数
  14. Postman Sandbox
  15. 数学建模【系统评价决策模型(概论、案例分析-汽车选购、层次分析法、案例分析-职员晋升、动态加权综合评价法、案例分析-大气污染问题)】
  16. 计算机word保存如何操作,电脑word怎么保存 在电脑上word打字怎么保存
  17. Redis7.0的安装步骤
  18. 【国庆献礼】浅谈国产数据库
  19. python 实现差商
  20. 美团笔试题:股票交易日

热门文章

  1. 计算机键盘设置功能键取消,键盘insert操作怎么取消?电脑键盘insert操作取消教程...
  2. sqlserver战德臣_SQLSERVER补位示例
  3. 一文详解空洞卷积(Atrous Convolution)
  4. 2022-11-18 mysql列存储引擎-assert failed on i < m_idx.size() at rc_attr.h:342-问题分析
  5. 为公司添加以网站作为邮箱后缀的企业邮箱
  6. noip2011 观光公交 (贪心)
  7. bootstrap登录模板
  8. dede标签调用大全
  9. UE4对接腾讯GME语音服务(实时语音一)
  10. 从“七宗罪”角度,看互联网产品与人性的深沉纠缠