Because Flink iterates quickly, version incompatibilities are easy to run into while writing code, so everything in this article was developed and verified against Flink 1.12.0.

1: Configure the Maven dependencies (important)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io-flink</groupId>
    <artifactId>flink_id</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- JDBC connector so Flink can write to MySQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.12.0</version>
        </dependency>
        <!-- JSON format for deserializing the Kafka records -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
</project>
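One optional tidy-up, not in the original POM and just a sketch: since every Flink artifact shares the same Flink version and Scala binary version, they can be factored out into Maven properties so that a later upgrade only touches one place:

<properties>
    <flink.version>1.12.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<!-- each dependency then references the shared properties, e.g. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>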

Note: there is one classic pitfall here. When connecting Flink to Kafka, the universal connector artifact must be used from Flink 1.12 onward: flink-connector-kafka_2.11.

Versions before 1.12 instead used version-specific artifacts such as flink-connector-kafka-0.11_2.11, which are deprecated and no longer shipped in newer releases.

With the wrong artifact on the classpath, the job fails as soon as the table is created, typically with a ValidationException complaining that no factory for the connector identifier 'kafka' can be found.

2: The code is as follows

package kafka2flink2mysql

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory

object flink_sql {
  private val log = LoggerFactory.getLogger(flink_sql.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    import org.apache.flink.api.scala._

    // Kafka source table
    val sql_info =
      """CREATE TABLE t_user (
        |  user_id STRING,
        |  name STRING,
        |  sex STRING,
        |  money STRING
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'flink',
        |  'properties.bootstrap.servers' = 'hadoop:9092',
        |  'scan.startup.mode' = 'earliest-offset',
        |  'properties.group.id' = 'group_1',
        |  'format' = 'json'
        |)""".stripMargin
    tableEnv.executeSql(sql_info)

    // Turn the table we just registered into a Table object
    val t_user: Table = tableEnv.from("t_user")
    // Print the table's schema
    t_user.printSchema()
    // Or convert it into a DataStream and print the rows
    val stream: DataStream[(String, String, String, String)] =
      tableEnv.toAppendStream[(String, String, String, String)](t_user)
    stream.print()

    // MySQL sink table
    val sql_info2 =
      """CREATE TABLE mysql_user (
        |  user_id STRING,
        |  name STRING,
        |  sex STRING,
        |  money STRING
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/one?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&useSSL=true',
        |  'driver' = 'com.mysql.jdbc.Driver',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'table-name' = 't_user',
        |  'lookup.cache.max-rows' = '100',
        |  'lookup.cache.ttl' = '60000'
        |)""".stripMargin
    tableEnv.executeSql(sql_info2)

    val insert =
      """insert into mysql_user
        |select * from t_user
        |""".stripMargin
    tableEnv.executeSql(insert)

    env.execute("flink_running")
  }
}

Remember: the Kafka connector options must be spelled exactly as documented (e.g. 'properties.bootstrap.servers', 'scan.startup.mode'), otherwise the CREATE TABLE statement fails with a validation error.
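One more prerequisite: the JDBC connector writes into an existing MySQL table and does not create it for you. A DDL along these lines, a sketch based on the columns above (adjust database, column types, and table name to your environment), has to be run in MySQL first:

CREATE TABLE t_user (
  user_id VARCHAR(64) NOT NULL,
  name    VARCHAR(64),
  sex     VARCHAR(16),
  money   VARCHAR(32),
  PRIMARY KEY (user_id)
);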

The records sent through Kafka look like this:
{"user_id":"1001","name":"zhangsan","sex":"nv","money":"499"}

The Java implementation is as follows:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class datastreaming_demo {
    public static void main(String[] args) {
        Logger log = LoggerFactory.getLogger(datastreaming_demo.class);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, build);

        // Kafka source table
        String sql =
                "CREATE TABLE t_user (\n" +
                "  user_id STRING,\n" +
                "  name STRING,\n" +
                "  sex STRING,\n" +
                "  money STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'flink_streaming',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'properties.group.id' = 'group_1',\n" +
                "  'format' = 'json'\n" +
                ")";
        tableEnv.executeSql(sql);
        Table t_user = tableEnv.from("t_user");
        t_user.printSchema();
//        DataStream<pojo> pojoDataStream = tableEnv.toAppendStream(t_user, pojo.class);
//        pojoDataStream.print();

        // MySQL sink table
        String sql2 =
                "CREATE TABLE mysql_user (\n" +
                "  user_id STRING,\n" +
                "  name STRING,\n" +
                "  sex STRING,\n" +
                "  money STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/POC_DB?serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull&useSSL=false',\n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '5675219999',\n" +
                "  'table-name' = 'mysql_user',\n" +
                "  'lookup.cache.max-rows' = '100',\n" +
                "  'lookup.cache.ttl' = '60000'\n" +
                ")";
        tableEnv.executeSql(sql2);
        Table mysql_user = tableEnv.from("mysql_user");
        mysql_user.printSchema();

        // executeSql submits the INSERT job by itself; env.execute() is only
        // needed when the pipeline also contains DataStream operators.
        String insert = "insert into mysql_user select * from t_user";
        tableEnv.executeSql(insert);
        try {
            env.execute("flink_running");
        } catch (Exception e) {
            log.info("Exception thrown!");
            System.out.println(e.getMessage());
        }
    }
}
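A side note on submission: in Flink 1.12 each executeSql("insert ...") call submits its own job immediately, which is why env.execute() throws here when the DataStream conversion stays commented out and no operators are left in the topology. If several INSERT statements should run as one job, they can be grouped in a StatementSet; a minimal sketch, continuing from the tableEnv above:

import org.apache.flink.table.api.StatementSet;

// Bundle multiple INSERT statements into a single optimized job
StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("insert into mysql_user select * from t_user");
statementSet.execute();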

The POJO class:

// The class must provide a public no-argument constructor,
// otherwise Flink will not treat it as a valid POJO.
public class pojo {
    public String user_id;
    public String name;
    public String sex;
    public String money;

    public pojo() {
    }

    public pojo(String user_id, String name, String sex, String money) {
        this.user_id = user_id;
        this.name = name;
        this.sex = sex;
        this.money = money;
    }

    public String getUser_id() { return user_id; }
    public String getName() { return name; }
    public String getSex() { return sex; }
    public String getMoney() { return money; }

    public void setUser_id(String user_id) { this.user_id = user_id; }
    public void setName(String name) { this.name = name; }
    public void setSex(String sex) { this.sex = sex; }
    public void setMoney(String money) { this.money = money; }
}
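With this class in place, the conversion that is commented out in the Java example above can be enabled; note that the POJO's field names and types must match the table's columns. A sketch, reusing tableEnv and t_user from the Java example:

// Convert the table into a typed stream of pojo instances and print it
DataStream<pojo> pojoDataStream = tableEnv.toAppendStream(t_user, pojo.class);
pojoDataStream.print();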
