写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
前言
之前其实在 《从0到1学习Flink》—— 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星球里有朋友叫我: 写个从 kafka 中读取数据,经过 Flink 做个预聚合,然后创建数据库连接池将数据批量写入到 mysql 的例子。
于是才有了这篇文章,更多提问和想要我写的文章可以在知识星球里像我提问,我会根据提问及时回答和尽可能作出文章的修改。
准备
你需要将这两个依赖添加到 pom.xml 中
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.34</version>
</dependency>
读取 kafka 数据
这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。
package com.zhisheng.connectors.mysql.utils;import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** Desc: 往kafka中写数据,可以使用这个main函数进行测试* Created by zhisheng on 2019-02-17* Blog: http://www.54tianzhisheng.cn/tags/Flink/*/
public class KafkaUtil {public static final String broker_list = "localhost:9092";public static final String topic = "student"; //kafka topic 需要和 flink 程序用同一个 topicpublic static void writeToKafka() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", broker_list);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer<String, String>(props);for (int i = 1; i <= 100; i++) {Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, GsonUtil.toJson(student));producer.send(record);System.out.println("发送数据: " + GsonUtil.toJson(student));Thread.sleep(10 * 1000); //发送一条数据 sleep 10s,相当于 1 分钟 6 条}producer.flush();}public static void main(String[] args) throws InterruptedException {writeToKafka();}
}
从 kafka 中读取数据,然后序列化成 student 对象。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>("student", //这个 kafka topic 需要和上面的工具类的 topic 一致new SimpleStringSchema(),props)).setParallelism(1).map(string -> GsonUtil.fromJson(string, Student.class)); //,解析字符串成 student 对象
因为 RichSinkFunction 中如果 sink 一条数据到 mysql 中就会调用 invoke 方法一次,所以如果要实现批量写的话,我们最好在 sink 之前就把数据聚合一下。那这里我们开个一分钟的窗口去聚合 Student 数据。
student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {ArrayList<Student> students = Lists.newArrayList(values);if (students.size() > 0) {System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());out.collect(students);}}
});
写入数据库
这里使用 DBCP 连接池连接数据库 mysql,pom.xml 中添加依赖:
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-dbcp2</artifactId><version>2.1.1</version>
</dependency>
如果你想使用其他的数据库连接池请加入对应的依赖。
这里将数据写入到 MySQL 中,依旧是和之前文章一样继承 RichSinkFunction 类,重写里面的方法:
package com.zhisheng.connectors.mysql.sinks;import com.zhisheng.connectors.mysql.model.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;/*** Desc: 数据批量 sink 数据到 mysql* Created by zhisheng_tian on 2019-02-17* Blog: http://www.54tianzhisheng.cn/tags/Flink/*/
public class SinkToMySQL extends RichSinkFunction<List<Student>> {PreparedStatement ps;BasicDataSource dataSource;private Connection connection;/*** open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);dataSource = new BasicDataSource();connection = getConnection(dataSource);String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";ps = this.connection.prepareStatement(sql);}@Overridepublic void close() throws Exception {super.close();//关闭连接和释放资源if (connection != null) {connection.close();}if (ps != null) {ps.close();}}/*** 每条数据的插入都要调用一次 invoke() 方法** @param value* @param context* @throws Exception*/@Overridepublic void invoke(List<Student> value, Context context) throws Exception {//遍历数据集合for (Student student : value) {ps.setInt(1, student.getId());ps.setString(2, student.getName());ps.setString(3, student.getPassword());ps.setInt(4, student.getAge());ps.addBatch();}int[] count = ps.executeBatch();//批量后执行System.out.println("成功了插入了" + count.length + "行数据");}private static Connection getConnection(BasicDataSource dataSource) {dataSource.setDriverClassName("com.mysql.jdbc.Driver");//注意,替换成自己本地的 mysql 数据库地址和用户名、密码dataSource.setUrl("jdbc:mysql://localhost:3306/test");dataSource.setUsername("root");dataSource.setPassword("root123456");//设置连接池的一些参数dataSource.setInitialSize(10);dataSource.setMaxTotal(50);dataSource.setMinIdle(2);Connection con = null;try {con = dataSource.getConnection();System.out.println("创建连接池:" + con);} catch (Exception e) {System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());}return con;}
}
核心类 Main
核心程序如下:
public class Main {public static void main(String[] args) throws Exception{final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("zookeeper.connect", "localhost:2181");props.put("group.id", "metric-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>("student", //这个 kafka topic 需要和上面的工具类的 topic 一致new SimpleStringSchema(),props)).setParallelism(1).map(string -> GsonUtil.fromJson(string, Student.class)); //student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {@Overridepublic void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {ArrayList<Student> students = Lists.newArrayList(values);if (students.size() > 0) {System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());out.collect(students);}}}).addSink(new SinkToMySQL());env.execute("flink learning connectors kafka");}
}
运行项目
运行 Main 类后再运行 KafkaUtils.java 类!
下图是往 Kafka 中发送的数据:
下图是运行 Main 类的日志,会创建 4 个连接池是因为默认的 4 个并行度,你如果在 addSink 这个算子设置并行度为 1 的话就会创建一个连接池:
下图是批量插入数据库的结果:
总结
本文从知识星球一位朋友的疑问来写的,应该都满足了他的条件(批量/数据库连接池/写入mysql),的确网上很多的例子都是简单的 demo 形式,都是单条数据就创建数据库连接插入 MySQL,如果要写的数据量很大的话,会对 MySQL 的写有很大的压力。这也是我之前在 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch 中,数据写 ES 强调过的,如果要提高性能必定要批量的写。就拿我们现在这篇文章来说,如果数据量大的话,聚合一分钟数据达万条,那么这样批量写会比来一条写一条性能提高不知道有多少。
本文原创地址是: http://www.54tianzhisheng.cn/2019/01/15/Flink-MySQL-sink/ , 未经允许禁止转载。
关注我
微信公众号:zhisheng
另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。
更多私密资料请加入知识星球!
Github 代码仓库
https://github.com/zhisheng17/flink-learning/
以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客。
本文的项目代码在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-mysql
相关文章
1、《从0到1学习Flink》—— Apache Flink 介绍
2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、《从0到1学习Flink》—— Flink 配置文件详解
4、《从0到1学习Flink》—— Data Source 介绍
5、《从0到1学习Flink》—— 如何自定义 Data Source ?
6、《从0到1学习Flink》—— Data Sink 介绍
7、《从0到1学习Flink》—— 如何自定义 Data Sink ?
8、《从0到1学习Flink》—— Flink Data transformation(转换)
9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows
10、《从0到1学习Flink》—— Flink 中的几种 Time 详解
11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch
12、《从0到1学习Flink》—— Flink 项目如何运行?
13、《从0到1学习Flink》—— Flink 写入数据到 Kafka
14、《从0到1学习Flink》—— Flink JobManager 高可用性配置
15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL相关推荐
- 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
<!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...
- python读取数据库数据、并保存为docx_Python从数据库读取大量数据批量写入文件的方法...
Python从数据库读取大量数据批量写入文件的方法 使用机器学习训练数据时,如果数据量较大可能我们不能够一次性将数据加载进内存,这时我们需要将数据进行预处理,分批次加载进内存. 下面是代码作用是将数据 ...
- Python从数据库读取大量数据批量写入文件的方法
今天小编就为大家分享一篇Python从数据库读取大量数据批量写入文件的方法,具有很好的参考价值,希望对大家有所帮助.一起跟随小编过来看看吧 使用机器学习训练数据时,如果数据量较大可能我们不能够一次性将 ...
- python批量读取文件内容_Python从数据库读取大量数据批量写入文件的方法
使用机器学习训练数据时,如果数据量较大可能我们不能够一次性将数据加载进内存,这时我们需要将数据进行预处理,分批次加载进内存. 下面是代码作用是将数据从数据库读取出来分批次写入txt文本文件,方便我们做 ...
- flink DDL读取kafka数据-Scala嵌入DDL形式
步驟: service firewalld stop(关闭防火墙) 啓動hadoop 離開安全模式 啓動zookeeper與kafka集羣 操作 命令 备注 查看topic $KAFKA/bin/ka ...
- kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...
- 【高级内部资料】.NET数据批量写入性能分析 第一篇
[高级内部资料].NET数据批量写入性能分析 第一篇 说起数据的批量写入,相信大家应该不陌生了,那么我们本系列的文章不准备讲述如何来进行数据的批量写入,而是介绍常用的数据批量写入方法的性能分析. 同时 ...
- 【腾讯轻量应用服务器上部署kafka并通过flink读取kafka数据】
环境准备 经过1个月的摸索,最终选择在腾讯云上搭建一个学习环境.当时选择原因还是新用户有优惠(150左右3年),但现在看1核2g的配置勉强够用,建议后续小伙伴选择时最好是2核4g配置. 由于是单节点安 ...
- 读取Excel 数据并写入到Word示例
读取Excel 数据并写入到Word示例 0x01 读取Excel 数据并写入到Word示例 1.1 配置pom.xml 1.2 配置 application.properties 1.3 自定义配置 ...
最新文章
- swift 组件化_京东商城订单模块基于 Swift 的改造方案与实践
- a byte of python-《A Byte of Python》笔记
- 【Groovy】集合遍历 ( 集合中有集合元素时调用 flatten 函数拉平集合元素 | 代码示例 )
- TP自动生成模块目录
- Hadoop 单机版和伪分布式版安装
- 关于界面软件测试点,电子商务网站--界面测试的测试点
- 人群与网络:搜索引擎广告位的定价
- Android逆向系列之ARM语法篇
- C#快速排序源码演示
- python 会议室预约系统解决方案_会议预约系统_智能会议预约管理系统_轻松实现会议管理解决方案...
- 注册石墨文档无法连接服务器,石墨文档没有访问权限的解决方法
- mysql 分库分表实战_DBLE分库分表实战
- 想成为一名数据科学家?你得先读读这篇文章
- 数据分析概要及分析分析思路
- 【清北学堂济南刷题班】集合
- freemarker实现word模板
- 2023年ICPC全国邀请赛(陕西)-Volunteer角度
- 结账时提示:不能在对象 't_Balance' 中插入重复键
- 软考-软件设计师】(二). 操作系统
- 【程序设计与实践】实验五:停车场管理
热门文章
- 事件 ID 9665
- 4.Spark 学习成果转化—机器学习—使用Spark ML的线性回归来预测房屋价格 (线性回归问题)
- 15款jQuery带缩略图的图片轮播切换特效代码
- C语言开发数字华容道实现,jQuery实现数字华容道小游戏(实例代码)
- 学会使用x2struct(json -> struct)
- c++入门(多态and纯虚函数and抽象类)
- java虚拟机能自动处理 异常_对于非运行时异常,程序中一般可不做处理,由java虚拟机自动...
- 计算机全国211院校排名2015,2015年全国211大学名单排名汇总
- VMware 11 安装苹果系统
- ArcGIS坐标系篇之几种常见坐标系的相互转换(二)