Flink Series: Consuming Data from a Kafka Topic in Real Time with Flink, Implemented in Scala

  • 1. Add the Flink dependencies
  • 2. Store the Kafka connection settings in a Properties object
  • 3. Set up the Flink streaming environment
  • 4. Add the Kafka source and process the data
  • 5. Complete code
  • 6. Run the program and inspect the consumed data

1. Add the Flink dependencies

    <groupId>com.bigdata</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

2. Store the Kafka connection settings in a Properties object

    // Store the Kafka connection settings in a Properties object
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("auto.offset.reset", "earliest")

3. Set up the Flink streaming environment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRestartStrategy(RestartStrategies.noRestart())
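The snippet above disables restarts entirely, so any failure stops the job for good. If you would rather let the job retry a few times before failing, Flink's fixed-delay restart strategy is a drop-in alternative; the sketch below is only illustrative, and the attempt count and delay are made-up values, not part of the original setup:

    // Hedged alternative to noRestart(): retry up to 3 times, 10 s apart
    // (illustrative values only).
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import java.util.concurrent.TimeUnit

    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))
    )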

4. Add the Kafka source and process the data

    // Add the Kafka source and print every record
    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("debezium-test-optics_uds", new SimpleStringSchema(), properties)
    )
    lines.print()

    // Trigger execution
    env.execute()
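`print()` only writes each raw record to stdout. As a minimal illustration of actually processing the stream, the hedged sketch below (not part of the original program) chains a filter and a map onto the same `lines` stream, keeping non-empty records and tagging each one with its length:

    // Hedged example, not in the original article: a trivial transformation
    // chained onto the `lines` stream defined above.
    val sized: DataStream[(Int, String)] = lines
      .filter(_.nonEmpty)                      // drop empty records
      .map(record => (record.length, record))  // (length, raw record)
    sized.print()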

5. Complete code

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

object SourceKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setRestartStrategy(RestartStrategies.noRestart())

    // Store the Kafka connection settings in a Properties object
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    properties.setProperty("group.id", "flink-test")
    properties.setProperty("auto.offset.reset", "earliest")

    // Add the Kafka source and print every record
    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("debezium-test-optics_uds", new SimpleStringSchema(), properties)
    )
    lines.print()

    // Trigger execution
    env.execute()
  }
}

6. Run the program and inspect the consumed data

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"debezium_test_optics_uds.Envelope"},"payload":{"before":null,"after":{"sid":3600,"sname":"f","updatetime":1661126400000,"ssex":"a"},"source":{"version":"1.9.6.Final","connector":"mysql","name":"debezium-uds8-optics8-test_1h","ts_ms":1665155935000,"snapshot":"false","db":"dw","sequence":null,"table":"student","server_id":223344,"gtid":null,"file":"mysql-bin.000012","pos":6193972,"row":0,"thread":66072,"query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')"},"op":"c","ts_ms":1665155935640,"transaction":null}
}
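Each record is a Debezium change event: `schema` describes the field types, while `payload` carries the `before`/`after` row images, the `source` metadata, and the operation type `op` (`c` above, i.e. an insert). To work with the row data rather than the raw string, one option is to pull out `payload.after` with a JSON parser. The sketch below is only an illustration: it assumes `com.fasterxml.jackson.core:jackson-databind` is added as a dependency, and it builds the mapper inside the function purely to keep the example serialization-safe.

    // Hedged sketch (assumes jackson-databind on the classpath): extract the
    // "after" row image from each Debezium event consumed above.
    import com.fasterxml.jackson.databind.ObjectMapper

    val afterImages: DataStream[String] = lines.map { json =>
      val mapper = new ObjectMapper()   // per-record for simplicity; a RichMapFunction could reuse one
      mapper.readTree(json).path("payload").path("after").toString
    }
    afterImages.print()  // e.g. {"sid":3600,"sname":"f","updatetime":1661126400000,"ssex":"a"}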
