1. Architecture diagram

2. Implementation example

2.1 Use Flink CDC to merge two MySQL tables into one view, and write the result to the data lake (Hudi) and to Kafka at the same time

2.2 Implementation approach

1. Create the Flink CDC source tables in Flink SQL.
2. Create a view that exposes the joined columns of the two tables as a single table.
3. Create the output table, mapped to the Hudi table, with automatic sync to a Hive table.
4. Query the view and insert the result into the output table -- this runs continuously in the background as a Flink job (see the condensed Flink SQL sketch below).
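Condensed into plain Flink SQL, the four steps look roughly like this. This is only a sketch of the statements that the Java program in section 2.4 executes; the host names, credentials and paths are the ones used throughout this article, and source_mysql_Flink_cdd is created the same way as source_mysql.

-- 1. Flink CDC source table (one of the two)
CREATE TABLE IF NOT EXISTS source_mysql (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.1.162',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'scan.startup.mode' = 'initial',
  'database-name' = 'wudldb',
  'table-name' = 'Flink_cdc'
);

-- 2. View with the joined columns of the two CDC tables
CREATE VIEW viewFlinkCdc AS
SELECT b.id, b.name, b.age, CAST(b.birthday AS STRING) AS birthday,
       a.phone, a.address, CAST(a.ts AS STRING) AS ts
FROM source_mysql_Flink_cdd a
INNER JOIN source_mysql b ON a.id = b.id;

-- 3. Hudi output table (the Kafka output table is created the same way with the kafka connector)
CREATE TABLE myslqjoinmysqlhudiSink (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING, age INT, birthday STRING, phone STRING, address STRING, ts STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///D:/myslqjoinmysqlhudiSink',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'ts'
);

-- 4. Continuous insert, submitted as a streaming Flink job
INSERT INTO myslqjoinmysqlhudiSink
SELECT id, name, age, birthday, phone, address, ts FROM viewFlinkCdc;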

2.3 Dependencies required in the pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wudl-hudi</artifactId>
        <groupId>wudl-hudi</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>flink13.5-hudi</artifactId>

    <!-- Repository locations, in order: aliyun, apache, cloudera, spring-plugin -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.13.5</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
        <flink-mysql-cdc>2.0.2</flink-mysql-cdc>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-mysql-cdc}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <!-- Flink client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hudi Flink bundle and shaded Hadoop -->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>
        <!-- MySQL / Lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <!-- slf4j and log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.4 Code implementation

package com.wudl.hudi.sink;

import com.wudl.hudi.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Join two MySQL CDC tables and write the result to Hudi and Kafka.
 *
 * @author :wudl
 * @date :Created in 2022-02-19 22:18
 * @version: 1.0
 */
public class MysqlJoinMysqlHuDi {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 Enable checkpointing: incremental writes into the Hudi table require Flink checkpoints
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the last checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // State backend
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/flink-hudi/ck"));
        // User name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "root");

        // 2. Create the input tables (MySQL CDC sources)
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS source_mysql ( " +
                "  id BIGINT primary key NOT ENFORCED ," +
                "  name string," +
                "  age int ," +
                "  birthday TIMESTAMP(3)," +
                "  ts TIMESTAMP(3)" +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = '192.168.1.162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'server-time-zone' = 'Asia/Shanghai', " +
                " 'scan.startup.mode' = 'initial', " +
                " 'database-name' = 'wudldb', " +
                " 'table-name' = 'Flink_cdc' " +
                " )");
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS source_mysql_Flink_cdd ( " +
                "  id BIGINT primary key NOT ENFORCED ," +
                "  phone string," +
                "  address string ," +
                "  ts TIMESTAMP(3)" +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = '192.168.1.162', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'server-time-zone' = 'Asia/Shanghai', " +
                " 'scan.startup.mode' = 'initial', " +
                " 'database-name' = 'wudldb', " +
                " 'table-name' = 'Flink_cdd' " +
                " )");

        // 3. Join the two CDC tables and register the result as a view
        String joinSql = "SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts " +
                "FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id";
        Table tableMysqlJoin = tableEnv.sqlQuery(joinSql);
        tableEnv.createTemporaryView("viewFlinkCdc", tableMysqlJoin);

        // 4. Create the output tables: the Hudi table (table name, storage path, fields, ...) and the Kafka table
        tableEnv.executeSql("CREATE TABLE myslqjoinmysqlhudiSink (" +
                " id BIGINT PRIMARY KEY NOT ENFORCED," +
                " name STRING," +
                " age INT," +
                " birthday STRING," +
                " phone STRING," +
                " address STRING," +
                " ts STRING" +
                ")" +
                "WITH (" +
                " 'connector' = 'hudi'," +
                " 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
//              " 'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/order_hudi_sink' ,\n" +
                " 'table.type' = 'MERGE_ON_READ'," +
                " 'write.operation' = 'upsert'," +
                " 'hoodie.datasource.write.recordkey.field'= 'id'," +
                " 'write.precombine.field' = 'ts'," +
                " 'write.tasks'= '1'" +
                ")");
        TableResult kafkaSink = tableEnv.executeSql("CREATE TABLE flinkCdc_kafka_Sink (" +
                "  id BIGINT NOT NULL," +
                "  name STRING," +
                "  age INT," +
                "  birthday STRING," +
                "  phone STRING," +
                "  address STRING," +
                "  ts STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'sinktest'," +
                "  'scan.startup.mode' = 'earliest-offset', " +
                "  'properties.bootstrap.servers' = '192.168.1.161:6667'," +
                "  'format' = 'debezium-json'," +
                "  'debezium-json.ignore-parse-errors'='true' " +
                ")");

        // 5. Write the joined data into the two output tables
        tableEnv.executeSql("INSERT INTO myslqjoinmysqlhudiSink " +
                "SELECT id,name,age,birthday,phone,address, ts FROM viewFlinkCdc");
        tableEnv.sqlQuery("select * from flinkCdc_kafka_Sink").printSchema();
        tableEnv.sqlQuery("select * from viewFlinkCdc").printSchema();
        tableEnv.executeSql("insert into flinkCdc_kafka_Sink " +
                "SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts " +
                "FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id");
        System.out.println("--------------------------");
    }
}

2.5 MySQL table structure

CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7604 DEFAULT CHARSET=utf8mb4

#*********************************************************************************

CREATE TABLE `Flink_cdd` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `phone` varchar(20) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7417 DEFAULT CHARSET=utf8mb4
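To make the join key explicit, here are a couple of hypothetical sample rows (values invented purely for illustration); the Flink job joins Flink_cdd to Flink_cdc on id:

-- Hypothetical sample data, for illustration only
INSERT INTO `Flink_cdc` (`id`, `name`, `age`, `birthday`, `ts`)
VALUES (1, '张伟', 23, '2022-02-19 14:30:00', '2022-02-19 14:30:00');

INSERT INTO `Flink_cdd` (`id`, `phone`, `address`, `ts`)
VALUES (1, '13812344321', '北京', '2022-02-19 14:30:00');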

2.6 MySQL data generator code

package com.wudl.hudi.source;

import com.alibaba.fastjson.JSON;
import com.wudl.hudi.entity.FlinkCdcBean;
import com.wudl.hudi.entity.Order;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Data generator: produces random FlinkCdcBean records and writes them into MySQL.
 *
 * @author :wudl
 * @date :Created in 2022-02-19 14:24
 * @version: 1.0
 */
public class GenerateMysqlFlinkCdcBean implements SourceFunction<FlinkCdcBean> {
    private boolean isRunning = true;
    String[] citys = {"北京", "广东", "山东", "江苏", "河南", "上海", "河北", "浙江", "香港", "山西", "陕西", "湖南", "重庆", "福建", "天津", "云南", "四川", "广西", "安徽", "海南", "江西", "湖北", "山西", "辽宁", "内蒙古"};
    Integer i = 0;
    List<Order> list = new ArrayList<>();

    @Override
    public void run(SourceContext<FlinkCdcBean> ctx) throws Exception {
        Random random = new Random();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (isRunning) {
            int number = random.nextInt(4) + 1;
            String name = getChineseName();
            String address = citys[random.nextInt(citys.length)];
            int age = random.nextInt(25);
            String birthday = getDate();
            String phone = getTel();
            java.sql.Timestamp ts = new java.sql.Timestamp(df.parse(getDate()).getTime());
            FlinkCdcBean flinkCdcBean = new FlinkCdcBean(name, age, birthday, ts, phone, address);
            ctx.collect(flinkCdcBean);
        }
    }

    /**
     * Get the current time as a formatted string.
     */
    public static String getDate() throws InterruptedException {
        Calendar calendar = Calendar.getInstance();
        Date date = calendar.getTime();
        String dataStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(date);
        Thread.sleep(10);
        return dataStr;
    }

    public static int getNum(int start, int end) {
        return (int) (Math.random() * (end - start + 1) + start);
    }

    private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");

    /**
     * Generate a random phone number.
     */
    private static String getTel() {
        int index = getNum(0, telFirst.length - 1);
        String first = telFirst[index];
        String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
        String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
        return first + second + third;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    private static String firstName = "赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳酆鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮卞齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闵席季麻强贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田樊胡凌霍虞万支柯咎管卢莫经房裘缪干解应宗宣丁贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁荀羊於惠甄魏加封芮羿储靳汲邴糜松井段富巫乌焦巴弓牧隗山谷车侯宓蓬全郗班仰秋仲伊宫宁仇栾暴甘钭厉戎祖武符刘姜詹束龙叶幸司韶郜黎蓟薄印宿白怀蒲台从鄂索咸籍赖卓蔺屠蒙池乔阴郁胥能苍双闻莘党翟谭贡劳逄姬申扶堵冉宰郦雍却璩桑桂濮牛寿通边扈燕冀郏浦尚农温别庄晏柴瞿阎充慕连茹习宦艾鱼容向古易慎戈廖庚终暨居衡步都耿满弘匡国文寇广禄阙东殴殳沃利蔚越夔隆师巩厍聂晁勾敖融冷訾辛阚那简饶空曾毋沙乜养鞠须丰巢关蒯相查后江红游竺权逯盖益桓公万俟司马上官欧阳夏侯诸葛闻人东方赫连皇甫尉迟公羊澹台公冶宗政濮阳淳于仲孙太叔申屠公孙乐正轩辕令狐钟离闾丘长孙慕容鲜于宇文司徒司空亓官司寇仉督子车颛孙端木巫马公西漆雕乐正壤驷公良拓拔夹谷宰父谷粱晋楚阎法汝鄢涂钦段干百里东郭南门呼延归海羊舌微生岳帅缑亢况后有琴梁丘左丘东门西门商牟佘佴伯赏南宫墨哈谯笪年爱阳佟第五言福百家姓续";
    private static String girl = "秀娟英华慧巧美娜静淑惠珠翠雅芝玉萍红娥玲芬芳燕彩春菊兰凤洁梅琳素云莲真环雪荣爱妹霞香月莺媛艳瑞凡佳嘉琼勤珍贞莉桂娣叶璧璐娅琦晶妍茜秋珊莎锦黛青倩婷姣婉娴瑾颖露瑶怡婵雁蓓纨仪荷丹蓉眉君琴蕊薇菁梦岚苑婕馨瑗琰韵融园艺咏卿聪澜纯毓悦昭冰爽琬茗羽希宁欣飘育滢馥筠柔竹霭凝晓欢霄枫芸菲寒伊亚宜可姬舒影荔枝思丽 ";
    private static String boy = "伟刚勇毅俊峰强军平保东文辉力明永健世广志义兴良海山仁波宁贵福生龙元全国胜学祥才发武新利清飞彬富顺信子杰涛昌成康星光天达安岩中茂进林有坚和彪博诚先敬震振壮会思群豪心邦承乐绍功松善厚庆磊民友裕河哲江超浩亮政谦亨奇固之轮翰朗伯宏言若鸣朋斌梁栋维启克伦翔旭鹏泽晨辰士以建家致树炎德行时泰盛雄琛钧冠策腾楠榕风航弘";
    private static String name_sex = "";

    /**
     * Return a random Chinese name.
     */
    private static String getChineseName() {
        int index = getNum(0, firstName.length() - 1);
        String first = firstName.substring(index, index + 1);
        int sex = getNum(0, 1);
        String str = boy;
        int length = boy.length();
        if (sex == 0) {
            str = girl;
            length = girl.length();
            name_sex = "女";
        } else {
            name_sex = "男";
        }
        index = getNum(0, length - 1);
        String second = str.substring(index, index + 1);
        int hasThird = getNum(0, 1);
        String third = "";
        if (hasThird == 1) {
            index = getNum(0, length - 1);
            third = str.substring(index, index + 1);
        }
        return first + second + third;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<FlinkCdcBean> addSource = env.addSource(new GenerateMysqlFlinkCdcBean());
        addSource.print();
        Thread.sleep(5000);
        addSource.addSink(new MysqlJdbcSink());
        env.execute();
    }
}
package com.wudl.hudi.source;

import com.wudl.hudi.entity.FlinkCdcBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * JDBC sink that writes the generated records into the two MySQL tables.
 *
 * @author :wudl
 * @date :Created in 2022-02-19 15:08
 * @version: 1.0
 */
public class MysqlJdbcSink extends RichSinkFunction<FlinkCdcBean> {
    // JDBC connection and prepared statements
    Connection connection = null;
    PreparedStatement insertStmtFlink_cdc = null;
    PreparedStatement insertStmtFlink_cdd = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://192.168.1.162:3306/wudldb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
        insertStmtFlink_cdc = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdc`(NAME,age,birthday,ts) VALUES(?,?,?,?) ");
        insertStmtFlink_cdd = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdd` (phone,address,ts) VALUES(?,?,?) ");
    }

    // For every incoming record, execute the two inserts
    @Override
    public void invoke(FlinkCdcBean fc, Context context) throws Exception {
        // Flink_cdc columns: name, age, birthday, ts
        insertStmtFlink_cdc.setString(1, fc.getName());
        insertStmtFlink_cdc.setInt(2, fc.getAge());
        insertStmtFlink_cdc.setString(3, fc.getBirthday());
        insertStmtFlink_cdc.setString(4, fc.getTs().toString());
        insertStmtFlink_cdc.execute();
        // Flink_cdd columns: phone, address, ts
        insertStmtFlink_cdd.setString(1, fc.getPhone());
        insertStmtFlink_cdd.setString(2, fc.getAddress());
        insertStmtFlink_cdd.setString(3, fc.getTs().toString());
        insertStmtFlink_cdd.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        insertStmtFlink_cdc.close();
        insertStmtFlink_cdd.close();
        connection.close();
    }
}

2.7 Reading the Hudi data

package com.wudl.hudi.sink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Streaming read of the Hudi table written by MysqlJoinMysqlHuDi.
 *
 * @author :wudl
 * @date :Created in 2022-02-19 22:18
 * @version: 1.0
 */
public class MysqlJoinMysqlHuDiRead {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream and table execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Map a Flink table onto the Hudi table and read it as a stream
        tableEnv.executeSql("CREATE TABLE order_hudi(\n" +
                " id BIGINT PRIMARY KEY NOT ENFORCED," +
                " name STRING," +
                " age INT," +
                " birthday STRING," +
                " phone STRING," +
                " address STRING," +
                " ts STRING" +
                ")" +
                "WITH (" +
                "    'connector' = 'hudi'," +
                "    'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
                "    'table.type' = 'MERGE_ON_READ'," +
                "    'read.streaming.enabled' = 'true'," +
                "    'read.streaming.check-interval' = '4'" +
                ")");
        tableEnv.executeSql("select * from order_hudi").print();
    }
}


Running the jar

2.8 Settings to change when submitting to the cluster: HDFS path and Hive sync

tableEnv.executeSql("CREATE TABLE myslqjoinmysqlhudiSink(\n" +
        "id bigint ,\n" +
        "name string,\n" +
        "age int,\n" +
        "birthday STRING,\n" +
        "phone STRING,\n" +
        "address STRING,\n" +
        "ts TIMESTAMP(3),\n" +
        "primary key(id) not enforced\n" +
        ")\n" +
        "with(\n" +
        "'connector'='hudi',\n" +
        "'path'= 'hdfs://192.168.1.161:8020/myslqjoinmysqlhudiSink', \n" +
        "'table.type'= 'MERGE_ON_READ',\n" +
        "'hoodie.datasource.write.recordkey.field'= 'id', \n" +
        "'write.precombine.field'= 'ts',\n" +
        "'write.tasks'= '1',\n" +
        "'write.rate.limit'= '2000', \n" +
        "'compaction.tasks'= '1', \n" +
        "'compaction.async.enabled'= 'true',\n" +
        "'compaction.trigger.strategy'= 'num_commits',\n" +
        "'compaction.delta_commits'= '1',\n" +
        "'changelog.enabled'= 'true',\n" +
        "'read.streaming.enabled'= 'true',\n" +
        "'read.streaming.check-interval'= '3',\n" +
        "'hive_sync.enable'= 'true',\n" +
        "'hive_sync.mode'= 'hms',\n" +
        "'hive_sync.metastore.uris'= 'thrift://node02.com:9083',\n" +
        "'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',\n" +
        "'hive_sync.table'= 'myslqjoinmysqlhudiSink',\n" +
        "'hive_sync.db'= 'db_hive',\n" +
        "'hive_sync.username'= 'root',\n" +
        "'hive_sync.password'= '123456',\n" +
        "'hive_sync.support_timestamp'= 'true'\n" +
        ")");
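With Hive sync enabled, Hudi registers the MERGE_ON_READ table in the Hive metastore, which typically shows up as two tables: a read-optimized one (suffix _ro) and a real-time one (suffix _rt). The following is only a hedged verification sketch assuming the database and table name from the DDL above; run it in Hive/beeline after the job has committed data:

-- Verify the Hive-synced tables (names assume the hive_sync options above; Hive lowercases table names)
USE db_hive;
SHOW TABLES LIKE 'myslqjoinmysqlhudisink*';
-- Read-optimized view: compacted base files only
SELECT id, name, age, phone, address, ts FROM myslqjoinmysqlhudisink_ro LIMIT 10;
-- Real-time view: base files merged with log files
SELECT id, name, age, phone, address, ts FROM myslqjoinmysqlhudisink_rt LIMIT 10;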

2.9 Submitting the job from the command line

[root@node01 bin]# ./flink run -m 192.168.1.161:8081 -c com.wudl.hudi.sink.MysqlJoinMysqlHuDi /opt/module/jar/flink13.5-hudi-1.0-SNAPSHOT-jar-with-dependencies.jar
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Job has been submitted with JobID 225aba756224502aa9e643d75560ddb9
Job has been submitted with JobID 63a6b9e2bb697a4ce0c3f993c720b534
--------------------------
[root@node01 bin]# 

Flink web UI

Result
