9.Sink
9.1.API
9.2.注意
9.3.自定义sink

9.Sink

9.1.API

1、ds.print 直接输出到控制台
2、ds.printToErr()直接输出到控制台,用红色
3、ds.writeAsText(“本地/HDFS的path”,WriteMode.OVERWRITE).setParallelism(1);

9.2.注意

在输出到path的时候,可以在前面设置并行度,如果
并行度>1,则path为目录
并行度=1,则path为文件名

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tuzuoquan* @date 2022/4/19 22:46*/
public class SinkDemo01 {public static void main(String[] args) throws Exception {//TODO 0.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.sourceDataStream<String> ds = env.readTextFile("D:/tmp/stderr.log");//TODO 2.transformation//TODO 3.sinkds.print();ds.print("输出标识");//会在控制台上以红色输出ds.printToErr();//会在控制台上以红色输出ds.printToErr("输出标识");ds.writeAsText("D:/tmp/output/result1").setParallelism(1);ds.writeAsText("D:/tmp/output/result1").setParallelism(2);//TODO 4.executeenv.execute();}}
输出结果:
8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
输出标识:8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
输出标识:3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
输出标识:1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
输出标识:6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
5> sss  bbb ccc ddd eee fff ddd xxx kkk nnn
输出标识:5> sss  bbb ccc ddd eee fff ddd xxx kkk nnn
8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
输出标识:8> bbb ccc eee ttt wwww qqqq aaaa dddd xxxx
3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
输出标识:3> eee ggg hhvvv vvvvddd dddd dddd ssss xxxx
1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
输出标识:1> xxxxx cccc vvv bbbb nnnn mmmm mmmmm uuuu
6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
输出标识:6> sss rrrr vvv ffff hhh iii ooo ppp lll yyy
5> sss  bbb ccc ddd eee fff ddd xxx kkk nnn
输出标识:5> sss  bbb ccc ddd eee fff ddd xxx kkk nnn另外,在D:/tmp/output/result1和D:/tmp/output/result2中有数据。

9.3.自定义sink

需求:将FLink集合中的数据通过自定义Sink保存到MySQL

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;/*** @author tuzuoquan* @date 2022/4/21 21:52*/
public class SinkDemo02 {public static void main(String[] args) throws Exception {//TODO 1.sourceStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 1.sourceDataStream<Student> studentDS = env.fromElements(new Student(null, "tony", 18));//TODO 2.transformation//TODO 3.sinkstudentDS.addSink(new MySQLSink());//TODO 4.executeenv.execute();}@Data@NoArgsConstructor@AllArgsConstructorpublic static class Student {private Integer id;private String name;private Integer age;}public static class MySQLSink extends RichSinkFunction<Student> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?);";ps = conn.prepareStatement(sql);}@Overridepublic void invoke(Student value, Context context) throws Exception {//设置?占位符参数值ps.setString(1, value.getName());ps.setInt(2, value.getAge());//执行sqlps.executeUpdate();}@Overridepublic void close() throws Exception {if (conn != null) { conn.close(); }if (ps != null) { ps.close(); }}}}

9.FLINK Sink\API\自定义sink相关推荐

  1. Flink:Sink、自定义Sink

    Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作. 之前我们一直在使用的print方法其实就是一 ...

  2. Apache Flink 零基础入门(十七)Flink 自定义Sink

    需求:socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中. 创建数据库和表 create database imooc_flink; create ta ...

  3. Flink之流处理API之Sink

    Sink Flink没有类似于spark中foreach方法,让用户进行迭代的操作.虽有对外的输出操作都要利用Sink完成.最后通过类似如下方式完成整个任务最终输出操作. myDstream.addS ...

  4. Flink SQL 自定义 Sink

    1. 背景 2. 步骤 3.自定义 sink 代码 4. 使用 Redis Sink 5.详细解释 6.原理 7.参考 1.背景 内部要做 Flink SQL 平台,本文以自定义 Redis Sink ...

  5. (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成

    文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...

  6. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

  7. 1.10.Flink DataStreamAPI(API的抽象级别、Data Sources、connectors、Source容错性保证、Sink容错性保证、自定义sink、partition等)

    1.10.Flink DataStreamAPI 1.10.1.Flink API的抽象级别 1.10.2.DatSource部分详解 1.10.2.1.DataStream API之Data Sou ...

  8. flink API之Sink入门

    kafka sink 添加依赖 <dependency><groupId>org.apache.flink</groupId><artifactId>f ...

  9. flink学习(6)之sink

    在flink的官方文档中看到,无论是source还是sink都称之为flink的Connectors 点击overview然后就可以看到它所有的cnnectors 从上边的图片中我们发现,这些组件不是 ...

最新文章

  1. LeetCode简单题之数组中两元素的最大乘积
  2. ES6新特性3:函数的扩展
  3. 计算机电缆djyvp工艺,计算机电缆dJyvP相关办法.pdf
  4. 最大值(3.3)(java)
  5. 详解C中volatile关键字
  6. Intel Realsense D435 python 如何获取(打印)所有摄像头序列号信息?
  7. HTML5学习笔记简明版(8):新增的全局属性
  8. 平面设计师必读的十本书
  9. React Natvie Fetch工具类
  10. Linux蓝牙耳机软件,Linux安装驱动并使用Blueman连接蓝牙耳机的详细介绍(图文)...
  11. Eclipse学习笔记
  12. EXCEL单元格式(亿元,万元)
  13. 电脑开机出现警报音后提示要按F1才能进入的分析处理
  14. Linux安装杀毒软件clamav
  15. LDAP简介及Java、客户端连接
  16. 2022-2028全球及中国云端税务软件行业研究及十四五规划分析报告
  17. **统计出现字数最多的字符**
  18. 使用GHOST对Windows操作系统进行备份和还原
  19. SpringAOP中@within和@annotation的区别
  20. PTA 1056 Mice and Rice (25分)

热门文章

  1. 网络存储技术Windows server 2012(项目三 存储池的配置与管理)
  2. 阿里云 实人认证(详细)RPMin
  3. java 中报错 ~[classes/:na]
  4. 常见的计算机专业的复合命题例子,第五章、复合命题.ppt
  5. 如何基于ADAS Logger和CANape搭建一个自动驾驶路试采集系统
  6. 综合评价的基本理论和数据预处理
  7. html发展时间轴纵向插件,jquery响应式垂直时间轴插件vertical-timeline
  8. QPushButton禁用状态文字变形变粗
  9. 关于Office365邮箱附件大小限制问题
  10. DC report_timing 报告分析(STA)