9.FLINK Sink\API\自定义sink
9.Sink
9.1.API
9.2.注意
9.3.自定义sink
9.Sink
9.1.API
1、ds.print 直接输出到控制台
2、ds.printToErr()直接输出到控制台,用红色
3、ds.writeAsText(“本地/HDFS的path”,WriteMode.OVERWRITE).setParallelism(1);
9.2.注意
在输出到path的时候,可以在前面设置并行度,如果
并行度>1,则path为目录
并行度=1,则path为文件名
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tuzuoquan* @date 2022/4/19 22:46*/
public class SinkDemo01 {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.sourceDataStream<String> ds = env.readTextFile("D:/tmp/stderr.log");//TODO 2.transformation//TODO 3.sinkds.print();ds.print("输出标识");//会在控制台上以红色输出ds.printToErr();//会在控制台上以红色输出ds.printToErr("输出标识");ds.writeAsText("D:/tmp/output/result1").setParallelism(1);ds.writeAsText("D:/tmp/output/result1").setParallelism(2);//TODO 4.executeenv.execute();}}
输出结果:
8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
输出标识:8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
输出标识:3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
输出标识:1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
输出标识:6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
5> sss bbb ccc ddd eee fff ddd xxx kkk nnn
输出标识:5> sss bbb ccc ddd eee fff ddd xxx kkk nnn
8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
输出标识:8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
输出标识:3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
输出标识:1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
输出标识:6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
5> sss bbb ccc ddd eee fff ddd xxx kkk nnn
输出标识:5> sss bbb ccc ddd eee fff ddd xxx kkk nnn另外,在D:/tmp/output/result1和D:/tmp/output/result2中有数据。
9.3.自定义sink
需求:将FLink集合中的数据通过自定义Sink保存到MySQL
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** @author tuzuoquan* @date 2022/4/21 21:52*/
public class SinkDemo02 {public static void main(String[] args) throws Exception {//TODO 1.sourceStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.sourceDataStream<Student> studentDS = env.fromElements(new Student(null, "tony", 18));//TODO 2.transformation//TODO 3.sinkstudentDS.addSink(new MySQLSink());//TODO 4.executeenv.execute();}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Student {private Integer id;private String name;private Integer age;}public static class MySQLSink extends RichSinkFunction<Student> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?);";ps = conn.prepareStatement(sql);}@Overridepublic void invoke(Student value, Context context) throws Exception {//设置?占位符参数值ps.setString(1, value.getName());ps.setInt(2, value.getAge());//执行sqlps.executeUpdate();}@Overridepublic void close() throws Exception {if (conn != null) { conn.close(); }if (ps != null) { ps.close(); }}}}
9.FLINK Sink\API\自定义sink相关推荐
- Flink:Sink、自定义Sink
Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作. 之前我们一直在使用的print方法其实就是一 ...
- Apache Flink 零基础入门(十七)Flink 自定义Sink
需求:socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中. 创建数据库和表 create database imooc_flink; create ta ...
- Flink之流处理API之Sink
Sink Flink没有类似于spark中foreach方法,让用户进行迭代的操作.虽有对外的输出操作都要利用Sink完成.最后通过类似如下方式完成整个任务最终输出操作. myDstream.addS ...
- Flink SQL 自定义 Sink
1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- Flink DataStream API(基础版)
概述 DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...
- 1.10.Flink DataStreamAPI(API的抽象级别、Data Sources、connectors、Source容错性保证、Sink容错性保证、自定义sink、partition等)
1.10.Flink DataStreamAPI 1.10.1.Flink API的抽象级别 1.10.2.DatSource部分详解 1.10.2.1.DataStream API之Data Sou ...
- flink API之Sink入门
kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...
- flink学习(6)之sink
在flink的官方文档中看到,无论是source还是sink都称之为flink的Connectors 点击overview然后就可以看到它所有的cnnectors 从上边的图片中我们发现,这些组件不是 ...
最新文章
- LeetCode简单题之数组中两元素的最大乘积
- ES6新特性3:函数的扩展
- 计算机电缆djyvp工艺,计算机电缆dJyvP相关办法.pdf
- 最大值(3.3)(java)
- 详解C中volatile关键字
- Intel Realsense D435 python 如何获取(打印)所有摄像头序列号信息?
- HTML5学习笔记简明版(8):新增的全局属性
- 平面设计师必读的十本书
- React Natvie Fetch工具类
- Linux蓝牙耳机软件,Linux安装驱动并使用Blueman连接蓝牙耳机的详细介绍(图文)...
- Eclipse学习笔记
- EXCEL单元格式(亿元,万元)
- 电脑开机出现警报音后提示要按F1才能进入的分析处理
- Linux安装杀毒软件clamav
- LDAP简介及Java、客户端连接
- 2022-2028全球及中国云端税务软件行业研究及十四五规划分析报告
- **统计出现字数最多的字符**
- 使用GHOST对Windows操作系统进行备份和还原
- SpringAOP中@within和@annotation的区别
- PTA 1056 Mice and Rice (25分)
热门文章
- 网络存储技术Windows server 2012(项目三 存储池的配置与管理)
- 阿里云 实人认证(详细)RPMin
- java 中报错 ~[classes/:na]
- 常见的计算机专业的复合命题例子,第五章、复合命题.ppt
- 如何基于ADAS Logger和CANape搭建一个自动驾驶路试采集系统
- 综合评价的基本理论和数据预处理
- html发展时间轴纵向插件,jquery响应式垂直时间轴插件vertical-timeline
- QPushButton禁用状态文字变形变粗
- 关于Office365邮箱附件大小限制问题
- DC report_timing 报告分析(STA)