Flink在流处理上的Source和sink操作、Flink--sink到kafka
一、Flink在流处理上常见的Source和sink操作
flink在流处理上的source和在批处理上的source基本一致。大致有4大类
1.基于本地集合的source(Collection-based-source)
2.基于文件的source(File-based-source)
3.基于网络套接字的source(Socket-based-source)
4.自定义Kafka的source(Custom-source)
1、基于集合的source
object DataSource001 {def main(args: Array[String]): Unit = {val senv = StreamExecutionEnvironment.getExecutionEnvironment//0.用element创建DataStream(fromElements)val ds0: DataStream[String] = senv.fromElements("spark", "flink")ds0.print()//1.用Tuple创建DataStream(fromElements)val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))ds1.print()//2.用Array创建DataStreamval ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))ds2.print()//3.用ArrayBuffer创建DataStreamval ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink"))ds3.print()//4.用List创建DataStreamval ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))ds4.print()//5.用List创建DataStreamval ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink"))ds5.print()//6.用Vector创建DataStreamval ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))ds6.print()//7.用Queue创建DataStreamval ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))ds7.print()//8.用Stack创建DataStreamval ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))ds8.print()//9.用Stream创建DataStream(Stream相当于lazy List,避免在中间过程中生成不必要的集合)val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))ds9.print()//10.用Seq创建DataStreamval ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))ds10.print()//11.用Set创建DataStream(不支持)//val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))//ds11.print()//12.用Iterable创建DataStream(不支持)//val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))//ds12.print()//13.用ArraySeq创建DataStreamval ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink"))ds13.print()//14.用ArrayStack创建DataStreamval ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink"))ds14.print()//15.用Map创建DataStream(不支持)//val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))//ds15.print()//16.用Range创建DataStreamval ds16: DataStream[Int] = senv.fromCollection(Range(1, 9))ds16.print()//17.用fromElements创建DataStreamval ds17: DataStream[Long] = senv.generateSequence(1, 9)ds17.print()senv.execute(this.getClass.getName)}
}
2、基于文件的source(File-based-source)
//TODO 2.基于文件的source(File-based-source) //0.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //TODO 1.读取本地文件 val text1 = env.readTextFile("data2.csv") text1.print() //TODO 2.读取hdfs文件 val text2 = env.readTextFile("hdfs://hadoop01:9000/input/flink/README.txt") text2.print() env.execute()
3、基于网络套接字的source(Socket-based-source)
val source = env.socketTextStream("IP", PORT)
4、自定义的source(Custom-source,以kafka为例)
Kafka基本命令:
● 查看当前服务器中的所有topic bin/kafka-topics.sh --list --zookeeper hadoop01:2181● 创建topic bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test● 删除topic sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test 需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。● 通过shell命令发送消息 sh bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test● 通过shell消费消息 bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1● 查看消费位置 bin/kafka-run-cla.ss.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup● 查看某个Topic的详情 bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181● 对分区数进行修改 kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic
使用flink消费kafka的消息(不规范,其实需要自己手动维护offset):
object DataSource_kafka {def main(args: Array[String]): Unit = {//1指定kafka数据流的相关信息val zkCluster = "hadoop01,hadoop02,hadoop03:2181"val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"val kafkaTopicName = "test"//2.创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment//3.创建kafka数据流val properties = new Properties()properties.setProperty("bootstrap.servers", kafkaCluster)properties.setProperty("zookeeper.connect", zkCluster)properties.setProperty("group.id", kafkaTopicName)val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName,new SimpleStringSchema(), properties)//4.添加数据源addSource(kafka09)val text = env.addSource(kafka09).setParallelism(4)/*** test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie* */val values: DataStream[ProcessedData] = text.map{line =>var encrypted = lineval values = encrypted.split("#CS#")val valuesLength = values.lengthvar regionalRequest = if(valuesLength > 1) values(1) else ""val requestMethod = if (valuesLength > 2) values(2) else ""val contentType = if (valuesLength > 3) values(3) else ""//Post提交的数据体val requestBody = if (valuesLength > 4) values(4) else ""//http_referrerval httpReferrer = if (valuesLength > 5) values(5) else ""//客户端IPval remoteAddr = if (valuesLength > 6) values(6) else ""//客户端UAval httpUserAgent = if (valuesLength > 7) values(7) else ""//服务器时间的ISO8610格式val timeIso8601 = if (valuesLength > 8) values(8) else ""//服务器地址val serverAddr = if (valuesLength > 9) values(9) else ""//获取原始信息中的cookie字符串val cookiesStr = if (valuesLength > 10) values(10) else ""ProcessedData(regionalRequest,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr)}values.print()val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)remoteAddr.print()//5.触发运算env.execute("flink-kafka-wordcunt")}
}//保存结构化数据
case class ProcessedData(regionalRequest: String,requestMethod: String,contentType: String,requestBody: String,httpReferrer: String,remoteAddr: String,httpUserAgent: String,timeIso8601: String,serverAddr: String,cookiesStr: String)
二、Flink--sink到kafka
package com.flink.DataStreamobject DataSource_kafka {def main(args: Array[String]): Unit = {//1指定kafka数据流的相关信息val zkCluster = "hadoop01,hadoop02,hadoop03:2181"val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"val kafkaTopicName = "test"val sinkKafka = "test2"//2.创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment//3.创建kafka数据流val properties = new Properties()properties.setProperty("bootstrap.servers", kafkaCluster)properties.setProperty("zookeeper.connect", zkCluster)properties.setProperty("group.id", kafkaTopicName)val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName, new SimpleStringSchema(), properties)//4.添加数据源addSource(kafka09)val text = env.addSource(kafka09).setParallelism(4)/*** test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie* */val values: DataStream[ProcessedData] = text.map{line =>var encrypted = lineval values = encrypted.split("#CS#")val valuesLength = values.lengthvar regionalRequest = if(valuesLength > 1) values(1) else ""val requestMethod = if (valuesLength > 2) values(2) else ""val contentType = if (valuesLength > 3) values(3) else ""//Post提交的数据体val requestBody = if (valuesLength > 4) values(4) else ""//http_referrerval httpReferrer = if (valuesLength > 5) values(5) else ""//客户端IPval remoteAddr = if (valuesLength > 6) values(6) else ""//客户端UAval httpUserAgent = if (valuesLength > 7) values(7) else ""//服务器时间的ISO8610格式val timeIso8601 = if (valuesLength > 8) values(8) else ""//服务器地址val serverAddr = if (valuesLength > 9) values(9) else ""//获取原始信息中的cookie字符串val cookiesStr = if (valuesLength > 10) values(10) else ""ProcessedData(regionalRequest,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr)}values.print()val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)remoteAddr.print()//TODO sink到kafkaval p: Properties = new Propertiesp.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092")p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)p.setProperty("value.serializer", classOf[ByteArraySerializer].getName)val sink = new FlinkKafkaProducer09[String](sinkKafka, new SimpleStringSchema(), properties)remoteAddr.addSink(sink)//5.触发运算env.execute("flink-kafka-wordcunt")}
}
//保存结构化数据
case class ProcessedData(regionalRequest: String,requestMethod: String,contentType: String,requestBody: String,httpReferrer: String,remoteAddr: String,httpUserAgent: String,timeIso8601: String,serverAddr: String,cookiesStr: String)
Flink在流处理上的Source和sink操作、Flink--sink到kafka相关推荐
- Flink教程(06)- Flink批流一体API(Source示例)
文章目录 01 引言 02 Source 2.1 基于集合的Source 2.2 基于文件的Source 2.3 基于Socket的Source 2.4 自定义Source 2.4.1 案例 - 随机 ...
- Flink教程(10)- Flink批流一体API(其它)
文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...
- Flink教程(09)- Flink批流一体API(Connectors示例)
文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...
- Flink教程(07)- Flink批流一体API(Transformation示例)
文章目录 01 引言 02 Transformation 2.1 基本操作 2.1.1 API 解析 2.1.2 示例代码 2.2 合并 2.2.1 union 2.2.2 connect 2.2.3 ...
- 《基于Apache Flink的流处理》读书笔记
第1章 状态化流处理概述 传统数据处理 绝大多数企业所实现的传统架构都会将数据处理分为两类: 事务型处理 分析型处理 事务型处理 企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件.基于W ...
- 技术13期:一文读懂Flink的流式处理及窗口理解
Apache Flink是一个框架和分布式大数据处理引擎,可对有界数据流和无界数据流进行有状态计算.可部署在各种集群环境,对各种大小的数据规模进行快速计算. Flink基本概念 流处理:特点是无限.实 ...
- 2021年大数据Flink(十一):流批一体API Source
目录 Source 预定义Source 基于集合的Source 基于文件的Source 基于Socket的Source 自定义Source 随机生成数据 MySQL Sou ...
- 大数据架构如何做到流批一体?【对于Flink等流批一体的概念做了很好的澄清!】
导读:大数据与现有的科技手段结合,对大多数产业而言都能产生巨大的经济及社会价值.这也是当下许多企业,在大数据上深耕的原因.大数据分析场景需要解决哪些技术挑战?目前,有哪些主流大数据架构模式及其发展?今 ...
- Iceberg 在基于 Flink 的流式数据入库场景中的应用
本文以流式数据入库的场景为基础,介绍引入 Iceberg 作为落地格式和嵌入 Flink sink 的收益,并分析了当前可实现的框架及要点. 应用场景 流式数据入库,是大数据和数据湖的典型应用场景.上 ...
- Apache Flink,流计算?不仅仅是流计算!
阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行.Flink Forward是由Apache软件基金会授权的全球范围内的Flink技术 ...
最新文章
- 百度地图android室内定位,百度地图4.0正式发布,主打免费语音导航、室内定位、实时公交和生活搜索 (视频)...
- java中function_Java 8:java.util.function中的TriFunction(和亲属)在哪里?还是有什么选择?...
- codewars048: Triple Double
- 跨域post请求实现方案小结--转
- 如何使用VisualVM监视服务器上的多个JVM
- Delphi中动态调用DLL的方法
- 使用git在本地电脑与远程GitHub/gitlub库中clone别人的github上的远程仓库代码,找资源
- 收不到oracle邮件,dovecot不能够收邮件
- 5G:无人驾驶的“超级英雄”路
- 优化数据库的思想及SQL语句优化的原则
- ubuntu 18.04 使用 nvm 安装 nodejs
- wlanconnect无法连接wifi_苹果iphone12无法连接wifi怎么回事 解决方法分享
- 商城小程序项目完整源码(微信小程序)
- STM32使用MCUISP下载程序教程
- TeamTalk源码分析之win-client
- Repeater的 Items属性、Items里面的控件有几个?
- 以太坊用户体验的痛点
- 阿里云互动课堂解决方案助力淘宝教育,打造普惠教育平台
- QTimer定时器问题分析
- c++面试经验(可下载文档)
热门文章
- 电池SOC估计-EKF UKF
- 分享美容护肤门店预约下单小程序开发制作功能介绍
- HTML图片鼠标滑动加边框,鼠标移动到图片上时,用css怎么实现图片加边框效果?...
- 手写Promise 封装Promise resolve reject then catch Promise.resolve Promise.reject
- 由内而外全面进化,影像娱乐都出彩,vivo S12 Pro上手
- PDF带目录导出java_itextpdf为pdf文件添加目录(可跳转)
- HTML对字体的所有操作详解(经典)
- 使用SINet进行伪装目标检测
- 网易云 linux 网络,网易云音乐正式登陆Linux平台
- 《GAMES104-现代游戏引擎:从入门到实践》-05 学习笔记