Flink实操 : DataSource操作
.
- 一 .前言
- 二 .四种读取类型
- 2.1. 基于本地集合的source(Collection-based-source)
- 2.2. 基于文件的source(File-based-source)
- 2.2.1. readTextFile
- 2.2.2. readCsvFile
- 2.3. 基于网络套接字的source(Socket-based-source)
- 2.4. 自定义的source(Custom-source)
- 2.4.1.使用MySQL作为数据源
- 2.4.2.使用Kafka作为数据源
- 2.4.3.自定义数据源
一 .前言
本文主要写Flink读取数据的方式. 只考虑DataStream API.
数据读取的API定义在StreamExecutionEnvironment, 这是Flink流计算的起点. 一个DataStream就是从数据读取API中构造出来的.
二 .四种读取类型
Flink在流处理上大致有4大类 :
- 基于本地集合的source(Collection-based-source)
- 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
- 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
- 自定义的source(Custom-source)
2.1. 基于本地集合的source(Collection-based-source)
其实就是把集合中的数据变成DataStream.
注: scala版本不支持 Iterable , Set, Map 集合.
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}object CollectionSource {def main(args: Array[String]): Unit ={// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 设置并行度,默认和CPU的核数相同env.setParallelism(1)//0.用element创建DataStream(fromElements)val ds0 : DataStream[Int] = env.fromElements(1,3234,55,65,74523,1)
// ds0.print()//1.用Tuple创建DataStream(fromElements)val ds1: DataStream[(Int,String)] = env.fromElements((1,"bo"),(2,"yi"))
// ds1.print()//2.用Array创建DataStreamval ds2: DataStream[String] = env.fromCollection(Array("bo","yi"))
// ds2.print()//3.用ArrayBuffer创建DataStreamval ds3 :DataStream[String] = env.fromCollection(ArrayBuffer("bo","yi"))
// ds3.print()//4.用List创建DataStreamval ds4 : DataStream[String] = env.fromCollection(List("bo","yi"))
// ds4.print()//5.用ListBuffer创建DataStreamval ds5 : DataStream[String] = env.fromCollection(ListBuffer("BO","YI"))
// ds5.print()//6.用Vector创建DataStreamval ds6 : DataStream[String] = env.fromCollection(Vector("bo","yi","!!!"))
// ds6.print()//7.用Queue创建DataStreamval ds7: DataStream[String] = env.fromCollection(mutable.Queue("bo", "yi","flink","!!!"))
// ds7.print()//8.用Stack创建DataStreamval ds8: DataStream[String] = env.fromCollection(mutable.Stack("bo", "yi","flink","!!!"))
// ds8.print()//9.用Stream创建DataStreamval ds9: DataStream[String] = env.fromCollection(Stream("bo", "yi","flink","!!!"))
// ds9.print()//10.用Seq创建DataStreamval ds10: DataStream[String] = env.fromCollection(Seq("bo", "yi","flink","!!!"))
// ds10.print()//11.用Set创建DataStream(不支持)//val ds11: DataStream[String] = env.fromCollection(Set("bo", "yi","flink","!!!"))//ds11.print()//12.用Iterable创建DataStream(不支持)//val ds12: DataStream[String] = env.fromCollection(Iterable("bo", "yi","flink","!!!"))//ds12.print()//13.用ArraySeq创建DataStreamval ds13: DataStream[String] = env.fromCollection(mutable.ArraySeq("bo", "yi","flink","!!!"))
// ds13.print()// //14.用ArrayStack创建DataStream
// val ds14: DataStream[String] = env.fromCollection(mutable.ArrayStack("bo", "yi","flink","!!!"))
// ds14.print()//15.用Map创建DataStream(不支持)//val ds15: DataStream[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 ->"flink"))//ds15.print()// //16.用Range创建DataStreamval ds16: DataStream[Int] = env.fromCollection(Range(1, 9))
// ds16.print()// //17. Sequence创建DataStreamval ds17: DataStream[Long] = env.fromSequence(1, 9)ds17.print()// 执行任务,但是在流环境下,必须手动执行任务env.execute()}}
2.2. 基于文件的source(File-based-source)
2.2.1. readTextFile
从本地或者hdfs中加载数据
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object FileSource {def main(args : Array[String]) : Unit = {// 1. 获取流处理运行环境val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 2. 读取文件val data : DataStream[String] = env.readTextFile("hdfs://h23:8020/tmp/test/score.csv")// 3. 打印数据data.print()// 4. 执行程序env.execute()}}
2.2.2. readCsvFile
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}object JoinOp {// 学科Subject(学科ID、学科名字)case class Subject(id:Int, name:String)// 成绩Score(唯一ID、学生姓名、学科ID、分数)case class Score(id:Int, name:String, subjectId:Int, score:Double)def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = ExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val socreData = env.readCsvFile[Score]("hdfs://h23:8020/tmp/test/score.csv")val subjectData = env.readCsvFile[Subject]("hdfs://h23:8020/tmp/test/subject.csv")// 3.处理数据val joinData = socreData.join(subjectData).where(2).equalTo(0)// 4.打印输出joinData.print()}}
2.3. 基于网络套接字的source(Socket-based-source)
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object SocketSource {def main (args : Array[String]) : Unit = {//1. 获取流处理运行环境val env : StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()// 2. 构建socket流数据源,并指定IP地址和端口号val data : DataStream[String] = env.socketTextStream("localhost",6666)// 3. 转换,以空格拆分单词val res = data.flatMap(_.split(" "))// 4. 打印输出res.print()// 5. 启动执行env.execute("WordCount_Stream")}}
2.4. 自定义的source(Custom-source)
Flink 中你可以使用 StreamExecutionEnvironment.addSource(source) 来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的
source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的source。
2.4.1.使用MySQL作为数据源
package com.boyi.datasourceimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object MysqlCustomSource {def main(args : Array[String]) : Unit = {// 1. envval env = StreamExecutionEnvironment.getExecutionEnvironment// 2 使用自定义Sourceval mySqlDataStream: DataStream[(Int, String, String, String)] = env.addSource(new MysqlSource)// 3. 打印结果mySqlDataStream.print()// 4. 执行任务env.execute()}// 1. 自定义Source,继承自RichSourceFunctionclass MysqlSource extends RichSourceFunction[(Int,String,String,String)]{var connection: Connection = null;var ps: PreparedStatement = null;override def open(parameters: Configuration): Unit = {super.open(parameters)// 1. 加载驱动Class.forName("com.mysql.jdbc.Driver")// 2. 创建连接connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root")// 3. 创建PreparedStatementval sql = "select id,username,password,name from user"ps = connection.prepareStatement(sql)}// 2. 实现run方法override def run(sourceContext: SourceFunction.SourceContext[(Int, String, String, String)]): Unit = {// 4. 执行查询val resultSet: ResultSet = ps.executeQuery()// 5. 遍历查询结果,收集数据while(resultSet.next()){val id = resultSet.getInt("id")val username = resultSet.getString("username")val password = resultSet.getString("password")val name = resultSet.getString("name")// 收集数据sourceContext.collect((id,username,password,name))}}override def cancel(): Unit = {if(null != connection){connection.close()}if (ps != null) {ps.close()}}}}
2.4.2.使用Kafka作为数据源
package com.boyi.datasourceimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.CommonClientConfigsobject KafkaCustomSource {def main(args : Array[String]) : Unit = {// 1. 创建流式环境val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2 .指定kafak相关信息val kafkaCluster = "k01:9092,k02:9092"val kafkaTopic = "test"// 3. 创建Kafka数据流val props = new Properties()props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,kafkaCluster)val flinkKafkaConsumer = new FlinkKafkaConsumer[String](kafkaTopic,new SimpleStringSchema(),props)//4 .设置数据源val data : DataStream[String] = env.addSource(flinkKafkaConsumer)// 5. 打印数据data.print()// 6.执行任务env.execute()}}
2.4.3.自定义数据源
package com.boyi.datasourceimport java.util.UUID
import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}import scala.util.Random// 自定义数据源, 每1秒钟随机生成一条订单信息( 订单ID 、 用户ID 、 订单金额 、 时间戳 )
// 要求:
// 随机生成订单ID(UUID)
// 随机生成用户ID(0-2)
// 随机生成订单金额(0-100)
// 时间戳为当前系统时间// 开发步骤:
// 1. 创建订单样例类
// 2. 获取流处理环境
// 3. 创建自定义数据源
// 循环1000次
// 随机构建订单信息
// 上下文收集数据
// 每隔一秒执行一次循环
// 4. 打印数据
// 5. 执行任务object OwnCustomSource {// 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)case class Order(id : String , userId : Int , money : Long , createTime : Long)def main(args : Array[String]) : Unit = {// 1. 获取流处理运行环境rval env = StreamExecutionEnvironment.getExecutionEnvironment// 2. 创建一个自定义数据源val ownCustomSource : DataStream[Order] = env.addSource(new RichSourceFunction[Order] {override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {for (i <- 0 until 1000){// 随机生成订单ID(UUID)val id = UUID.randomUUID().toString// 随机生成用户ID(0-2)val userId = Random.nextInt(3)// 随机生成订单金额(0-100)val money = Random.nextInt(101)// 时间戳为当前系统时间val createTime = System.currentTimeMillis()// 收集数据sourceContext.collect(Order(id,userId,money, createTime))// 每隔1秒生成一个订单TimeUnit.SECONDS.sleep(1)}}override def cancel(): Unit = ()})ownCustomSource.print()env.execute()}}
代码:
git lab : https://github.com/BoYiZhang/flink-demo
Flink实操 : DataSource操作相关推荐
- Flink实操 : 算子操作
. 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...
- Flink实操 : Sink操作
. 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...
- jQuery入门实操-css操作,鼠标点击事件,页面计算器
前言 本文是学习jQuery中的一些实践,是jQuery入门的实操案例.更多语法可参考w3school的jQuery参考手册 jQuery是一个快速.简洁的JavaScript框架,是继Prototy ...
- Flink实操 : 广播变量/累加器/分布式缓存
. 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...
- Flink实操 : 状态管理
. 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...
- 完整代码+实操!手把手教你操作Faster R-CNN和Mask R-CNN
点击上方↑↑↑蓝字关注我们~ 「2019 Python开发者日」全日程揭晓,请扫码咨询 ↑↑↑ 机器视觉领域的核心问题之一就是目标检测(Object Detection),它的任务是找出图像当中所有感 ...
- rtk采点后如何导入cad_【干货】RTK实操视频:工程之星5.0操作攻略!(第五部分)...
前期回顾:[干货]RTK实操视频:工程之星5.0操作攻略!(第一部分)[干货]RTK实操视频:工程之星5.0操作教程(第二部分) [干货]RTK实操视频:工程之星5.0操作攻略!(第三部分) [干货] ...
- 从实操教学到赛题演练,腾讯专家亲授TI-ONE平台操作攻略!
5月10日,我们迎来了"视"界直播周的首场直播--"2021腾讯广告算法大赛赛题解析".直播现场,芦清林和熊江丰老师对本届赛事的两大赛题进行了深入浅出的解析 ...
- 2021年R1快开门式压力容器操作考试题及R1快开门式压力容器操作实操考试视频
题库来源:安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通:R1快开门式压力容器操作考试题是安全生产模拟考试一点通生成的,R1快开门式压力容器操作证模拟考试题库是根据R1快开门式压力容器操作 ...
最新文章
- Linux磁盘阵列技术详解(二)--raid 1创建
- 可以使用的mysql和navigate
- led灯光衰怎么解决_什么是LED光衰,光衰怎么解决?
- (1)搞一搞 seata 之 基础环境搭建
- memset函数具体说明
- led显示屏服务器怎么设置,led显示屏怎么改字幕 led显示屏改字幕方法
- 【人脸对齐-Landmarks】人脸关键点检测方法及评测汇总
- 一文带解读C# 动态拦截覆盖第三方进程中的函数(外挂必备)
- vs设计窗口不见了_龙猫腕表评测:VS沛纳海320V2版本
- 浏览器的加载顺序与页面性能优化
- HTML5本地存储——IndexedDB二:索引
- python切割音频文件_python3使用pydub切分音频文件
- 各厂商服务器存储默认管理口登录信息(默认IP、用户名、密码)
- 常有不规则动词的过去式和过去分词…
- 导论:什么是 Conversational Robot
- 3D游戏编程作业10
- 基于 MaxCompute+PAI 的用户增长方案实践
- java如何设置网页全屏_java中如何进行全屏方式和窗口方式的转换 详细??
- Google如何在新标签打开页面打开链接?
- 东软慧聚助力汽车“芯”节能减排
热门文章
- Android应用界面开发(一)
- 文字的纵向显示的问题 cdc drawtext
- 苹果保修期多久_手机的寿命究竟有多久,您的换机周期是否会等到手机报废呢?|手机|电池|安卓手机...
- [深度学习] PyTorch 实现双向LSTM 情感分析
- 2021-2027中国电工仪器仪表市场现状及未来发展趋势
- Map线程安全几种实现方法
- 汇编语言——伪指令详解
- 【密码学基础】03 传统加密技术
- flutter Dart Mixin后关于调用super的理解
- [汉化主题] Knowhow v1.1.16 – 响应式论坛知识库WordPress主题