步驟:

service firewalld stop(关闭防火墙)

啓動hadoop

離開安全模式

啓動zookeeper與kafka集羣

操作 命令 备注
查看topic $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

往kafka_ddl 这个 topic发送 json 消息 $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic kafka_ddl

这里可能碰到[2]中的报错,注意检查命令中端口与配置文件server.properties中的listeners的端口严格保持一致

[2]中的报错还可能是某个节点的kafka挂掉导致的.

可能碰到[3]

注意关闭防火墙

使用kafka自带消费端测试下消费 $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic kafka_ddl

如果kafka自带消费者测试有问题,那么就不用继续往下面做了,

此时如果使用Flink SQL Client来消费也必然会出现问题

清除topic中所有数据[6](因为,万一你输错了呢?对吧) $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic kafka_ddl

需要$KAFKA/config/server.properties设置

delete.topic.enable=true

kafka生產端輸入的數據:

{"name":"apple1","age":"18","city":"NingBo","address":"100.00","ts":"1556420980000"}
{"name":"apple2","age":"20","city":"JiaXing","address":"130.00","ts":"1556421040000"}
{"name":"apple3","age":"18","city":"JiangXi","address":"120.00","ts":"1556421100000"}
{"name":"apple4","age":"19","city":"JiangXi","address":"100.00","ts":"1556421120000"}
{"name":"apple5","age":"18","city":"NingBo","address":"150.00","ts":"1556421480000"}
{"name":"apple6","age":"18","city":"NingBo","address":"110.00","ts":"1556421510000"}
{"name":"apple7","age":"19","city":"JiaXing","address":"110.00","ts":"1556421570000"}
{"name":"apple8","age":"20","city":"NingBo","address":"100.00","ts":"1556421630000"}
{"name":"apple9","age":"17","city":"JiangXi","address":"110.00","ts":"1556421655000"}

对应的DDL是

 CREATE TABLE PERSON (name VARCHAR COMMENT '姓名',age VARCHAR COMMENT '年龄',city VARCHAR COMMENT '所在城市',address VARCHAR COMMENT '家庭住址',ts BIGINT COMMENT '时间戳',pay_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000, 'yyyy-MM-dd HH:mm:ss')), -- 定义事件时间WATERMARK FOR pay_time AS pay_time - INTERVAL '0' SECOND)WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal',  -- kafka 版本'connector.topic' = 'kafka_ddl',  -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从最早的 offset 开始读取'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息'connector.properties.0.value' = 'Desktop:2181','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'Desktop:9091','update-mode' = 'append','format.type' = 'json',  -- 数据源格式为 json'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则)

完整工程如下:

https://gitee.com/appleyuchi/Flink_Code/blob/master/flink读kafka/Scala/src/main/scala/FlinkKafkaDDLDemo.scala

本文内容已经绝大部分涵盖[7],不必再重复阅读[7]

Reference:

[1]FlinkSQL使用DDL语句创建kafka源表

[2]Kafka连接服务器出现:Connection to node 1 (localhost/127.0.0.1:9092) could not be established.

[3]Flink SQL深度篇

[6]Is there a way to delete all the data from a topic or delete the topic before every run?

[7]Flink 1.10.0 SQL DDL中如何定义watermark和计算列

flink DDL读取kafka数据-Scala嵌入DDL形式相关推荐

  1. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  2. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

  3. 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...

  4. 【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】

    环境准备 经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境.当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置. 由于是单节点安 ...

  5. Spark Streaming读取Kafka数据的两种方式

    Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...

  6. java读写德卡数据_Spark Streaming 读取Kafka数据写入ES

    简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...

  7. flink定时读取mysql数据_flink时间系统系列之实例讲解:如何做定时输出

    flink时间系统系列篇幅目录: 六.实例讲解:如何做定时输出 今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理 ...

  8. Flink 分别读取kafka和mysql作为source

    需求 首先从kafka中读取数据,然后从mysql中读取数据,然后将这两个数据进行合并处理. 环境 Flink 1.8.2 实现 public static void main(String[] ar ...

  9. Flink1.11 读取kafka数据写入hive,未完待续

    昨天晚上Flink1.11出了,这次改动很多,我只关心hive这一部分. 目前尝试了几个小时用代码读取hive,安装官网的文档,没成功,先蹭个热点,记录下. 先贴一下依赖吧: 注意:反正各种报错,看社 ...

最新文章

  1. 几条曲线构建Android表白程序
  2. 日志审计产品(转载)
  3. Android错误-error:Foundtextwhereitemtagisexpected
  4. 图像分割过分割和欠分割_使用图割的图像分割
  5. go语言实现zip压缩与解压
  6. Android开发笔记(四十六)手机相关事件
  7. 树莓派安装qq linux,2019年树莓派运行新版原生LinuxQQ?【已更新】
  8. Segue区别及视图的切换
  9. unix源码分析_UNIX网络分析
  10. 仿苹果 底部弹窗 选择列表
  11. oauth2生成jwt令牌
  12. 7.ROS编程学习:自定义服务数据c++调用
  13. MYSQL 金额转大写中文
  14. Excel 2010 VBA 入门 037 获取最后一行数据的行数
  15. 怎么读取照片内的文字
  16. 微信小程序开发框架搭建
  17. 服务器系统盘40g是什么,云服务器 40g系统盘
  18. Spring整合Quartz集群部署
  19. 向苹果提交bug时的糟心经历
  20. 计算机或设备将不接受连接 解决办法

热门文章

  1. ES6 必须要用的数组Filter() 方法,不要再自己循环遍历了!!!
  2. Python+Selenium+PIL+Tesseract真正自动识别验证码进行一键登录
  3. 文件夹操作之判断是否存在(Directory)
  4. Oracle_双机备份_资料
  5. mysql插入日期 vs oracle插入日期
  6. HTTP调试工具:Fiddler 介绍二
  7. MOCTF-Web-文件包含
  8. 电源pd功能的充电器_PD快充充电器电源芯片和普通充电器电源芯片有什么区别?...
  9. java swing 导出文件_java swing (一) 导出excel文件并打开
  10. 正则表达式获取一个文本域中每一行的值并且去掉前后空格