flink DDL读取kafka数据-Scala嵌入DDL形式
步驟:
service firewalld stop(关闭防火墙)
啓動hadoop
離開安全模式
啓動zookeeper與kafka集羣
操作 | 命令 | 备注 |
查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 |
无 |
往kafka_ddl 这个 topic发送 json 消息 | $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic kafka_ddl |
这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致 [2]中的报错还可能是某个节点的kafka挂掉导致的. 可能碰到[3] 注意关闭防火墙 |
使用kafka自带消费端测试下消费 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic kafka_ddl |
如果kafka自带消费者测试有问题,那么就不用继续往下面做了, 此时如果使用Flink SQL Client来消费也必然会出现问题 |
清除topic中所有数据[6](因为,万一你输错了呢?对吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic kafka_ddl |
需要$KAFKA/config/server.properties设置 delete.topic.enable=true |
kafka生產端輸入的數據:
{"name":"apple1","age":"18","city":"NingBo","address":"100.00","ts":"1556420980000"}
{"name":"apple2","age":"20","city":"JiaXing","address":"130.00","ts":"1556421040000"}
{"name":"apple3","age":"18","city":"JiangXi","address":"120.00","ts":"1556421100000"}
{"name":"apple4","age":"19","city":"JiangXi","address":"100.00","ts":"1556421120000"}
{"name":"apple5","age":"18","city":"NingBo","address":"150.00","ts":"1556421480000"}
{"name":"apple6","age":"18","city":"NingBo","address":"110.00","ts":"1556421510000"}
{"name":"apple7","age":"19","city":"JiaXing","address":"110.00","ts":"1556421570000"}
{"name":"apple8","age":"20","city":"NingBo","address":"100.00","ts":"1556421630000"}
{"name":"apple9","age":"17","city":"JiangXi","address":"110.00","ts":"1556421655000"}
对应的DDL是
CREATE TABLE PERSON (name VARCHAR COMMENT '姓名',age VARCHAR COMMENT '年龄',city VARCHAR COMMENT '所在城市',address VARCHAR COMMENT '家庭住址',ts BIGINT COMMENT '时间戳',pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss')), -- 定义事件时间WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND)WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本'connector.topic' = 'kafka_ddl', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 开始读取'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息'connector.properties.0.value' = 'Desktop:2181','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'Desktop:9091','update-mode' = 'append','format.type' = 'json', -- 数据源格式为 json'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则)
完整工程如下:
https://gitee.com/appleyuchi/Flink_Code/blob/master/flink读kafka/Scala/src/main/scala/FlinkKafkaDDLDemo.scala
本文内容已经绝大部分涵盖[7],不必再重复阅读[7]
Reference:
[1]FlinkSQL使用DDL语句创建kafka源表
[2]Kafka连接服务器出现:Connection to node 1 (localhost/127.0.0.1:9092) could not be established.
[3]Flink SQL深度篇
[6]Is there a way to delete all the data from a topic or delete the topic before every run?
[7]Flink 1.10.0 SQL DDL中如何定义watermark和计算列
flink DDL读取kafka数据-Scala嵌入DDL形式相关推荐
- kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...
- 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
<!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...
- 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...
- 【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】
环境准备 经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境.当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置. 由于是单节点安 ...
- Spark Streaming读取Kafka数据的两种方式
Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...
- java读写德卡数据_Spark Streaming 读取Kafka数据写入ES
简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...
- flink定时读取mysql数据_flink时间系统系列之实例讲解:如何做定时输出
flink时间系统系列篇幅目录: 六.实例讲解:如何做定时输出 今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理 ...
- Flink 分别读取kafka和mysql作为source
需求 首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理. 环境 Flink 1.8.2 实现 public static void main(String[] ar ...
- Flink1.11 读取kafka数据写入hive,未完待续
昨天晚上Flink1.11出了,这次改动很多,我只关心hive这一部分. 目前尝试了几个小时用代码读取hive,安装官网的文档,没成功,先蹭个热点,记录下. 先贴一下依赖吧: 注意:反正各种报错,看社 ...
最新文章
- 几条曲线构建Android表白程序
- 日志审计产品(转载)
- Android错误-error:Foundtextwhereitemtagisexpected
- 图像分割过分割和欠分割_使用图割的图像分割
- go语言实现zip压缩与解压
- Android开发笔记(四十六)手机相关事件
- 树莓派安装qq linux,2019年树莓派运行新版原生LinuxQQ?【已更新】
- Segue区别及视图的切换
- unix源码分析_UNIX网络分析
- 仿苹果 底部弹窗 选择列表
- oauth2生成jwt令牌
- 7.ROS编程学习:自定义服务数据c++调用
- MYSQL 金额转大写中文
- Excel 2010 VBA 入门 037 获取最后一行数据的行数
- 怎么读取照片内的文字
- 微信小程序开发框架搭建
- 服务器系统盘40g是什么,云服务器 40g系统盘
- Spring整合Quartz集群部署
- 向苹果提交bug时的糟心经历
- 计算机或设备将不接受连接 解决办法
热门文章
- ES6 必须要用的数组Filter() 方法,不要再自己循环遍历了!!!
- Python+Selenium+PIL+Tesseract真正自动识别验证码进行一键登录
- 文件夹操作之判断是否存在(Directory)
- Oracle_双机备份_资料
- mysql插入日期 vs oracle插入日期
- HTTP调试工具:Fiddler 介绍二
- MOCTF-Web-文件包含
- 电源pd功能的充电器_PD快充充电器电源芯片和普通充电器电源芯片有什么区别?...
- java swing 导出文件_java swing (一) 导出excel文件并打开
- 正则表达式获取一个文本域中每一行的值并且去掉前后空格