数据湖与实时数据湖是什么?

各个行业企业都在构建企业级数据湖,将企业内多种格式数据源汇聚的大数据平台,通过严格的数据权限和资源管控,将数据和算力开放给各种使用者。一份数据支持多种分析,是数据湖最大的特点。如果数据湖的数据,从数据源产生后,可以在1分钟以内实时进入到数据湖存储,支持各种交互式分析,这种数据湖通常叫做实时数据湖,如果可以做到15分钟之内,也可称为准实时数据湖。构建实时数据湖,正在成为5G和IOT时代,支撑各个企业实时分析业务的数据湖新目标。

华为MRS实时数据湖方案介绍

  1. 生产库数据通过CDC工具(debezium)实时录入到MRS集群中Kafka的指定topic里;
  2. 在MRS集群启动一个SparkStreaming任务,实时读取Kafka指定topic里的数据;
  3. 同时该SparkStreaming任务将读取到的数据进行解析处理并写入到一张hudi表中;
  4. 写入hudi表的同时可以指定该数据也写入hive表;
  5. 通过MRS提供的交互式查询引擎HetuEngine对数据进行快速的交互式查询。

使用华为MRS实时数据湖方案的优势:

  1. ACID事务能力得以保证,湖内一份数据满足所有的分析业务需求,减少数据搬迁,减少数据冗余;
  2. 数据一致性保证,保证增量数据与入湖后数据一致性检测;
  3. 数据加工流转,在一个存储层内闭环,数据流动更高效;
  4. 基于HetuEngine引擎实现交互式查询,性能不降低。

下面会针对方案的三个关键组件:CDC工具,数据存储引擎Hudi,交互式查询引擎HetuEngine进行详细的介绍

样例数据简介

生产库MySQL原始数据(前10条,共1000条):

CDC工具

简介

CDC(changed data capture)为动态数据抓取,常见的方式分为同步和异步。同步CDC主要是采用触发器记录新增数据,基本能够做到实时增量抽取。而异步CDC则是通过分析已经commit的日志记录来得到增量数据信息。常见的CDC工具有Canal, DataBus, Maxwell, Debezium, OGG等。本方案采用debezium作为CDC工具

对接步骤

具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/

完成对接后,针对MySQL生产库分别做增、改、删除操作对应的kafka消息

增加操作: insert into hudi.hudisource values (1001,“蒋语堂”,38,“女”,“图”,"《星球大战》",28732);

对应kafka消息体:

更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie’ WHERE uid=1001;

对应kafka消息体:

删除操作: delete from hudi.hudisource where uid=1001;

对应kafka消息体:

Hudi

简介

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写。具有以下的特性

  • ACID事务能力,支持实时入湖和批量入湖。
  • 多种视图能力(读优化视图/增量视图/实时视图),支持快速数据分析。
  • MVCC设计,支持数据版本回溯。
  • 自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
  • 支持并发读写,基于snapshot的隔离机制实现写入时可读取。
  • 支持原地转表,将存量的历史表转换为Hudi数据集。

样例代码解析

使用Hudi实时入湖的样例代码分三个部分

  • Kafka数据消费
  • 数据内容解析、处理
  • 解析后数据的写入

Kafka数据消费部分样例代码:

String savePath = "hdfs://hacluster/huditest2/";
String groupId = "group1";
System.out.println("groupID is: " + groupId);
String brokerList = "172.16.5.51:21005";
System.out.println("brokerList is: " + brokerList);
String topic = "hudisource";
System.out.println("topic is: " + topic);
String interval = "5";HashMap<String, Object> kafkaParam = new HashMap<>();
kafkaParam.put("bootstrap.servers", brokerList);
kafkaParam.put("group.id", groupId);
kafkaParam.put("auto.offset.reset", "earliest");
kafkaParam.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParam.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");HashSet<String> topics = new HashSet<>();
topics.add(topic);String[] topicArray = {topic};
Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArray));
ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParam);//本地调试
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("hudi-java-demo");conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.streaming.kafka.maxRatePerPartition", "10");
conf.set("spark.streaming.backpressure.enabled", "true");JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),consumerStrategy);

数据内容解析、处理部分样例代码:

JavaDStream<List> lines =directStream.filter(//过滤空行和脏数据new Function<ConsumerRecord<String, String>, Boolean>() {public Boolean call(ConsumerRecord<String, String> v1) throws Exception {                     if (v1.value() == null) {return false;}try{String op = debeziumJsonParser.getOP(v1.value());}catch (Exception e){return false;}return true;}}).map(new Function<ConsumerRecord<String, String>, List>() {public List call(ConsumerRecord<String, String> v1) throws Exception {//将debezium接进来的数据解析写进List                       String op = debeziumJsonParser.getOP(v1.value());JSONObject json_obj = JSON.parseObject(v1.value());                     Boolean is_delete = false;String out_str = "";if(op.equals("c")){                            out_str =  json_obj.getJSONObject("payload").get("after").toString();}else if(op.equals("u")){                            out_str =   json_obj.getJSONObject("payload").get("after").toString();}else {is_delete = true;out_str =   json_obj.getJSONObject("payload").get("before").toString();}LinkedHashMap<String, String> jsonMap = JSON.parseObject(out_str, new TypeReference<LinkedHashMap<String, String>>() {});int cnt =0;List out_list = new ArrayList();for (Map.Entry<String, String> entry : jsonMap.entrySet()) {out_list.add(entry.getValue());cnt++;}out_list.add(is_delete);String commitTime = Long.toString(System.currentTimeMillis());out_list.add(commitTime);System.out.println(out_list);out_list.add(op);return out_list;}});

debezium更新字段解析样例代码:

public class debeziumJsonParser {public static String getOP(String message){JSONObject json_obj = JSON.parseObject(message);String op = json_obj.getJSONObject("payload").get("op").toString();return  op;}
}

解析后数据的写入hudi表,hive表样例代码:

lines.foreachRDD(new VoidFunction<JavaRDD<List>>() {@Overridepublic void call(JavaRDD<List> stringJavaRDD) throws Exception {if (!stringJavaRDD.isEmpty()) {System.out.println("stringJavaRDD collect---"+stringJavaRDD.collect());List<Row> rowList =new ArrayList<>();//把数据上一步数据写进stringJavaRddfor(List row: stringJavaRDD.collect()){String uid = row.get(0).toString();String name = row.get(1).toString();String age = row.get(2).toString();String sex = row.get(3).toString();String mostlike = row.get(4).toString();String lastview = row.get(5).toString();String totalcost = row.get(6).toString();Boolean _hoodie_is_deleted = Boolean.valueOf(row.get(7).toString());      String commitTime = row.get(8).toString();String op = row.get(9).toString();Row returnRow = RowFactory.create(uid, name, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, commitTime, op);rowList.add(returnRow);}JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);//写入表的字段schema设计List<StructField> fields = new ArrayList<>();fields.add(DataTypes.createStructField("uid", DataTypes.StringType, true));fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));fields.add(DataTypes.createStructField("commitTime", DataTypes.StringType, true));fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));StructType schema = DataTypes.createStructType(fields);Dataset<Row> dataFrame = sqlContext.createDataFrame(stringJavaRdd, schema);Dataset<Row> rowDataset = dataFrame.withColumn("ts", dataFrame.col("commitTime")).withColumn("uuid", dataFrame.col("uid"));        //将数据写入hudi表以及hive表rowDataset.write().format("org.apache.hudi").option("PRECOMBINE_FIELD_OPT_KEY", "ts").option("RECORDKEY_FIELD_OPT_KEY", "uuid").option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")                  .option("hoodie.table.name", "huditesttable").option("hoodie.upsert.shuffle.parallelism", "10")                  .option("hoodie.delete.shuffle.parallelism", "10").option("hoodie.insert.shuffle.parallelism", "10").option("hoodie.bulkinsert.shuffle.parallelism", "10").option("hoodie.finalize.write.parallelism", "10").option("hoodie.cleaner.parallelism", "10").option("hoodie.datasource.write.operation", "upsert")                  .option("hoodie.datasource.hive_sync.enable", "true")                    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").option("hoodie.datasource.hive_sync.database", "default").option("hoodie.datasource.hive_sync.table", "hudidebezium").option("hoodie.datasource.hive_sync.use_jdbc", "false").mode(SaveMode.Append).save(savePath);}}
});jssc.start();
jssc.awaitTermination();
jssc.close();

Hudi任务提交命令

source /opt/client/bigdata_env
source /opt/client/Hudi/component_env
spark-submit --master yarn --deploy-mode client --jars /opt/hudi-demo4/fastjson-1.2.4.jar --class hudiIn /opt/hudi-demo4/HudiJavaDemo-1.0-SNAPSHOT.jar

HetuEngine

简介

HetuEngine是华为FusionInsight MRS提供的高性能分布式SQL查询、数据虚拟化引擎。能与大数据生态无缝融合,实现海量数据秒级查询;支持多源异构协同,提供数据湖内一站式SQL融合分析。

同时HetuEngine拥有开放的接口,能够支持各报表、分析软件对接,具体可参考生态地图:https://fusioninsight.github.io/ecosystem/zh-hans/

下面我们以帆软FineBI为例进行查询、分析。

配置FineBI对接HetuEngine

JDBC URL: jdbc:presto://172.16.5.51:29860,172.16.5.52:29860/hive/default?serviceDiscoveryMode=hsbroker&tenant=default

查看初始同步数据:

通过HetuEngine检查增、改、删除操作

Mysql增加操作对应hive表结果:

Mysql更改操作对应hive表结果:

Mysql删除操作对应hive表结果:

报表:

电影喜爱度分析:

电影标签喜爱度分析:

本文由华为云发布。

华为云MRS基于Hudi和HetuEngine构建实时数据湖最佳实践相关推荐

  1. Apache Hudi 在 B 站构建实时数据湖的实践

    简介: B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化. 本文作者喻兆靖,介绍了为什么 B 站选择 Flink + Hudi 的数据湖技术方案,以及针对其做出的优化.主要 ...

  2. 基于Flink+ClickHouse构建实时游戏数据分析最佳实践

    简介:本实践介绍如何快速收集海量用户行为数据,实现秒级响应的实时用户行为分析,并通过实时流计算.云数据库ClickHouse等技术进行深入挖掘和分析,得到用户特征和画像,实现个性化系统推荐服务. 直达 ...

  3. Hudi自带工具DeltaStreamer的实时入湖最佳实践

    摘要:本文介绍如何使用Hudi自带入湖工具DeltaStreamer进行数据的实时入湖. 本文分享自华为云社区<华为FusionInsight MRS实战 - Hudi实时入湖之DeltaStr ...

  4. 数禾云上数据湖最佳实践

    简介: 数禾科技从成立伊始就组建了大数据团队并搭建了大数据平台.并在ECS上搭建了自己的Cloudera Hadoop集群.但随着公司互联网金融业务的快速扩张发展,大数据团队承担的责任也越来越重,实时 ...

  5. 【华为云技术分享】基于华为云IOT及无线RFID技术的智慧仓储解决方案最佳实践系列一

    摘要:仓储管理存在四大细分场景:出入库管理.盘点.分拣和货物跟踪.本系列将介绍利用华为云IOT全栈云服务,端侧采用华为收发分离式RFID解决方案,打造端到端到IOT智慧仓储解决方案的最佳实践. 仓储是 ...

  6. 基于 Kafka 与 Debezium 构建实时数据同步

    起源 在进行架构转型与分库分表之前,我们一直采用非常典型的单体应用架构:主服务是一个 Java WebApp,使用 Nginx 并选择 Session Sticky 分发策略做负载均衡和会话保持:背后 ...

  7. 基于Flink1.14 + Iceberg0.13构建实时数据湖实战

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 八股文教给我,你们专心刷题和面试 Hi,我是王知无,一个大数据领域的原创作者. 放心关注我,获取更 ...

  8. 百信银行基于 Apache Hudi 实时数据湖演进方案

    简介:本文介绍了百信银行实时计算平台的建设情况,实时数据湖构建在 Hudi 上的方案和实践方法,以及实时计算平台集成 Hudi 和使用 Hudi 的方式. 本文介绍了百信银行实时计算平台的建设情况,实 ...

  9. 如何快速构建企业级数据湖仓?

    更多技术交流.求职机会,欢迎关注字节跳动数据平台微信公众号,回复[1]进入官方交流群 本文整理自火山引擎开发者社区技术大讲堂第四期演讲,主要介绍了数据湖仓开源趋势.火山引擎 EMR 的架构及特点,以及 ...

最新文章

  1. linux文件系统dentry_NFS 文件系统源代码剖析
  2. 【c++】【转】如何只在heap上创建对象,如何只在stack上建立对象?
  3. python3 使用psutil 查看内存占用
  4. 屏幕坐标系和视口坐标系
  5. MFC中的GDI绘图
  6. python中协程与函数的区别_深入浅析python 协程与go协程的区别
  7. 【JulyEdu-Python基础】第 8 课:Python第三方库
  8. Swif基础语法01
  9. 送ta一朵独一无二的玫瑰花
  10. gdb调试时,Program received signal SIGPIPE, Broken pipe.
  11. python在json文件中查找指定数据_Python中json的取值 如何使用python提取json中指定字段的数据...
  12. 高端的面试从来不会在HashMap的红黑树上纠缠太多
  13. 自动化测试--实现一套完全解耦的简单测试框架(二)
  14. 在单链表中删除指定值的节点
  15. MongoDB 在windows shell环境下的基本操作和命令的使用示例(三)
  16. Oracle踩坑之解决数值0.2只显示成.2方法
  17. 常见电子元器件检测经验
  18. VR与平板电脑高速PCB设计实战攻略
  19. 软件设计师(3)-->数据库
  20. Linux的下Ip计算器

热门文章

  1. 巧用url 让seo做起来更轻松
  2. 【LeetCode】55. 跳跃游戏 (JavaScript)
  3. 程序修行从“拔刀术”到“万剑诀”
  4. Textstudio 应用程序无法正常启动0xc000007b
  5. ddn(ddnnpap)
  6. 深入浅出理解reedsolomon库数据冗余算法原理和具体实现源码分析
  7. Android Studio连接夜深模拟器
  8. 精度、召回率、准确率、F1、ROC、AUC的理解
  9. EPON OLT光模块RSSI问题分析报告
  10. 密码学归约证明——哈希签名范式的安全性