flink sql读取kafka-入门级
目录
- 环境
- 下载相关jar包
- 启动flink集群与flink sql
- 创建数据库并选择
- 创建flink sql表结构
- 查询结果
环境
flink 1.11版本
Mac系统
下载相关jar包
flink sql读取kafka需要下载相关的kafka依赖包,放到本地的lib目录下,选择下面这个:
cd /usr/local/Cellar/apache-flink/1.11.2/libexec/lib
cp ~/Downloads/flink-sql-connector-kafka_2.11-1.11.2.jar .
启动flink集群与flink sql
启动flink集群
/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh
启动flink sql
/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/sql-client.sh embedded
CLI 为维护和可视化结果提供三种模式。其中,Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。使用 CTRL+C 按键,这个会停掉作业同时停止屏幕上的打印。设置Tableau模式:
SET execution.result-mode=tableau;
创建数据库并选择
create database if not exists river_test;
use river_test;
创建flink sql表结构
CREATE TABLE kafkaTable13 (item_id BIGINT,source_type BIGINT,title STRING,white_image STRING,coupon_name STRING,schema STRING
) WITH ('connector' = 'kafka','topic' = 'test','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'json','scan.startup.mode' = 'latest-offset'
);
相关连接选项:
Option | Required | Default | Description |
---|---|---|---|
connector | 必需 | 空 | kafka |
topic | 必需 | 空 | topic名 |
properties.bootstrap.servers | 必需 | 空 | kafka连接地址 |
properties.group.id | 必需 | 空 | groupid |
format | 必需 | 空 | ‘csv’, ‘json’, ‘avro’, ‘debezium-json’ and 'canal-json’等 |
scan.startup.mode | 非必需 | group-offsets | ‘earliest-offset’, ‘latest-offset’, ‘group-offsets’, ‘timestamp’ 或者 ‘specific-offsets’ |
scan.startup.specific-offsets | 非必需 | 空 | 设置消费模式为:'specific-offsets’时,需写出消费offset,例:‘partition:0,offset:42;partition:1,offset:300’ |
scan.startup.timestamp-millis | 非必需 | 空 | 设置消费模式为:'timestamp’时,指定时间 |
sink.partitioner | 非必需 | 空 |
查询结果
select * from kafkaTable13;
通过相关flink UI界面,查看任务:
flink sql读取kafka-入门级相关推荐
- Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...
- flink DDL读取kafka数据-Scala嵌入DDL形式
步驟: service firewalld stop(关闭防火墙) 啓動hadoop 離開安全模式 啓動zookeeper與kafka集羣 操作 命令 备注 查看topic $KAFKA/bin/ka ...
- goland sql 脚本运行_Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL
SqlSubmit 的实现 笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句.所以笔者就只好自己写了个简单 ...
- flink sql 连接kafka avro序列化异常 Failed to deserialize Avro record ArrayIndexOutOfBoundsException
在使用flinksql 解析复杂avro格式时候,遇到ArrayIndexOutOfBoundsException报错.问题的原因是所有非空字段都需要显示 not null,注意是每个层级的字段. 2 ...
- Flink 分别读取kafka和mysql作为source
需求 首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理. 环境 Flink 1.8.2 实现 public static void main(String[] ar ...
- fink sql 读取 kafka 的数据写到 kafka
版本: flink 1.12 平台:streamX -- source CREATE TABLE `stg_access_its_rt_kafka` (`message` STRING COMMEN ...
- flink sql实战案例
目录 一.背景 二.流程 三.案例 1.flink sql读取 Kafka 并写入 MySQL source sink insert 2.flinksql读kafka写入kudu source sin ...
- flink sql client讀取kafka數據的timestamp(DDL方式)
实验目的 Kafka的数据能让Flink SQL Client读取到 本文是对[1]的详细记载 具体操作步骤 ①啓動hadoop集羣,離開安全模式 ②各个节点都关闭防火墙: service firew ...
- 【Flink 实战系列】Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)
Flink SQL 同步 Kafka 数据到 HDFS(parquet + snappy) 在上一篇文章中,我们用 datastream API 实现了从 Kafka 读取数据写到 HDFS 并且用 ...
- Flink SQL Client进行Kafka事实表与Hbase维度表Join(纯DDL/SQL方式)
概述: 對參考鏈接[1]進行DDL上的復現. 一些基本的業務常识 來源載體 數據特點 維表 Mysql/Csv/Hbase 很少變化 事實表 Kafka 不停變化 开发环境与准备工作 组件 版本 ...
最新文章
- GitHub日收12000星,微软新命令行工具引爆程序员圈!
- Google Chrome调试js入门
- 图论(十)最小生成树-Prim算法
- 解决append的div的事件失效问题
- 【瞎搞】 Codeforces Round 276 DIV 2 C.Bits
- CUBA 7的新功能
- java构建工具 gradle_Java构建工具
- 四格漫画《MUXing》——请客记
- python中数据处理的格式,json.csv txt excel
- 不同语言实现两个变量的交换 Python之禅
- sql盲注 解决_解决SQL盲注和跨站脚本攻击
- 【有利可图网】推荐!设计师必备配色宝典!
- rhadoop之mapreduce函数
- Postman Sandbox
- 数学建模【系统评价决策模型(概论、案例分析-汽车选购、层次分析法、案例分析-职员晋升、动态加权综合评价法、案例分析-大气污染问题)】
- 计算机word保存如何操作,电脑word怎么保存 在电脑上word打字怎么保存
- Redis7.0的安装步骤
- 【国庆献礼】浅谈国产数据库
- python 实现差商
- 美团笔试题:股票交易日
热门文章
- 计算机键盘设置功能键取消,键盘insert操作怎么取消?电脑键盘insert操作取消教程...
- sqlserver战德臣_SQLSERVER补位示例
- 一文详解空洞卷积(Atrous Convolution)
- 2022-11-18 mysql列存储引擎-assert failed on i < m_idx.size() at rc_attr.h:342-问题分析
- 为公司添加以网站作为邮箱后缀的企业邮箱
- noip2011 观光公交 (贪心)
- bootstrap登录模板
- dede标签调用大全
- UE4对接腾讯GME语音服务(实时语音一)
- 从“七宗罪”角度,看互联网产品与人性的深沉纠缠