Flink实操 : Sink操作
.
- 一 .前言
- 二 .类型
- 2.1. 基于本地集合的sink
- 2.2. 基于文件的sink
- 2.2.1.将数据写入本地文件
- 2.2.2.将数据写入HDFS
- 2.3. Kafka Sink
- 2.4. MySQL Sink
一 .前言
二 .类型
2.1. 基于本地集合的sink
目标:
基于下列数据,分别 进行打印输出,error输出,collect()
(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)
代码:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject SinkCollection {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List((19, "zhangsan", 178.8), (17, "lisi", 168.8), (18, "wangwu", 184.8), (21, "zhaoliu", 164.8) ))// 3.处理数据// 4.打印输出data.print()data.printToErr()// data 的数据为批处理的时候可以使用collect// print(data.collect())// 5.执行任务env.execute()}}
2.2. 基于文件的sink
flink支持多种存储设备上的文件,包括本地文件,hdfs文件等。
flink支持多种文件的存储格式,包括text文件,CSV文件等。
writeAsText():TextOuputFormat - 将元素作为字符串写入行。字符串是通过调用每个元素的toString()方法获得的。
2.2.1.将数据写入本地文件
目标:
基于下列数据,写入到文件中
List((1,"flink"),(2,"sink"))
代码:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject LocalFileSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List((1,"flink"),(2,"sink")))// 3.处理数据// 4.打印输出data.writeAsText("/opt/a/tmp/FileSink.txt",WriteMode.OVERWRITE)// 5.执行任务env.execute()}
}
2.2.2.将数据写入HDFS
写入数据到HDFS中
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject HdfsFileSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List((1,"flink"),(2,"sink")))// 3.处理数据// 4.打印输出data.writeAsText("hdfs://h23:8020/tmp/test/tmp/FileSink.txt",WriteMode.OVERWRITE)// 5.执行任务env.execute()}
}
2.3. Kafka Sink
kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
示例
将数据落地到Kafka中
cd /usr/hdp/current/kafka-broker/bin查看topic列表 : ./kafka-topics.sh --zookeeper hxx:2181 list创建topic :./kafka-topics.sh --zookeeper hxx:2181 --create --topic test --partitions 3 --replication-factor 1生产topic数据
./kafka-console-producer.sh --broker-list 192.168.xx.xx:9092 --topic test读取topic数据
./kafka-console-consumer.sh --bootstrap-server 192.168.xx.xx:9092 --topic test --from-beginning
import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}object KafkaSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val data = env.fromCollection(List("flink","Spark"))// 3.构造Kafka Sinkval pro: Properties = new Propertiespro.setProperty("bootstrap.servers", " 192.168.xx.xx:9092")val kafkaSink = new FlinkKafkaProducer[String]("test",new SimpleStringSchema(),pro)// 4.打印输出data.addSink(kafkaSink)// 5.执行任务env.execute()}
}
2.4. MySQL Sink
示例
加载下列本地集合,导入MySql中
List((10, "dazhuang", "123456", "大壮"),(11, "erya", "123456", "二丫"),(12, "sanpang", "123456", "三胖"))
代码
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object MySqlSink {def main(args: Array[String]): Unit = {// 1. 创建流处理环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 2.用fromCollection创建DataStream(fromCollection)val data : DataStream[(Int, String, String, String)] = env.fromCollection(List((10, "dazhuang", "123456", "大壮"),(11, "erya", "123456", "二丫"),(12, "sanpang", "123456", "三胖")))// 3.设置sinkdata.addSink(new MySqlCustomSink)// 4.执行任务env.execute()}
}class MySqlCustomSink extends RichSinkFunction [(Int,String,String,String)] {private var connection: Connection = nullprivate var ps: PreparedStatement = nulloverride def open(parameters: Configuration): Unit = {//1:加载驱动Class.forName("com.mysql.jdbc.Driver")//2:创建连接connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/tmp?useUnicode=true&characterEncoding=UTF-8&useSSL=false", "root", "root")//3:获得执行语句val sql = "insert into user(id , username , password , name) values(?,?,?,?);"ps = connection.prepareStatement(sql)}override def invoke(value: (Int, String, String, String), context: SinkFunction.Context): Unit = {try {//4.组装数据,执行插入操作ps.setInt(1, value._1)ps.setString(2, value._2)ps.setString(3, value._3)ps.setString(4, value._4)ps.executeUpdate()} catch {case e: Exception => println(e.getMessage)}}//关闭连接操作override def close(): Unit = {if (connection != null) {connection.close()}if (ps != null) {ps.close()}}}
github 地址: https://github.com/BoYiZhang/flink-demo
Flink实操 : Sink操作相关推荐
- Flink实操 : 算子操作
. 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...
- Flink实操 : DataSource操作
. 一 .前言 二 .四种读取类型 2.1. 基于本地集合的source(Collection-based-source) 2.2. 基于文件的source(File-based-source) 2. ...
- jQuery入门实操-css操作,鼠标点击事件,页面计算器
前言 本文是学习jQuery中的一些实践,是jQuery入门的实操案例.更多语法可参考w3school的jQuery参考手册 jQuery是一个快速.简洁的JavaScript框架,是继Prototy ...
- Flink实操 : 广播变量/累加器/分布式缓存
. 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...
- Flink实操 : 状态管理
. 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...
- 完整代码+实操!手把手教你操作Faster R-CNN和Mask R-CNN
点击上方↑↑↑蓝字关注我们~ 「2019 Python开发者日」全日程揭晓,请扫码咨询 ↑↑↑ 机器视觉领域的核心问题之一就是目标检测(Object Detection),它的任务是找出图像当中所有感 ...
- rtk采点后如何导入cad_【干货】RTK实操视频:工程之星5.0操作攻略!(第五部分)...
前期回顾:[干货]RTK实操视频:工程之星5.0操作攻略!(第一部分)[干货]RTK实操视频:工程之星5.0操作教程(第二部分) [干货]RTK实操视频:工程之星5.0操作攻略!(第三部分) [干货] ...
- 从实操教学到赛题演练,腾讯专家亲授TI-ONE平台操作攻略!
5月10日,我们迎来了"视"界直播周的首场直播--"2021腾讯广告算法大赛赛题解析".直播现场,芦清林和熊江丰老师对本届赛事的两大赛题进行了深入浅出的解析 ...
- 2021年R1快开门式压力容器操作考试题及R1快开门式压力容器操作实操考试视频
题库来源:安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通:R1快开门式压力容器操作考试题是安全生产模拟考试一点通生成的,R1快开门式压力容器操作证模拟考试题库是根据R1快开门式压力容器操作 ...
最新文章
- 中国程序员开发的远程桌面火了!Mac 可用,只有 9MB,支持自建中继器
- python输入列表方法_Python用input输入列表的方法
- Kconfig文件结构(图文)简介
- Open Infrastructure开启开放协作新时代
- ajax获得excel文件流在前端打开_主流前端技术讲解,面试必考!
- java8--IO(java疯狂讲义3复习笔记)
- 845B - Luba And The Ticket
- Java中的深拷贝(深复制)和浅拷贝(浅复制)
- 利用syslinux制作Dos、WinPE、Slax Linux集成u盘
- jQuery基础--选择器
- 开两个服务内存溢出_详解JVM内存区域
- linux vi 字符串替换
- c语言一本书的页码从自然数1开始顺序编码,算法设计与分析 1-1 统计数字问题(C语言版)...
- Ubuntu20.04 系统搭建 NetBox(开源 IPAM/DC 管理工具)
- 解决conda在Downloading and Extracting Packages时下载速度慢的问题
- to be ruled from Windows Space about C++
- 跨国项目要注意时区问题
- JavaScript实现在线websocket WSS测试工具 -toolfk程序员工具网
- 聊天的一点笔记--顺便浅谈技术服务公司的激励制度设计
- ipad出现support.apple.com