Flink官网提供了JdbcSink的功能,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...).addSink(JdbcSink.sink("insert into books (id, title, author, price, qty) values (?,?,?,?,?)",(ps, t) -> {ps.setInt(1, t.id);ps.setString(2, t.title);ps.setString(3, t.author);ps.setDouble(4, t.price);ps.setInt(5, t.qty);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(getDbMetadata().getUrl()).withDriverName(getDbMetadata().getDriverClass()).build()));
env.execute();

但是,在下才疏学浅,中间那个…看的在下一脸懵逼。
自己从网上查找资料发现只有创建一个RichSinkFunction的方法,在下就是想用官网的那个方法。于是经过在gitee寻找依赖,在代码中寻找关键字JdbcSink,终于找到了,Apache Flink的整个项目的代码。起初我还不知道,发现这个demo项目怎么这么大,还有几个依赖下载不下来。
在Flink的测试类中找到了JdbcSink相关的Java测试代码。
然后经过在下的不懈努力,终于写成了一个scala的测试类。如下:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder
import org.apache.flink.connector.jdbc.{JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}import java.sql.{PreparedStatement, Types}case class TestEntry(id: Int, title: String, author: String, price: Double, qty: Int)object JDBCSinkTestJob {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.fromElements(TestEntry(1001, "Java public for dummies", "Tan Ah Teck", 11.11, 11),TestEntry(1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22),TestEntry(1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33),TestEntry(1004, "A Cup of Java", "Kumar", 44.44, 44),TestEntry(1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55),TestEntry(1006, "A Teaspoon of Java 1.4", "Kevin Jones", 66.66, 66),TestEntry(1007, "A Teaspoon of Java 1.5", "Kevin Jones", 77.77, 77),TestEntry(1008, "A Teaspoon of Java 1.6", "Kevin Jones", 88.88, 88),TestEntry(1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99),TestEntry(1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), 99, 1010)).addSink(JdbcSink.sink(String.format("insert into %s (id, title, author, price, qty) values (?,?,?,?,?)", "books"),new JdbcStatementBuilder[TestEntry] {override def accept(ps: PreparedStatement, t: TestEntry): Unit = {ps.setInt(1, t.id);ps.setString(2, t.title);ps.setString(3, t.author);if (t.price == null) {ps.setNull(4, Types.DOUBLE);} else {ps.setDouble(4, t.price);}ps.setInt(5, t.qty);}},new JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop10:3306/test?useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("123456").withDriverName("com.mysql.jdbc.Driver").build()))env.execute()}}

这个测试类,把测试代码的变量给提取了出来。
写代码的过程还需要导入MySQL需要的依赖,自行搜索。
测试结果

Perfect,在下非常的高兴,决定打两把游戏奖赏一下自己。

参考资料:
gitee中的Flink项目代码

Flink使用JdbcSink下沉数据到数据库MySQL相关推荐

  1. 大数据之数据库mysql优化实战(一)

    2019独角兽企业重金招聘Python工程师标准>>> :facepunch: 大数据之数据库mysql优化实战(一) 首先你要有数据,不然怎么测试,几百条就算了,还没跑就完了. 本 ...

  2. mysql查出倒序第一条数据_[数据库]mysql 记录根据日期字段倒序输出

    [数据库]mysql 记录根据日期字段倒序输出 0 2016-07-21 11:00:17 我们知道倒序输出是很简单的 select * from table order by id desc 直接这 ...

  3. python网页数据存入数据库_python网络爬虫抓取动态网页并将数据存入数据库MySQL...

    简述 以下的代码是使用python实现的网络爬虫,抓取动态网页 http://hb.qq.com/baoliao/ .此网页中的最新.精华下面的内容是由JavaScript动态生成的.审查网页元素与网 ...

  4. python网站数据写入mysql_python网络爬虫抓取动态网页并将数据存入数据库MySQL

    简述 以下的代码是使用python实现的网络爬虫,抓取动态网页 http://hb.qq.com/baoliao/ .此网页中的最新.精华下面的内容是由JavaScript动态生成的.审查网页元素与网 ...

  5. Java 批量插入数据到数据库(MySQL)中

    实现Java批量插入数据库数据: package Proxy;import java.io.BufferedReader; import java.io.File; import java.io.Fi ...

  6. excel表数据导入数据库mysql中,并解决导入时间格式问题

    1.准备好Excel表数据 id category_id category_pid title art_desc content imageurl tags   create_time   3 1 E ...

  7. mysql 导出中间 数据_MYSQL数据库之间的数据导出与导入

    源数据库地址: 172.16.1.7 目标数据库地址: 172.16.1.51 步骤: (1) 进入172.16.1.7服务器,登录mysql数据库 mysqldump -uusername -ppa ...

  8. nodejs操作sqlserver数据_pyspark操作MySQL、SQLServer数据库进行数据处理操作

    欢迎访问本人的CSDN博客[Together_CZ],我是沂水寒城. https://yishuihancheng.blog.csdn.net 在大数据处理领域里面,Hadoop和spark可以说是最 ...

  9. python爬取新闻并归数据库_Python爬取数据并写入MySQL数据库操作示例

    Python爬取数据并写入MySQL数据库的实例 首先我们来爬取 http://html-color-codes.info/color-names/ 的一些数据. 按 F12 或 ctrl+u 审查元 ...

最新文章

  1. Ubuntu VSCode OpenCV 环境配置
  2. 完全二叉树、平衡二叉树、二叉查找树(二叉排序树)
  3. Linux系统gdb工具使用,使用 GDB 工具调试 Go
  4. ML:MLOps系列讲解之《基于ML的软件的三个层次之02 Model: Machine Learning Pipelines——2.6 ML Model serialization forma》解读
  5. small用于不连续数组_Excel公式技巧19: 在方形区域内填充不重复的随机整数
  6. 第二讲 命令源码文件
  7. C#通过属性名字符串获取、设置对象属性值
  8. 数学学习笔记--概率论
  9. 'chcp' 不是内部或外部命令,也不是可运行的程序
  10. ios上传文件云服务器上,ios文件上传服务器
  11. 安卓学习随笔 -- 自定义标题栏
  12. 创建美区苹果账户ID
  13. python 写入文件时编码问题
  14. android自动生成cardview,CardView
  15. html视频标签略缩图,JavaScript截取video标签视频缩略图 方法1
  16. Internet Explorer 包含五个预定义区域
  17. html网页上展示晶圆的坐标图,一种测试不良芯片晶元坐标分布的方法与流程
  18. 最强计算机游戏,这可能是市面上体积最小性能最强的游戏主机了
  19. 使用栈实现中缀表达式转后缀表达式
  20. 《逆袭大学》文摘——7.1.2 中学生学习单片机的启示

热门文章

  1. 基于Greenplum构建下一代数据分析平台
  2. 【教程】花100块钱DIY一台民航客机雷达
  3. 数学之路(3)-数据分析(6)
  4. 新突破!德国MPQ证实光子是量子比特载波的优选
  5. 5-TDengine集成SpringBoot,MyBatis,MyBatisPlus
  6. 艾兰岛编辑器-设置旋转效果
  7. Window应急响应(六):NesMiner挖矿病毒
  8. 深度学习图像分类:Kaggle植物幼苗分类(Plant Seedlings Classification)完整代码
  9. 国产操作系统,路在何方?
  10. Web安全攻防 信息收集篇(仅供交流学习使用,请勿用于非法用途)