1. 安装zookeeper, kafka

2. 启动zookeeper, kafka server

3. 准备工作

在Mysql数据库创建一个table, t_student

加入maven需要的flink资源

1.10.0

2.12

org.scala-lang

scala-library

2.12.7

org.apache.flink

flink-streaming-scala_${scala.binary.version}

${flink.version}

org.apache.flink

flink-scala_${scala.binary.version}

${flink.version}

org.apache.flink

flink-connector-kafka-0.10_${scala.binary.version}

${flink.version}

org.apache.flink

flink-java

${flink.version}

compile

org.apache.flink

flink-streaming-java_${scala.binary.version}

${flink.version}

compile

消费端importcom.alibaba.fastjson.JSON;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;importjava.util.Properties;public classConsumerMain {

public static voidmain(String[] args) throwsException {

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = newProperties();props.put("bootstrap.servers","localhost:9092");props.put("zookeeper.connect","localhost:2181");props.put("group.id","metric-group");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset","latest");SingleOutputStreamOperator student = env.addSource(newFlinkKafkaConsumer010<>(

"student",newSimpleStringSchema(),props)).setParallelism(1)

.map(string -> JSON.parseObject(string,Student.class));student.addSink(newSinkToMySQL());env.execute("Flink add sink");}

}

public classSinkToMySQL extendsRichSinkFunction {

PreparedStatement ps;privateConnection connection;/*****@paramparameters*@throwsException*/@Overridepublic voidopen(Configuration parameters) throwsException {

super.open(parameters);connection= getConnection();String sql = "insert into t_student(id, name, address, age) values(?, ?, ?, ?);";ps= this.connection.prepareStatement(sql);}

@Overridepublic voidclose() throwsException {

super.close();if(connection!= null) {

connection.close();}

if(ps!= null) {

ps.close();}

}

/*** @param value* @param context* @throws Exception*/@Overridepublic voidinvoke(Student value,Context context) throwsException {

System.out.println(" value: "+ JSON.toJSONString(value));ps.setInt(1,value.getId());ps.setString(2,value.getName());ps.setString(3,value.getAddress());ps.setInt(4,value.getAge());ps.executeUpdate();}

privateConnection getConnection() throwsSQLException {

Connection con = null;try{

Class.forName("com.mysql.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo","root","Myh090928");} catch(Exception e) {

System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());}

returncon;}

}

具体代码可以从这下载

kafka拉取mysql数据库_kafka里信息用flink获取后放入mysql相关推荐

  1. 如果可以,我想并行消费Kafka拉取的数据库Binlog

    关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...

  2. 如何查找mysql数据库ip_如何查看连接MYSQL数据库的IP信息

    我们通常情况下要统计数据库的连接数指的是统计总数,没有细分到每个IP上.现在要监控每个IP的连接数,实现方式如下: 方法一: 复制代码 代码如下: select SUBSTRING_INDEX(hos ...

  3. 如何查看MySQL数据库状态及信息(内存、数据库、编码格式、表、列、索引等)

    备战2022春招或暑期实习,本专栏会持续输出MySQL系列文章,祝大家每天进步亿点点!文末私信作者,我们一起去大厂. 本篇总结的是 <如何查看MySQL数据库状态及信息>,后续会每日更新~ ...

  4. MySQL数据库,从入门到精通:第七篇——MySQL单行函数应用

    MySQL数据库,从入门到精通:第七篇--MySQL单行函数应用 第七篇_单行函数 1. 函数的理解 1.1 什么是函数 1.2 不同DBMS函数的差异 2. 数值函数 2.1 基本函数 2.3 三角 ...

  5. MySQL数据库,从入门到精通:第四篇——MySQL中常用的运算符及其用法

    MySQl学习(MySQL数据库,从入门到精通:第四篇--MySQL中常用的运算符及其用法 第四篇_MySQL中常用的运算符及其用法运算符 1. 算术运算符 1.加法与减法运算符 2.乘法与除法运算符 ...

  6. 阿里云安装数据库mysql数据库服务器_阿里云CentOs服务器 安装与配置mysql数据库...

    Linux 安装mysql 数据库 一下为mysql 安装教程 Using username"root". Last login: Tue Oct8 09:30:34 2019 f ...

  7. mysql数据库根据经纬度计算距离,获取离我最近的地点列表,并排序。附近的人,附近商店等功能,一个sql就搞定

    mysql数据库根据经纬度计算距离,获取离我最近的地点列表,并排序 附近的人,附近商家等功能,顺序显示顺序是由近到远的,便利用户查看和判断,sql实现如下. StringBuilder sb=new ...

  8. Kafka实战 - 06 Kafka消费者:从指定Topic拉取工单处置记录信息并存入MongoDB数据库

    文章目录 1. 处置记录表 t_disposal_record 2. kafka 主题和消费者配置 3. 定义一个线程任务 KafkaTask 1. kafka Topic中的数据:KafkaDisp ...

  9. kafka consumer配置拉取速度慢_Kafka消费者的使用和原理

    这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...

最新文章

  1. html背景图片自适应纵向,HTML5 body设置全屏背景图片 如何让body的背景图片自适应整个屏----实战经验...
  2. mac上的mongodb安装与使用的踩坑记
  3. 数据结构 3-1-1 栈
  4. vi/vim 键盘图
  5. jQuery实现手风琴效果
  6. php上证指数抽奖代码,[高手指点] 美团网的抽奖是什么原理?
  7. 十二导联动态心电图技术参数
  8. MFC CImageList 详解
  9. 将图片url转file类型
  10. 微信ubuntu版服务器,Ubuntu 18.04 安装微信(Linux通用)
  11. 键盘没有 菜单键 menu 键盘映射
  12. java 加密 压缩_如何用java 将文件加密压缩为zip文件.
  13. 举个栗子!Tableau 技巧(133):完整显示工具提示中的数据
  14. matlab gui双音拨号,用matlab GUI功能模拟DTMF拨号系统.doc
  15. 微信小游戏开发实战教程15-关卡编辑器的制作以及关卡分享功能的实现
  16. Github创建、删除organization
  17. 【使用python和flask建个人博客】增加了重复类型的卡片功能,用于更好的完成日常的工作与生活
  18. 利用高斯(Guass)算法求解2维的SVP向量
  19. 如何用数学课件制作工具演示正方体展开动画
  20. SEO工作者面试基本都会被问到的问题

热门文章

  1. Django no such table: django_session错误解决办法
  2. linux中为文件赋读写权限
  3. Oracle中ROWNUM伪列和ROWID伪列的用法与区别
  4. matplotlib绘图_手把手教你使用Matplotlib绘图实战
  5. JavaScript——面向对象(封装)
  6. linux有两种工作界面,Linux 向用户提供了两种界面:用户界面和系统调用。
  7. myisam和innodb到底谁更快
  8. Spring重点面题总结
  9. mysql索引学习笔记
  10. tomcat设置http自动跳转为https访问