需求:socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。

创建数据库和表

create database imooc_flink;
create table student(
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(25),
age int(10),
primary key(id)
)

导入mysql依赖:

     <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.15</version></dependency>

创建POJO Student

package com.vincent.course05;public class Student {private int id;private String name;private int age;@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", age=" + age +'}';}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}

然后创建连接,SinkToMySQL.java

package com.vincent.course05;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class SinkToMySQL extends RichSinkFunction<Student> {PreparedStatement ps;private Connection connection;/*** open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into student(id, name, age) values(?, ?, ?);";ps = this.connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();//关闭连接和释放资源if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** 每条数据的插入都要调用一次 invoke() 方法** @param value* @param context* @throws Exception*/@Overridepublic void invoke(Student value, Context context) throws Exception {//组装数据,执行插入操作ps.setInt(1, value.getId());ps.setString(2, value.getName());ps.setInt(3, value.getAge());ps.executeUpdate();}private static Connection getConnection() {Connection con = null;try {Class.forName("com.mysql.cj.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");} catch (Exception e) {e.printStackTrace();System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());}return con;}
}

main方法:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999);SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {@Overridepublic Student map(String value) throws Exception {String[] splits = value.split(",");Student student = new Student();student.setId(Integer.parseInt(splits[0]));student.setName(splits[1]);student.setAge(Integer.parseInt(splits[2]));return student;}});studentStream.addSink(new SinkToMySQL());environment.execute("JavaCustomSinkToMysql");}

从socket中获取数据,数据格式使用逗号分割,在控制台中输入:

nc -lk 9999
1,tom,23

检查数据库,数据库中多了一条数据

mysql> select * from student;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | tom  |   23 |
+----+------+------+
1 row in set (0.00 sec)

这样就很方便的使用自定义的sink,写入到MySQL中去。

总结:

第一步:继承RichSinkFunction<T> T就是想要写入的对象类型

第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次

默认情况下open方法的并行度不是1,跟具体的电脑有关系。

Apache Flink 零基础入门(十七)Flink 自定义Sink相关推荐

  1. Apache Flink 零基础入门【转】

    Apache Flink 零基础入门(一):基础概念解析 Apache Flink 零基础入门(二):DataStream API 编程 转载于:https://www.cnblogs.com/dav ...

  2. Apache Flink 零基础入门(十八)Flink Table APISQL

    什么是Flink关系型API? 虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体 ...

  3. Apache Flink 零基础入门(三)编写最简单的helloWorld

    实验环境 JDK 1.8 IDE Intellij idea Flink 1.8.1 实验内容 创建一个Flink简单Demo,可以从流数据中统计单词个数. 实验步骤 首先创建一个maven项目,其中 ...

  4. Apache Flink 零基础入门(十五)Flink DataStream编程(如何自定义DataSource)

    数据源可以通过StreamExecutionEnvironment.addSource(sourceFunction)方式来创建,Flink也提供了一些内置的数据源方便使用,例如readTextFil ...

  5. Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 的定义.架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行 ...

  6. Apache Flink 零基础入门(二十)Flink部署与作业的提交

    之前我们都是基于Idea在本地进行开发,这种方式很适合开发以及测试,但是开发完之后,如何提交到服务器中运行? Flink单机部署方式 本地开发和测试过程中非常有用,只要把代码放到服务器直接运行. 前置 ...

  7. Apache Flink 零基础入门(二十)Flink kafka connector

    内置source和sink 内置source包括从文件读取,从文件夹读取,从socket中读取.从集合或者迭代器中读取.内置的sink包括写文件.控制台输出.socket 内置connectors A ...

  8. Apache Flink 零基础入门(十九)Flink windows和Time操作

    Time类型 在Flink中常用的Time类型: 处理时间 摄取时间 事件时间 处理时间 是上图中,最后一步的处理时间,表示服务器中执行相关操作的处理时间.例如一些算子操作时间,在服务器上面的时间. ...

  9. Apache Flink 零基础入门(十四)Flink 分布式缓存

    Apache Flink 提供了一个分布式缓存,类似于Hadoop,用户可以并行获取数据. 通过注册一个文件或者文件夹到本地或者远程HDFS等,在getExecutionEnvironment中指定一 ...

最新文章

  1. linux存储实用程序,技术|使用 Linux 实用程序 gPhoto2 备份手机存储
  2. 问答系统的搭建与财报知识图谱关系比较密切 问题相似度方面
  3. Eclipse 之 EasyExplore 插件
  4. 电阻应用电路之指示灯电路的设计
  5. WCF后传系列(3):深入WCF寻址Part 3—消息过滤引擎
  6. python找零钱问题_Python基于回溯法子集树模板解决找零问题示例
  7. php 随机数 名称,php – 从标题更改为随机数
  8. QTP的那些事--XPath的重要使用
  9. html渲染json的插件,[ json editor] 如何在网页中使用Json editor 插件
  10. python是什么类型的语言-为什么说 Python 是强类型语言?
  11. 目标检测(二十)--Mask R-CNN
  12. 《编译原理及实践教程》第一章学习笔记
  13. 基于JAVA教务排课系统计算机毕业设计源码+数据库+lw文档+系统+部署
  14. C++与QT学习路线
  15. 使用docker搭建couchbase集群
  16. 计算机海报大赛策划书,海报策划书模板.docx
  17. 关于define与defined的区别
  18. ubuntu20.04桌面美化
  19. STM32F103实现OV7725拍照存储为BMP位图
  20. 【矩阵计算GPU加速】numpy 矩阵计算利用GPU加速,cupy包

热门文章

  1. 怎么计算网站高峰期并发量和所需的带宽?
  2. 在 IE8 下 EXT的显示问题
  3. 正确理解hibernate的inverse属性
  4. jquery中的ajax如何接收json串形式的接口
  5. 如何使用Linux的Crontab定时执行PHP脚本的方法
  6. 安装错误 服务尚未启动_原创 | 西门子300软件安装出错处理大全
  7. matlab遗传算法m文件,matlab上安装遗传算法工具箱
  8. android novate乱码,Android RxJava+Retrofit2+RxBinding
  9. jquery中获取元素的几种方式小结
  10. win10html中文乱码,Win10预览版10125中文语言包安装及乱码解决方法