必须得创建好Mysql表

CREATE TABLE student (name VARCHAR(20),age INT);

读写代码

  • 创建实体类
public class Student {private String name;private int  age;public Student(String name, int age) {this.name = name;this.age = age;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Person{" +"name='" + name + '\'' +", age=" + age +'}';}
}
  • flink读写mysql代码
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;import java.util.Arrays;public class FLink2Mysql {private static String driverClass = "com.mysql.jdbc.Driver";private static String dbUrl = "jdbc:mysql://localhost:3306/test";private static String userName = "root";private static String passWord = "123";public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> input = env.fromCollection(Arrays.asList(new Student("Tom", 25), new Student("Jack", 24)));DataStream<Row> ds = input.map(new RichMapFunction<Student, Row>() {@Overridepublic Row map(Student student) throws Exception {return Row.of(student.getName(), student.getAge());}});TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);//写入mysql(追加模式)JDBCAppendTableSink sink = JDBCAppendTableSink.builder().setDrivername(driverClass).setDBUrl(dbUrl).setUsername(userName).setPassword(passWord).setParameterTypes(fieldTypes).setQuery("insert into student values(?,?)").build();sink.emitDataStream(ds);//查询mysqlJDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername(driverClass).setDBUrl(dbUrl).setUsername(userName).setPassword(passWord).setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();DataStreamSource<Row> input1 = env.createInput(jdbcInputFormat);input1.print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
  • 查询mysql结果

后记

简易程度感觉和Spark还是有一定的差距,希望以后能更好的优化吧!

scala版本参考
java版本参考

Flink读写Mysql(Java版)相关推荐

  1. flink读mysql速度怎么样_[DB] Flink 读 MySQL

    思路 在 Flink 中创建一张表有两种方法: 从一个文件中导入表结构(Structure)(常用于批计算)(静态) 从 DataStream 或者 DataSet 转换成 Table (动态) pa ...

  2. 尚硅谷MySQL高级JAVA版

    尚硅谷MySQL高级JAVA版 1.MySQL环境 1.1.环境安装 1.2.安装位置 1.3.修改字符集 1.4.配置文件 2.MySQL逻辑架构 3.存储引擎 4.SQL性能下降的原因 5.SQL ...

  3. java读写mysql数据库_Java读写MySQL数据库小实例

     Java读写MySQL数据库小实例 首先需要安装和配置好MySQL数据库.接下来,先创建一个数据库,Java代码连接此数据库,然后读写. 假设基于MySQL命令行创建一个叫做:phildatab ...

  4. Java版DBHelper【简单】(MySQL数据库)

    http://www.cnblogs.com/xia520pi/archive/2012/05/22/2513920.html Java版DBHelper[简单](MySQL数据库) 1.测试数据库 ...

  5. java sql 工资管理,企业工资管理系统(Java+MySQL)Word版

    <企业工资管理系统(Java+MySQL)Word版>由会员分享,可在线阅读,更多相关<企业工资管理系统(Java+MySQL)Word版(24页珍藏版)>请在人人文库网上搜索 ...

  6. Flink读写系列之-读HBase并写入HBase

    这里读HBase提供两种方式,一种是继承RichSourceFunction,重写父类方法,一种是实现OutputFormat接口,具体代码如下: 方式一:继承RichSourceFunction p ...

  7. Twitter的分布式雪花算法 SnowFlake 每秒自增生成26个万个可排序的ID (Java版)

    分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的. 有些时候我们希望能使用一种简单一 ...

  8. 分布式主键解决方案----Twitter 雪花算法的原理(Java 版)

    SnowFlake 雪花算法 对于分布式系统环境,主键ID的设计很关键,什么自增intID那些是绝对不用的,比较早的时候,大部分系统都用UUID/GUID来作为主键,优点是方便又能解决问题,缺点是插入 ...

  9. TPC-W安装与配置(威斯康星大学Java版)

    来自  http://blog.csdn.net/cybercode/article/details/6737415 系统:CentOS 5.6 TPC-W 官方介绍:http://www.tpc.o ...

最新文章

  1. shell变量设置与显示
  2. 面对众多的前端框架,你该如何学习?
  3. 【IOS 开发】Objective - C 面向对象高级特性 - 包装类 | 类处理 | 类别 | 扩展 | 协议 | 委托 | 异常处理 | 反射
  4. LASSO与Item Response Theory模型中的隐变量选择
  5. android网络请求回调管理,Android HTTP网络请求的异步实现
  6. oracle中右击出现的含义,Oracle中经典的问题解决方案-Oracle
  7. 爬虫的基本知识第一个请求requests模块的基本使用
  8. MySQL5.7.17绿色版安装
  9. Delphi Menu Designer(菜单设计器)之一
  10. CentOS 7下搭建配置 SVN 服务器
  11. 数据结构(四)---栈的顺序存储的实现---java版
  12. da---tlc5615._CD-DA的完整形式是什么?
  13. ExtJS4.0的数据集 .
  14. hibernate中的一对多和多对多的映射关系
  15. 【bzoj4709】[Jsoi2011]柠檬 斜率优化
  16. Javascript创建对象几种方法解析
  17. P3194 [HNOI2008]水平可见直线
  18. 在 Mac 上的 Safari 浏览器中如何存储网页的一部分或整个网页?
  19. 【尚硅谷_数据结构与算法】十二、算法
  20. android手机解除root,手机一键ROOT以后如何解除?手机root后怎么恢复

热门文章

  1. 阮一峰网络日志 第36期 2018年12月21日
  2. Numpy的学习6-深浅赋值(copydeep copy)
  3. sharepoint 2013文档上传最大50M怎么改啊
  4. Wayland 1.0 发布,图形服务器
  5. 股市像脱缰的“野牛”,谁是最好的“牛崽”?
  6. 如何开发油猴脚本来block掘金用户
  7. Helpful links
  8. Web项目(四)————异步队列的实现
  9. Linux-CentOS上一些快捷键的使用
  10. openid4java 使用记录[转载]