一、Flink在流处理上常见的Source和sink操作

flink在流处理上的source和在批处理上的source基本一致。大致有4大类

1.基于本地集合的source(Collection-based-source)

2.基于文件的source(File-based-source)

3.基于网络套接字的source(Socket-based-source)

4.自定义Kafka的source(Custom-source)

1、基于集合的source

object DataSource001 {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment//0.用element创建DataStream(fromElements)val ds0: DataStream[String] = senv.fromElements("spark", "flink")ds0.print()//1.用Tuple创建DataStream(fromElements)val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))ds1.print()//2.用Array创建DataStreamval ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))ds2.print()//3.用ArrayBuffer创建DataStreamval ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink"))ds3.print()//4.用List创建DataStreamval ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))ds4.print()//5.用List创建DataStreamval ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink"))ds5.print()//6.用Vector创建DataStreamval ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))ds6.print()//7.用Queue创建DataStreamval ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))ds7.print()//8.用Stack创建DataStreamval ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))ds8.print()//9.用Stream创建DataStream(Stream相当于lazy List,避免在中间过程中生成不必要的集合)val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))ds9.print()//10.用Seq创建DataStreamval ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))ds10.print()//11.用Set创建DataStream(不支持)//val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))//ds11.print()//12.用Iterable创建DataStream(不支持)//val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))//ds12.print()//13.用ArraySeq创建DataStreamval ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink"))ds13.print()//14.用ArrayStack创建DataStreamval ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink"))ds14.print()//15.用Map创建DataStream(不支持)//val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))//ds15.print()//16.用Range创建DataStreamval ds16: DataStream[Int] = senv.fromCollection(Range(1, 9))ds16.print()//17.用fromElements创建DataStreamval ds17: DataStream[Long] = senv.generateSequence(1, 9)ds17.print()senv.execute(this.getClass.getName)}
}

2、基于文件的source(File-based-source)

//TODO 2.基于文件的source(File-based-source)
//0.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//TODO 1.读取本地文件
val text1 = env.readTextFile("data2.csv")
text1.print()
//TODO 2.读取hdfs文件
val text2 = env.readTextFile("hdfs://hadoop01:9000/input/flink/README.txt")
text2.print()
env.execute()

3、基于网络套接字的source(Socket-based-source)

val source = env.socketTextStream("IP", PORT)

4、自定义的source(Custom-source,以kafka为例)

Kafka基本命令:

 ● 查看当前服务器中的所有topic
bin/kafka-topics.sh --list --zookeeper  hadoop01:2181● 创建topic
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test● 删除topic
sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。● 通过shell命令发送消息
sh bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test● 通过shell消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1● 查看消费位置
bin/kafka-run-cla.ss.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup● 查看某个Topic的详情
bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181● 对分区数进行修改
kafka-topics.sh --zookeeper  zk01 --alter --partitions 15 --topic   utopic

使用flink消费kafka的消息(不规范,其实需要自己手动维护offset):

object DataSource_kafka {def main(args: Array[String]): Unit = {//1指定kafka数据流的相关信息val zkCluster = "hadoop01,hadoop02,hadoop03:2181"val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"val kafkaTopicName = "test"//2.创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment//3.创建kafka数据流val properties = new Properties()properties.setProperty("bootstrap.servers", kafkaCluster)properties.setProperty("zookeeper.connect", zkCluster)properties.setProperty("group.id", kafkaTopicName)val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName,new SimpleStringSchema(), properties)//4.添加数据源addSource(kafka09)val text = env.addSource(kafka09).setParallelism(4)/*** test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie* */val values: DataStream[ProcessedData] = text.map{line =>var encrypted = lineval values = encrypted.split("#CS#")val valuesLength = values.lengthvar regionalRequest =  if(valuesLength > 1) values(1) else ""val requestMethod = if (valuesLength > 2) values(2) else ""val contentType = if (valuesLength > 3) values(3) else ""//Post提交的数据体val requestBody = if (valuesLength > 4) values(4) else ""//http_referrerval httpReferrer = if (valuesLength > 5) values(5) else ""//客户端IPval remoteAddr = if (valuesLength > 6) values(6) else ""//客户端UAval httpUserAgent = if (valuesLength > 7) values(7) else ""//服务器时间的ISO8610格式val timeIso8601 = if (valuesLength > 8) values(8) else ""//服务器地址val serverAddr = if (valuesLength > 9) values(9) else ""//获取原始信息中的cookie字符串val cookiesStr = if (valuesLength > 10) values(10) else ""ProcessedData(regionalRequest,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr)}values.print()val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)remoteAddr.print()//5.触发运算env.execute("flink-kafka-wordcunt")}
}//保存结构化数据
case class ProcessedData(regionalRequest: String,requestMethod: String,contentType: String,requestBody: String,httpReferrer: String,remoteAddr: String,httpUserAgent: String,timeIso8601: String,serverAddr: String,cookiesStr: String)

二、Flink--sink到kafka

package com.flink.DataStreamobject DataSource_kafka {def main(args: Array[String]): Unit = {//1指定kafka数据流的相关信息val zkCluster = "hadoop01,hadoop02,hadoop03:2181"val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"val kafkaTopicName = "test"val sinkKafka = "test2"//2.创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment//3.创建kafka数据流val properties = new Properties()properties.setProperty("bootstrap.servers", kafkaCluster)properties.setProperty("zookeeper.connect", zkCluster)properties.setProperty("group.id", kafkaTopicName)val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)//4.添加数据源addSource(kafka09)val text = env.addSource(kafka09).setParallelism(4)/*** test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie* */val values: DataStream[ProcessedData] = text.map{line =>var encrypted = lineval values = encrypted.split("#CS#")val valuesLength = values.lengthvar regionalRequest =  if(valuesLength > 1) values(1) else ""val requestMethod = if (valuesLength > 2) values(2) else ""val contentType = if (valuesLength > 3) values(3) else ""//Post提交的数据体val requestBody = if (valuesLength > 4) values(4) else ""//http_referrerval httpReferrer = if (valuesLength > 5) values(5) else ""//客户端IPval remoteAddr = if (valuesLength > 6) values(6) else ""//客户端UAval httpUserAgent = if (valuesLength > 7) values(7) else ""//服务器时间的ISO8610格式val timeIso8601 = if (valuesLength > 8) values(8) else ""//服务器地址val serverAddr = if (valuesLength > 9) values(9) else ""//获取原始信息中的cookie字符串val cookiesStr = if (valuesLength > 10) values(10) else ""ProcessedData(regionalRequest,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr)}values.print()val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)remoteAddr.print()//TODO sink到kafkaval p: Properties = new Propertiesp.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)p.setProperty("value.serializer", classOf[ByteArraySerializer].getName)val sink = new FlinkKafkaProducer09[String](sinkKafka, new SimpleStringSchema(), properties)remoteAddr.addSink(sink)//5.触发运算env.execute("flink-kafka-wordcunt")}
}
//保存结构化数据
case class ProcessedData(regionalRequest: String,requestMethod: String,contentType: String,requestBody: String,httpReferrer: String,remoteAddr: String,httpUserAgent: String,timeIso8601: String,serverAddr: String,cookiesStr: String)

Flink在流处理上的Source和sink操作、Flink--sink到kafka相关推荐

  1. Flink教程(06)- Flink批流一体API(Source示例)

    文章目录 01 引言 02 Source 2.1 基于集合的Source 2.2 基于文件的Source 2.3 基于Socket的Source 2.4 自定义Source 2.4.1 案例 - 随机 ...

  2. Flink教程(10)- Flink批流一体API(其它)

    文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...

  3. Flink教程(09)- Flink批流一体API(Connectors示例)

    文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...

  4. Flink教程(07)- Flink批流一体API(Transformation示例)

    文章目录 01 引言 02 Transformation 2.1 基本操作 2.1.1 API 解析 2.1.2 示例代码 2.2 合并 2.2.1 union 2.2.2 connect 2.2.3 ...

  5. 《基于Apache Flink的流处理》读书笔记

    第1章 状态化流处理概述 传统数据处理 绝大多数企业所实现的传统架构都会将数据处理分为两类: 事务型处理 分析型处理 事务型处理 企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件.基于W ...

  6. 技术13期:一文读懂Flink的流式处理及窗口理解

    Apache Flink是一个框架和分布式大数据处理引擎,可对有界数据流和无界数据流进行有状态计算.可部署在各种集群环境,对各种大小的数据规模进行快速计算. Flink基本概念 流处理:特点是无限.实 ...

  7. 2021年大数据Flink(十一):流批一体API Source

    目录 Source 预定义Source 基于集合的Source 基于文件的Source ​​​​​​​基于Socket的Source 自定义Source 随机生成数据 ​​​​​​​MySQL Sou ...

  8. 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】

    导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...

  9. Iceberg 在基于 Flink 的流式数据入库场景中的应用

    本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...

  10. Apache Flink,流计算?不仅仅是流计算!

    阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行.Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术 ...

最新文章

  1. 百度地图android室内定位,百度地图4.0正式发布,主打免费语音导航、室内定位、实时公交和生活搜索 (视频)...
  2. java中function_Java 8:java.util.function中的TriFunction(和亲属)在哪里?还是有什么选择?...
  3. codewars048: Triple Double
  4. 跨域post请求实现方案小结--转
  5. 如何使用VisualVM监视服务器上的多个JVM
  6. Delphi中动态调用DLL的方法
  7. 使用git在本地电脑与远程GitHub/gitlub库中clone别人的github上的远程仓库代码,找资源
  8. 收不到oracle邮件,dovecot不能够收邮件
  9. 5G:无人驾驶的“超级英雄”路
  10. 优化数据库的思想及SQL语句优化的原则
  11. ubuntu 18.04 使用 nvm 安装 nodejs
  12. wlanconnect无法连接wifi_苹果iphone12无法连接wifi怎么回事 解决方法分享
  13. 商城小程序项目完整源码(微信小程序)
  14. STM32使用MCUISP下载程序教程
  15. TeamTalk源码分析之win-client
  16. Repeater的 Items属性、Items里面的控件有几个?
  17. 以太坊用户体验的痛点
  18. 阿里云互动课堂解决方案助力淘宝教育,打造普惠教育平台
  19. QTimer定时器问题分析
  20. c++面试经验(可下载文档)

热门文章

  1. 电池SOC估计-EKF UKF
  2. 分享美容护肤门店预约下单小程序开发制作功能介绍
  3. HTML图片鼠标滑动加边框,鼠标移动到图片上时,用css怎么实现图片加边框效果?...
  4. 手写Promise 封装Promise resolve reject then catch Promise.resolve Promise.reject
  5. 由内而外全面进化,影像娱乐都出彩,vivo S12 Pro上手
  6. PDF带目录导出java_itextpdf为pdf文件添加目录(可跳转)
  7. HTML对字体的所有操作详解(经典)
  8. 使用SINet进行伪装目标检测
  9. 网易云 linux 网络,网易云音乐正式登陆Linux平台
  10. 《GAMES104-现代游戏引擎:从入门到实践》-05 学习笔记