一、flink在批处理中常见的source

  flink在批处理中常见的source主要有两大类:  

    1.基于本地集合的source(Collection-based-source)   

    2.基于文件的source(File-based-source)

 1.基于本地集合的source

      在flink最常见的创建DataSet方式有三种。   

1.使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。   

2.使用env.fromCollection(),这种方式支持多种Collection的具体类型   

3.使用env.generateSequence()方法创建基于Sequence的DataSet

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import scala.collection.immutable.{Queue, Stack}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}object DataSource001 {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//0.用element创建DataSet(fromElements)val ds0: DataSet[String] = env.fromElements("spark", "flink")ds0.print()//1.用Tuple创建DataSet(fromElements)val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))ds1.print()//2.用Array创建DataSetval ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))ds2.print()//3.用ArrayBuffer创建DataSetval ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))ds3.print()//4.用List创建DataSetval ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))ds4.print()//5.用List创建DataSetval ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))ds5.print()//6.用Vector创建DataSetval ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))ds6.print()//7.用Queue创建DataSetval ds7: DataSet[String] = env.fromCollection(Queue("spark", "flink"))ds7.print()//8.用Stack创建DataSetval ds8: DataSet[String] = env.fromCollection(Stack("spark", "flink"))ds8.print()//9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))ds9.print()//10.用Seq创建DataSetval ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))ds10.print()//11.用Set创建DataSetval ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))ds11.print()//12.用Iterable创建DataSetval ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))ds12.print()//13.用ArraySeq创建DataSetval ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))ds13.print()//14.用ArrayStack创建DataSetval ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))ds14.print()//15.用Map创建DataSetval ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))ds15.print()//16.用Range创建DataSetval ds16: DataSet[Int] = env.fromCollection(Range(1, 9))ds16.print()//17.用fromElements创建DataSetval ds17: DataSet[Long] =  env.generateSequence(1,9)ds17.print()}
}

2.基于文件的source(File-based-source)

flink支持多种存储设备上的文件,包括本地文件,hdfs文件,alluxio文件等。
flink支持多种文件的存储格式,包括text文件,CSV文件等。
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment,_}object DataSource002 {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//1.读取本地文本文件,本地文件以file://开头val ds1: DataSet[String] = env.readTextFile("file:///Applications/flink-1.1.3/README.txt")ds1.print()//2.读取hdfs文本文件,hdfs文件以hdfs://开头,不指定master的短URLval ds2: DataSet[String] = env.readTextFile("hdfs:///input/flink/README.txt")ds2.print()//3.读取hdfs CSV文件,转化为tupleval path = "hdfs://qingcheng11:9000/input/flink/sales.csv"val ds3 = env.readCsvFile[(String, Int, Int, Double)](filePath = path,lineDelimiter = "\n",fieldDelimiter = ",",lenient = false,ignoreFirstLine = true,includedFields = Array(0, 1, 2, 3))ds3.print()//4.读取hdfs CSV文件,转化为case classcase class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)val ds4 = env.readCsvFile[Sales](filePath = path,lineDelimiter = "\n",fieldDelimiter = ",",lenient = false,ignoreFirstLine = true,includedFields = Array(0, 1, 2, 3),pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid"))ds4.print()}
}

3.基于文件的source(遍历目录)

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 递归读取hdfs目录中的所有文件,会遍历各级子目录*/
object DataSource003 {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment// create a configuration objectval parameters = new Configuration// set the recursive enumeration parameterparameters.setBoolean("recursive.file.enumeration", true)// pass the configuration to the data sourceval ds1 = env.readTextFile("hdfs:///input/flink").withParameters(parameters)ds1.print()}
}

 
 

转载于:https://www.cnblogs.com/linkmust/p/10896051.html

flink批处理中的source以及sink介绍相关推荐

  1. Flink批处理中的增量迭代

    对某些迭代而言并不是单次迭代产生的下一次工作集中的每个元素都需要重新参与下一轮迭代,有时只需要重新计算部分数据同时选择性地更新解集,这种形式的迭代就是增量迭代.增量迭代能够使得一些算法执行得更高效,它 ...

  2. flink链接mysql_flink 将mysql作为Source和Sink的代码示例

    1.maven导入 mysql mysql-connector-java 5.1.34 2.SourceFromMySQL工具类java代码 import org.apache.flink.confi ...

  3. flink source和sink

    flink中的source作为整个stream中的入口,而sink作为整个stream的终点. SourceFunction为所有flink中source的根接口,其定义了run()方法和cancel ...

  4. 1.31.Flink自定义rocketmq(source/sink)+自定义redis source和sink

    1.31.Flink自定义rocketmq(source/sink)+自定义redis+自定义 1.31.1.工程结构 1.31.2.定义pom.xml文件 1.31.3.log4j2.propert ...

  5. Flink自带的Source源算子以及自定义数据源Source

    文章目录 Flink的DataStream API(基础篇) Source源算子 从集合中读取数据 从文件中读取数据 从Scoket中读取数据 从Kafka中读取数据 自定义Source Flink的 ...

  6. [ETL] Flume 理论与demo(Taildir Source Hdfs Sink)

    一.Flume简介 1. Flume概述 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据: ...

  7. 23.Flink-高级特性-新特性-Streaming Flie Sink\介绍\代码演示\Flink-高级特性-新特性-FlinkSQL整合Hive\添加依赖和jar包和配置

    23.Flink-高级特性-新特性-Streaming Flie Sink 23.1.介绍 23.2.代码演示 24.Flink-高级特性-新特性-FlinkSQL整合Hive 24.1.介绍 24. ...

  8. 三十九、Flume自定义Source、Sink

    上篇文章咱们基于Flume举了几个例子,包括它的扇入扇出等等.这篇文章我们主要来看一下怎样通过自定义Source和Sink来实现Flume的数据采集.关注专栏<破茧成蝶--大数据篇>,查看 ...

  9. Flink-常用Source与Sink的使用汇总整理

    Flink-常用Source与Sink的使用汇总整理 基础结构 Source 本地数据源 端口数据源 文件数据源 Kafka-Source 自定义JDBC-Source 数据处理 SingleData ...

最新文章

  1. handle句柄 matlab_学习随笔之Matlab句柄对象深拷贝方法
  2. html表格数据点击事件,如何在iview的table单元格里实现点击事件?
  3. Linux命令常用的快捷键
  4. lan8720a自协商启动_惠及18个小区17851户!今年海曙老旧小区改造启动,重点内容包括…...
  5. 总结之:CentOS 6.5 LAMP分主机平台的搭建及测试
  6. Lucene学习笔记:一,全文检索的基本原理
  7. PHP调用shell命令
  8. Mutisim14.0安装后,汉化的详细方法
  9. H264 数据帧头分析
  10. 安科瑞无线测温装置,多点温度在线测温装置
  11. 教你如何进入有密码的 XP 系统
  12. 通过JAVA编写DOMINO服务器端插件程序
  13. 青少年CTF - Web - Flag在哪里 Wp WriteUp
  14. 和讯金融界证券之星 财经网站竞争格局突变
  15. java实现数字金额转换成汉字大写金额
  16. HCIA--基础网络实验---HTTP服务搭建
  17. 使用ASP.NET技术开发网上书店
  18. IE浏览器下载文件名出现中文乱码问题解决
  19. 借助EDGE,实现文本朗读
  20. 多类分割mask,如何转彩色图像并保存

热门文章

  1. zcmu1133(dfs+判重)
  2. 大数运算(6)——大数阶乘(求位数)
  3. 连接不同区块链的跨链技术介绍
  4. Android Binder 分析——原理
  5. Android Linux内核编译调试
  6. Android安全教程(2)---Fiddler简易使用教程之使用
  7. 多项式的ln、exp、快速幂和开根学习小记
  8. linux常见命令_Linux系统常见命令
  9. python手写均值滤波器_python手写均值滤波
  10. 产品开发管理方法工具流程 pdf_pdf转化为word的方法有什么?实用工具就有这两个...