Apache Flink 零基础入门(十七)Flink 自定义Sink
需求:socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。
创建数据库和表
create database imooc_flink;
create table student(
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(25),
age int(10),
primary key(id)
)
导入mysql依赖:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.15</version></dependency>
创建POJO Student
package com.vincent.course05;public class Student {private int id;private String name;private int age;@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", age=" + age +'}';}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}
然后创建连接,SinkToMySQL.java
package com.vincent.course05;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction<Student> {PreparedStatement ps;private Connection connection;/*** open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into student(id, name, age) values(?, ?, ?);";ps = this.connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();//关闭连接和释放资源if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** 每条数据的插入都要调用一次 invoke() 方法** @param value* @param context* @throws Exception*/@Overridepublic void invoke(Student value, Context context) throws Exception {//组装数据,执行插入操作ps.setInt(1, value.getId());ps.setString(2, value.getName());ps.setInt(3, value.getAge());ps.executeUpdate();}private static Connection getConnection() {Connection con = null;try {Class.forName("com.mysql.cj.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");} catch (Exception e) {e.printStackTrace();System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());}return con;}
}
main方法:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999);SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {@Overridepublic Student map(String value) throws Exception {String[] splits = value.split(",");Student student = new Student();student.setId(Integer.parseInt(splits[0]));student.setName(splits[1]);student.setAge(Integer.parseInt(splits[2]));return student;}});studentStream.addSink(new SinkToMySQL());environment.execute("JavaCustomSinkToMysql");}
从socket中获取数据,数据格式使用逗号分割,在控制台中输入:
nc -lk 9999
1,tom,23
检查数据库,数据库中多了一条数据
mysql> select * from student;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | tom | 23 |
+----+------+------+
1 row in set (0.00 sec)
这样就很方便的使用自定义的sink,写入到MySQL中去。
总结:
第一步:继承RichSinkFunction<T> T就是想要写入的对象类型
第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次
默认情况下open方法的并行度不是1,跟具体的电脑有关系。
Apache Flink 零基础入门(十七)Flink 自定义Sink相关推荐
- Apache Flink 零基础入门【转】
Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...
- Apache Flink 零基础入门(十八)Flink Table APISQL
什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...
- Apache Flink 零基础入门(三)编写最简单的helloWorld
实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...
- Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)
数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...
- Apache Flink 零基础入门(一):基础概念解析
Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...
- Apache Flink 零基础入门(二十)Flink部署与作业的提交
之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...
- Apache Flink 零基础入门(二十)Flink kafka connector
内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...
- Apache Flink 零基础入门(十九)Flink windows和Time操作
Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...
- Apache Flink 零基础入门(十四)Flink 分布式缓存
Apache Flink 提供了一个分布式缓存,类似于Hadoop,用户可以并行获取数据. 通过注册一个文件或者文件夹到本地或者远程HDFS等,在getExecutionEnvironment中指定一 ...
最新文章
- linux存储实用程序,技术|使用 Linux 实用程序 gPhoto2 备份手机存储
- 问答系统的搭建与财报知识图谱关系比较密切 问题相似度方面
- Eclipse 之 EasyExplore 插件
- 电阻应用电路之指示灯电路的设计
- WCF后传系列(3):深入WCF寻址Part 3—消息过滤引擎
- python找零钱问题_Python基于回溯法子集树模板解决找零问题示例
- php 随机数 名称,php – 从标题更改为随机数
- QTP的那些事--XPath的重要使用
- html渲染json的插件,[ json editor] 如何在网页中使用Json editor 插件
- python是什么类型的语言-为什么说 Python 是强类型语言?
- 目标检测(二十)--Mask R-CNN
- 《编译原理及实践教程》第一章学习笔记
- 基于JAVA教务排课系统计算机毕业设计源码+数据库+lw文档+系统+部署
- C++与QT学习路线
- 使用docker搭建couchbase集群
- 计算机海报大赛策划书,海报策划书模板.docx
- 关于define与defined的区别
- ubuntu20.04桌面美化
- STM32F103实现OV7725拍照存储为BMP位图
- 【矩阵计算GPU加速】numpy 矩阵计算利用GPU加速,cupy包
热门文章
- 怎么计算网站高峰期并发量和所需的带宽?
- 在 IE8 下 EXT的显示问题
- 正确理解hibernate的inverse属性
- jquery中的ajax如何接收json串形式的接口
- 如何使用Linux的Crontab定时执行PHP脚本的方法
- 安装错误 服务尚未启动_原创 | 西门子300软件安装出错处理大全
- matlab遗传算法m文件,matlab上安装遗传算法工具箱
- android novate乱码,Android RxJava+Retrofit2+RxBinding
- jquery中获取元素的几种方式小结
- win10html中文乱码,Win10预览版10125中文语言包安装及乱码解决方法