.

  • 一 .前言
  • 二 .四种读取类型
    • 2.1. 基于本地集合的source(Collection-based-source)
    • 2.2. 基于文件的source(File-based-source)
    • 2.2.1. readTextFile
    • 2.2.2. readCsvFile
    • 2.3. 基于网络套接字的source(Socket-based-source)
    • 2.4. 自定义的source(Custom-source)
      • 2.4.1.使用MySQL作为数据源
      • 2.4.2.使用Kafka作为数据源
      • 2.4.3.自定义数据源

一 .前言

本文主要写Flink读取数据的方式.  只考虑DataStream API.

数据读取的API定义在StreamExecutionEnvironment, 这是Flink流计算的起点. 一个DataStream就是从数据读取API中构造出来的.

二 .四种读取类型

Flink在流处理上大致有4大类 :

  1. 基于本地集合的source(Collection-based-source)
  2. 基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回
  3. 基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。
  4. 自定义的source(Custom-source)

2.1. 基于本地集合的source(Collection-based-source)

其实就是把集合中的数据变成DataStream.

注: scala版本不支持 Iterable , Set, Map 集合.

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}object CollectionSource {def main(args: Array[String]): Unit ={// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 设置并行度,默认和CPU的核数相同env.setParallelism(1)//0.用element创建DataStream(fromElements)val ds0 : DataStream[Int] = env.fromElements(1,3234,55,65,74523,1)
//    ds0.print()//1.用Tuple创建DataStream(fromElements)val ds1: DataStream[(Int,String)] = env.fromElements((1,"bo"),(2,"yi"))
//    ds1.print()//2.用Array创建DataStreamval ds2: DataStream[String] = env.fromCollection(Array("bo","yi"))
//    ds2.print()//3.用ArrayBuffer创建DataStreamval ds3 :DataStream[String] = env.fromCollection(ArrayBuffer("bo","yi"))
//    ds3.print()//4.用List创建DataStreamval ds4 : DataStream[String] = env.fromCollection(List("bo","yi"))
//    ds4.print()//5.用ListBuffer创建DataStreamval ds5 : DataStream[String] =  env.fromCollection(ListBuffer("BO","YI"))
//    ds5.print()//6.用Vector创建DataStreamval ds6 : DataStream[String] = env.fromCollection(Vector("bo","yi","!!!"))
//    ds6.print()//7.用Queue创建DataStreamval ds7: DataStream[String] = env.fromCollection(mutable.Queue("bo", "yi","flink","!!!"))
//    ds7.print()//8.用Stack创建DataStreamval ds8: DataStream[String] = env.fromCollection(mutable.Stack("bo", "yi","flink","!!!"))
//    ds8.print()//9.用Stream创建DataStreamval ds9: DataStream[String] = env.fromCollection(Stream("bo", "yi","flink","!!!"))
//    ds9.print()//10.用Seq创建DataStreamval ds10: DataStream[String] = env.fromCollection(Seq("bo", "yi","flink","!!!"))
//    ds10.print()//11.用Set创建DataStream(不支持)//val ds11: DataStream[String] = env.fromCollection(Set("bo", "yi","flink","!!!"))//ds11.print()//12.用Iterable创建DataStream(不支持)//val ds12: DataStream[String] = env.fromCollection(Iterable("bo", "yi","flink","!!!"))//ds12.print()//13.用ArraySeq创建DataStreamval ds13: DataStream[String] = env.fromCollection(mutable.ArraySeq("bo", "yi","flink","!!!"))
//    ds13.print()//    //14.用ArrayStack创建DataStream
//    val ds14: DataStream[String] = env.fromCollection(mutable.ArrayStack("bo", "yi","flink","!!!"))
//    ds14.print()//15.用Map创建DataStream(不支持)//val ds15: DataStream[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 ->"flink"))//ds15.print()//    //16.用Range创建DataStreamval ds16: DataStream[Int] = env.fromCollection(Range(1, 9))
//    ds16.print()//    //17. Sequence创建DataStreamval ds17: DataStream[Long] = env.fromSequence(1, 9)ds17.print()// 执行任务,但是在流环境下,必须手动执行任务env.execute()}}

2.2. 基于文件的source(File-based-source)

2.2.1. readTextFile

从本地或者hdfs中加载数据


import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object FileSource {def main(args : Array[String]) : Unit = {// 1. 获取流处理运行环境val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 2. 读取文件val data : DataStream[String] = env.readTextFile("hdfs://h23:8020/tmp/test/score.csv")// 3. 打印数据data.print()// 4. 执行程序env.execute()}}

2.2.2. readCsvFile


import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}object JoinOp {// 学科Subject(学科ID、学科名字)case class Subject(id:Int, name:String)// 成绩Score(唯一ID、学生姓名、学科ID、分数)case class Score(id:Int, name:String, subjectId:Int, score:Double)def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = ExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val socreData = env.readCsvFile[Score]("hdfs://h23:8020/tmp/test/score.csv")val subjectData = env.readCsvFile[Subject]("hdfs://h23:8020/tmp/test/subject.csv")// 3.处理数据val joinData = socreData.join(subjectData).where(2).equalTo(0)// 4.打印输出joinData.print()}}

2.3. 基于网络套接字的source(Socket-based-source)


import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object SocketSource {def main (args : Array[String]) : Unit = {//1. 获取流处理运行环境val env : StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()// 2. 构建socket流数据源,并指定IP地址和端口号val data : DataStream[String] = env.socketTextStream("localhost",6666)// 3. 转换,以空格拆分单词val res = data.flatMap(_.split(" "))// 4. 打印输出res.print()// 5. 启动执行env.execute("WordCount_Stream")}}

2.4. 自定义的source(Custom-source)

Flink 中你可以使用 StreamExecutionEnvironment.addSource(source) 来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的
source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的source。

2.4.1.使用MySQL作为数据源


package com.boyi.datasourceimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object MysqlCustomSource {def main(args : Array[String]) : Unit = {// 1. envval env = StreamExecutionEnvironment.getExecutionEnvironment// 2 使用自定义Sourceval mySqlDataStream: DataStream[(Int, String, String, String)] = env.addSource(new MysqlSource)// 3. 打印结果mySqlDataStream.print()// 4. 执行任务env.execute()}//  1. 自定义Source,继承自RichSourceFunctionclass MysqlSource extends RichSourceFunction[(Int,String,String,String)]{var connection:  Connection = null;var ps:  PreparedStatement = null;override def open(parameters: Configuration): Unit = {super.open(parameters)//    1. 加载驱动Class.forName("com.mysql.jdbc.Driver")//    2. 创建连接connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root")//    3. 创建PreparedStatementval sql = "select id,username,password,name from user"ps = connection.prepareStatement(sql)}//  2. 实现run方法override def run(sourceContext: SourceFunction.SourceContext[(Int, String, String, String)]): Unit = {//    4. 执行查询val resultSet: ResultSet = ps.executeQuery()//    5. 遍历查询结果,收集数据while(resultSet.next()){val id = resultSet.getInt("id")val username = resultSet.getString("username")val password = resultSet.getString("password")val name = resultSet.getString("name")// 收集数据sourceContext.collect((id,username,password,name))}}override def cancel(): Unit = {if(null != connection){connection.close()}if (ps != null) {ps.close()}}}}

2.4.2.使用Kafka作为数据源

package com.boyi.datasourceimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.CommonClientConfigsobject KafkaCustomSource {def main(args : Array[String])  :  Unit = {// 1. 创建流式环境val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2 .指定kafak相关信息val kafkaCluster = "k01:9092,k02:9092"val kafkaTopic = "test"// 3. 创建Kafka数据流val props = new Properties()props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,kafkaCluster)val flinkKafkaConsumer = new FlinkKafkaConsumer[String](kafkaTopic,new SimpleStringSchema(),props)//4 .设置数据源val data : DataStream[String] = env.addSource(flinkKafkaConsumer)// 5. 打印数据data.print()// 6.执行任务env.execute()}}

2.4.3.自定义数据源

package com.boyi.datasourceimport java.util.UUID
import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}import scala.util.Random//  自定义数据源, 每1秒钟随机生成一条订单信息( 订单ID 、 用户ID 、 订单金额 、 时间戳 )
//  要求:
//    随机生成订单ID(UUID)
//    随机生成用户ID(0-2)
//    随机生成订单金额(0-100)
//    时间戳为当前系统时间//    开发步骤:
//      1. 创建订单样例类
//      2. 获取流处理环境
//      3. 创建自定义数据源
//          循环1000次
//          随机构建订单信息
//          上下文收集数据
//          每隔一秒执行一次循环
//      4. 打印数据
//      5. 执行任务object OwnCustomSource {// 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)case class Order(id : String , userId : Int , money : Long , createTime : Long)def main(args : Array[String]) : Unit = {// 1. 获取流处理运行环境rval env = StreamExecutionEnvironment.getExecutionEnvironment// 2. 创建一个自定义数据源val ownCustomSource : DataStream[Order] =   env.addSource(new RichSourceFunction[Order] {override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {for (i <- 0 until 1000){// 随机生成订单ID(UUID)val id = UUID.randomUUID().toString// 随机生成用户ID(0-2)val userId = Random.nextInt(3)// 随机生成订单金额(0-100)val money =  Random.nextInt(101)// 时间戳为当前系统时间val createTime = System.currentTimeMillis()// 收集数据sourceContext.collect(Order(id,userId,money, createTime))// 每隔1秒生成一个订单TimeUnit.SECONDS.sleep(1)}}override def cancel(): Unit = ()})ownCustomSource.print()env.execute()}}

代码:
git lab : https://github.com/BoYiZhang/flink-demo

Flink实操 : DataSource操作相关推荐

  1. Flink实操 : 算子操作

    . 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...

  2. Flink实操 : Sink操作

    . 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...

  3. jQuery入门实操-css操作,鼠标点击事件,页面计算器

    前言 本文是学习jQuery中的一些实践,是jQuery入门的实操案例.更多语法可参考w3school的jQuery参考手册 jQuery是一个快速.简洁的JavaScript框架,是继Prototy ...

  4. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  5. Flink实操 : 状态管理

    . 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...

  6. 完整代码+实操!手把手教你操作Faster R-CNN和Mask R-CNN

    点击上方↑↑↑蓝字关注我们~ 「2019 Python开发者日」全日程揭晓,请扫码咨询 ↑↑↑ 机器视觉领域的核心问题之一就是目标检测(Object Detection),它的任务是找出图像当中所有感 ...

  7. rtk采点后如何导入cad_【干货】RTK实操视频:工程之星5.0操作攻略!(第五部分)...

    前期回顾:[干货]RTK实操视频:工程之星5.0操作攻略!(第一部分)[干货]RTK实操视频:工程之星5.0操作教程(第二部分) [干货]RTK实操视频:工程之星5.0操作攻略!(第三部分) [干货] ...

  8. 从实操教学到赛题演练,腾讯专家亲授TI-ONE平台操作攻略!

    ​ 5月10日,我们迎来了"视"界直播周的首场直播--"2021腾讯广告算法大赛赛题解析".直播现场,芦清林和熊江丰老师对本届赛事的两大赛题进行了深入浅出的解析 ...

  9. 2021年R1快开门式压力容器操作考试题及R1快开门式压力容器操作实操考试视频

    题库来源:安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通:R1快开门式压力容器操作考试题是安全生产模拟考试一点通生成的,R1快开门式压力容器操作证模拟考试题库是根据R1快开门式压力容器操作 ...

最新文章

  1. Linux磁盘阵列技术详解(二)--raid 1创建
  2. 可以使用的mysql和navigate
  3. led灯光衰怎么解决_什么是LED光衰,光衰怎么解决?
  4. (1)搞一搞 seata 之 基础环境搭建
  5. memset函数具体说明
  6. led显示屏服务器怎么设置,led显示屏怎么改字幕 led显示屏改字幕方法
  7. 【人脸对齐-Landmarks】人脸关键点检测方法及评测汇总
  8. 一文带解读C# 动态拦截覆盖第三方进程中的函数(外挂必备)
  9. vs设计窗口不见了_龙猫腕表评测:VS沛纳海320V2版本
  10. 浏览器的加载顺序与页面性能优化
  11. HTML5本地存储——IndexedDB二:索引
  12. python切割音频文件_python3使用pydub切分音频文件
  13. 各厂商服务器存储默认管理口登录信息(默认IP、用户名、密码)
  14. 常有不规则动词的过去式和过去分词…
  15. 导论:什么是 Conversational Robot
  16. 3D游戏编程作业10
  17. 基于 MaxCompute+PAI 的用户增长方案实践
  18. java如何设置网页全屏_java中如何进行全屏方式和窗口方式的转换 详细??
  19. Google如何在新标签打开页面打开链接?
  20. 东软慧聚助力汽车“芯”节能减排

热门文章

  1. Android应用界面开发(一)
  2. 文字的纵向显示的问题 cdc drawtext
  3. 苹果保修期多久_手机的寿命究竟有多久,您的换机周期是否会等到手机报废呢?|手机|电池|安卓手机...
  4. [深度学习] PyTorch 实现双向LSTM 情感分析
  5. 2021-2027中国电工仪器仪表市场现状及未来发展趋势
  6. Map线程安全几种实现方法
  7. 汇编语言——伪指令详解
  8. 【密码学基础】03 传统加密技术
  9. flutter Dart Mixin后关于调用super的理解
  10. [汉化主题] Knowhow v1.1.16 – 响应式论坛知识库WordPress主题