博主把核心的内容写在最前面,其他内容和完整的代码放在最后面哈:

文章目录

  • pom配置
  • 主要代码
  • 其他内容:MyData2类,与生成数据源的类MyDataSource2

pom配置

      <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.11</version></dependency>

主要代码

package write_to_mysql;import create_data.MyData2; // 格式见其他内容
import create_data.MyDataSource2; // 格式见其他内容
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class GetMain {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<MyData2> sourceStream = env.addSource(new MyDataSource2()); // 得到数据源sourceStream.addSink(new MysqlSink()); // 核心!保存到mysqlenv.execute("Flink_to_mysql demo");}
}

可以看到,使用流.addSink()就可以保存流的数据了,这个MysqlSink是自己写的保存逻辑,代码如下:

package write_to_mysql;import create_data.MyData2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class MysqlSink extends RichSinkFunction<MyData2> {private PreparedStatement ps = null;private Connection connection = null;String driver = "com.mysql.jdbc.Driver";String url = "jdbc:mysql://127.0.0.1:3306/my_test_db?useSSL=false";String username = "testuser";String password = "testpassword";@Overridepublic void open(Configuration parameters) throws Exception { // 要执行的代码super.open(parameters);  // 用于建立连接Class.forName(driver);  //加载JDBC驱动connection = DriverManager.getConnection(url, username, password);String sql = "insert into test_db.test_csv (col_1,col_2,col_3,col_4)" +"values (?,?,?,'what');";ps = connection.prepareStatement(sql);}@Overridepublic void invoke(MyData2 value, Context context) throws Exception { // 真正执行的操作// 每次插入都会调用一次// 这里的value.xxx根据具体的操作逻辑来// ps.setxxx(n,xxx) 这里的n代表要保存的位置,也就是要把数据拍到上面的String sql的第几个?(问号)上ps.setString(1, String.valueOf(value.keyId));ps.setString(2, String.valueOf(value.timestamp));ps.setString(3, String.valueOf(value.num));ps.executeUpdate();}@Overridepublic void close() throws Exception { // 关闭操作super.close();if (connection != null) {connection.close();}if (ps != null) {ps.close();}}
}

只需要覆写openinvokeclose三个函数即可,一个用于打开连接,一个用于执行操作,一个用于关闭连接。

其他内容:MyData2类,与生成数据源的类MyDataSource2

数据类与生成数据的类请参考:https://blog.csdn.net/weixin_35757704/article/details/120626180

MyData2.java

package create_data;import java.util.Arrays;public class MyData2 {public int keyId;public long timestamp;public int num;public double[] valueList;public MyData2() {}public MyData2(int accountId, long timestamp, int num, double[] valueList) {this.keyId = accountId;this.timestamp = timestamp;this.num = num;this.valueList = valueList;}public long getKeyId() {return keyId;}public void setKeyId(int keyId) {this.keyId = keyId;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public double[] getValueList() {return valueList;}public void setValueList(double[] valueList) {this.valueList = valueList;}public int getNum() {return num;}public void setNum(int num) {this.num = num;}@Overridepublic String toString() {return "MyData{" +"keyId=" + keyId +", timestamp=" + timestamp +", num=" + num +", valueList= " + Arrays.toString(valueList) +'}';}
}

MyDataSource2.java

package create_data;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class MyDataSource2 implements SourceFunction<MyData2> {// 定义标志位,用来控制数据的产生private boolean isRunning = true;private final Random random = new Random(0);@Overridepublic void run(SourceContext ctx) throws Exception {while (isRunning) {ctx.collect(new MyData2(random.nextInt(3), System.currentTimeMillis(), 1, new double[]{random.nextDouble()}));Thread.sleep(1000L); // 1s生成1个数据}}@Overridepublic void cancel() {isRunning = false;}
}

java Flink使用addSink方法保存流到mysql数据库中相关推荐

  1. java flink使用addSink方法保存流数据到redis

    博主把核心的内容写在最前面,其他内容和完整的代码放在最后面哈: 文章目录 pom配置 主要代码 其他内容:MyData2类,与生成数据源的类MyDataSource2 pom配置 <depend ...

  2. session mysql java_PHP自定义session处理方法,保存到MySQL数据库中

    我们都知道,session是为了解决因特网的无状态属性而创造出来的.我们可以用session这种会话管理机制来构建购物车.监控站点网络访问,甚至还可以跟踪某一个用户具体是如何使用你的应用的.PHP默认 ...

  3. springMVC保存数据到mysql数据库中文乱码问题解决方法

    springMVC保存数据到mysql数据库中文乱码问题解决方法 参考文章: (1)springMVC保存数据到mysql数据库中文乱码问题解决方法 (2)https://www.cnblogs.co ...

  4. 获取mysql可行方法_Mysql学习Java实现获得MySQL数据库中所有表的记录总数可行方法...

    <Mysql学习Java实现获得MySQL数据库中所有表的记录总数可行方法>要点: 本文介绍了Mysql学习Java实现获得MySQL数据库中所有表的记录总数可行方法,希望对您有用.如果有 ...

  5. Android学习笔记——保存数据到SQL数据库中(Saving Data in SQL Databases)

    知识点: 1.使用SQL Helper创建数据库 2.数据的增删查改(PRDU:Put.Read.Delete.Update) 背景知识: 上篇文章学习了保存文件,今天学习的是保存数据到SQL数据库中 ...

  6. mysql schema 保存数据_如何在mysql数据库中保存apache spark schema输出

    任何人都可以告诉我,如果有任何方式在apache的火花存储在mysql数据库的JavaRDD?我从2个CSV文件中获取输入,然后在对其内容进行连接操作之后,我需要将输出(输出JavaRDD)保存在my ...

  7. mye连接mysql数据库_MySQL_如何在Java程序中访问mysql数据库中的数据并进行简单的操作,在上篇文章给大家介绍了Myeclip - phpStudy...

    如何在Java程序中访问mysql数据库中的数据并进行简单的操作 在上篇文章给大家介绍了Myeclipse连接mysql数据库的方法,通过本文给大家介绍如何在Java程序中访问mysql数据库中的数据 ...

  8. java向mysql写入数据慢_通过java代码往mysql数据库中写入日期相关数据少13个小时...

    通过show variables like '%time_zone%'; 查看时区: CST 时区 名为 CST 的时区是一个很混乱的时区,有四种含义: 美国中部时间 Central Standard ...

  9. MySQL数据库中导入导出方法以及工具介绍

    MySQL数据库中导入导出方法以及工具介绍 1.MySQLimport的语法介绍: mysqlimport位于mysql/bin目录中,是mysql的一个载入(或者说导入)数据的一个非常有效的工具.这 ...

最新文章

  1. stylegan生成循环gif图片
  2. python中是干嘛的-学 Python 都用来干嘛的?
  3. Django框架(14.Django中模型类的关系,以及模型类关联查询)
  4. Apache Flink 零基础入门(五)Flink开发实时处理应用程序
  5. BZOJ 2818 Gcd
  6. d3js scales深入理解
  7. C++语言虚函数表实现多态原理
  8. web前端网页设计作业—个人网页(游戏主题)(html+css+js)
  9. Moonlight 串流分辨率设置
  10. 六度空间-c++实现
  11. QQ微信可以上网,但是google浏览器上不了网怎么办?
  12. 双碑零基础法语学习 学习法语要知道哪些法语常识?
  13. 数组添加/扩容和数组缩减
  14. 10和100Mbps以太网
  15. 66.假定输入的字符串中只包含字母和*号。请编写函数fun,它的功能是:删除字符串中所有的*号。在编写函数时,不得使用C语言提供的字符串函数。
  16. EMNIST: an extension of MNIST to handwritten letters(数据集简介)
  17. 交互组件滚动条,搜索框,上传组件,翻页的微创新
  18. 那些年我们一起手写过的单例
  19. 网站蜘蛛日志分析解读,SEO站长自查诊断
  20. Python中用Matplotlib做多个纵轴(多y轴)

热门文章

  1. C++自由存储空间:new
  2. opencv python 多帧降噪算法_实战 | OpenCV实现视频防抖
  3. 从小白到大数据技术专家的学习历程,你准备好了吗
  4. 瑞驰vCloud超融合让数据中心业务更稳定
  5. Ubuntu 源码方式安装Subversion
  6. php 类自动执行方法,php类讲解8:PHP 自动加载类 __autoload() 方法
  7. docker 安装的东西都在哪儿_热水器越来越不流行了,如今都安装这东西,好用安全,还省电费...
  8. Redis五大基本数据类型及其相关命令及常用用途
  9. 语言迭代发_从今天开始,每天学点R语言~
  10. 数据中台 画像标签_如何通过数据中台标签平台,圈出产品高价值用户?