kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql
1. 安装zookeeper, kafka
2. 启动zookeeper, kafka server
3. 准备工作
在Mysql数据库创建一个table, t_student
加入maven需要的flink资源
1.10.0
2.12
org.scala-lang
scala-library
2.12.7
org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}
org.apache.flink
flink-connector-kafka-0.10_${scala.binary.version}
${flink.version}
org.apache.flink
flink-java
${flink.version}
compile
org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
compile
消费端importcom.alibaba.fastjson.JSON;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;importjava.util.Properties;public classConsumerMain {
public static voidmain(String[] args) throwsException {
finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = newProperties();props.put("bootstrap.servers","localhost:9092");props.put("zookeeper.connect","localhost:2181");props.put("group.id","metric-group");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset","latest");SingleOutputStreamOperator student = env.addSource(newFlinkKafkaConsumer010<>(
"student",newSimpleStringSchema(),props)).setParallelism(1)
.map(string -> JSON.parseObject(string,Student.class));student.addSink(newSinkToMySQL());env.execute("Flink add sink");}
}
public classSinkToMySQL extendsRichSinkFunction {
PreparedStatement ps;privateConnection connection;/*****@paramparameters*@throwsException*/@Overridepublic voidopen(Configuration parameters) throwsException {
super.open(parameters);connection= getConnection();String sql = "insert into t_student(id, name, address, age) values(?, ?, ?, ?);";ps= this.connection.prepareStatement(sql);}
@Overridepublic voidclose() throwsException {
super.close();if(connection!= null) {
connection.close();}
if(ps!= null) {
ps.close();}
}
/*** @param value* @param context* @throws Exception*/@Overridepublic voidinvoke(Student value,Context context) throwsException {
System.out.println(" value: "+ JSON.toJSONString(value));ps.setInt(1,value.getId());ps.setString(2,value.getName());ps.setString(3,value.getAddress());ps.setInt(4,value.getAge());ps.executeUpdate();}
privateConnection getConnection() throwsSQLException {
Connection con = null;try{
Class.forName("com.mysql.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo","root","Myh090928");} catch(Exception e) {
System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());}
returncon;}
}
具体代码可以从这下载
kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql相关推荐
- 如果可以,我想并行消费Kafka拉取的数据库Binlog
关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...
- 如何查找mysql数据库ip_如何查看连接MYSQL数据库的IP信息
我们通常情况下要统计数据库的连接数指的是统计总数,没有细分到每个IP上.现在要监控每个IP的连接数,实现方式如下: 方法一: 复制代码 代码如下: select SUBSTRING_INDEX(hos ...
- 如何查看MySQL数据库状态及信息(内存、数据库、编码格式、表、列、索引等)
备战2022春招或暑期实习,本专栏会持续输出MySQL系列文章,祝大家每天进步亿点点!文末私信作者,我们一起去大厂. 本篇总结的是 <如何查看MySQL数据库状态及信息>,后续会每日更新~ ...
- MySQL数据库,从入门到精通:第七篇——MySQL单行函数应用
MySQL数据库,从入门到精通:第七篇--MySQL单行函数应用 第七篇_单行函数 1. 函数的理解 1.1 什么是函数 1.2 不同DBMS函数的差异 2. 数值函数 2.1 基本函数 2.3 三角 ...
- MySQL数据库,从入门到精通:第四篇——MySQL中常用的运算符及其用法
MySQl学习(MySQL数据库,从入门到精通:第四篇--MySQL中常用的运算符及其用法 第四篇_MySQL中常用的运算符及其用法运算符 1. 算术运算符 1.加法与减法运算符 2.乘法与除法运算符 ...
- 阿里云安装数据库mysql数据库服务器_阿里云CentOs服务器 安装与配置mysql数据库...
Linux 安装mysql 数据库 一下为mysql 安装教程 Using username"root". Last login: Tue Oct8 09:30:34 2019 f ...
- mysql数据库根据经纬度计算距离,获取离我最近的地点列表,并排序。附近的人,附近商店等功能,一个sql就搞定
mysql数据库根据经纬度计算距离,获取离我最近的地点列表,并排序 附近的人,附近商家等功能,顺序显示顺序是由近到远的,便利用户查看和判断,sql实现如下. StringBuilder sb=new ...
- Kafka实战 - 06 Kafka消费者:从指定Topic拉取工单处置记录信息并存入MongoDB数据库
文章目录 1. 处置记录表 t_disposal_record 2. kafka 主题和消费者配置 3. 定义一个线程任务 KafkaTask 1. kafka Topic中的数据:KafkaDisp ...
- kafka consumer配置拉取速度慢_Kafka消费者的使用和原理
这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...
最新文章
- html背景图片自适应纵向,HTML5 body设置全屏背景图片 如何让body的背景图片自适应整个屏----实战经验...
- mac上的mongodb安装与使用的踩坑记
- 数据结构 3-1-1 栈
- vi/vim 键盘图
- jQuery实现手风琴效果
- php上证指数抽奖代码,[高手指点] 美团网的抽奖是什么原理?
- 十二导联动态心电图技术参数
- MFC CImageList 详解
- 将图片url转file类型
- 微信ubuntu版服务器,Ubuntu 18.04 安装微信(Linux通用)
- 键盘没有 菜单键 menu 键盘映射
- java 加密 压缩_如何用java 将文件加密压缩为zip文件.
- 举个栗子!Tableau 技巧(133):完整显示工具提示中的数据
- matlab gui双音拨号,用matlab GUI功能模拟DTMF拨号系统.doc
- 微信小游戏开发实战教程15-关卡编辑器的制作以及关卡分享功能的实现
- Github创建、删除organization
- 【使用python和flask建个人博客】增加了重复类型的卡片功能,用于更好的完成日常的工作与生活
- 利用高斯(Guass)算法求解2维的SVP向量
- 如何用数学课件制作工具演示正方体展开动画
- SEO工作者面试基本都会被问到的问题