测试hudi-0.7.0对接spark structure streaming
测试hudi-0.7.0对接spark structure streaming
测试环境
Hudi version :0.7.0
Spark version :2.4.0
Hive version :2.1.1
Hadoop version :3.0.0
Storage (HDFS/S3/GCS..) :HDFS
Running on Docker? (yes/no) :no
测试内容
表类型:copy on write
插入方式:bulkinsert
控制文件大小:clustering配置
是否同步hive:同步hive测试的参数配置如下://hudi配置.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE").option("hoodie.datasource.write.operation","bulk_insert").option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")// 以kafka分区和偏移量作为组合主键.option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")// 以当前日期作为分区.option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")//clustering配置.option("hoodie.parquet.small.file.limit", "0").option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "4").option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").option("hoodie.clustering.plan.strategy.sort.columns", "") //optional, if sorting is needed as part of rewriting data//hive配置.option("hoodie.table.name", "copy_on_write_table").option("hoodie.datasource.write.hive_style_partitioning", "true").option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi").option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_sync").option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,"").option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://ip:port").option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "USERNAME").option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "PASSWORD").option("HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLED", "false")//清理配置.option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").option("hoodie.cleaner.fileversions.retained","1").option("hoodie.cleaner.automatic","true").option("hoodie.clean.async","true")
测试代码如下:
object SparkHudi {val logger = Logger.getLogger(SparkHudi.getClass)def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("SparkHudi")//.master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").config("spark.default.parallelism", 9).config("spark.sql.shuffle.partitions", 9).enableHiveSupport().getOrCreate()// 添加监听器,每一批次处理完成,将该批次的相关信息,如起始offset,抓取记录数量,处理时间打印到控制台spark.streams.addListener(new StreamingQueryListener() {override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {println("Query started: " + queryStarted.id)}override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {println("Query terminated: " + queryTerminated.id)}override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {println("Query made progress: " + queryProgress.progress)}})// 定义kafka流val dataStreamReader = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka IP:9092").option("subscribe", "TopicName").option("startingOffsets", "latest").option("maxOffsetsPerTrigger", 100000)// 加载流数据,这里因为只是测试使用,直接读取kafka消息而不做其他处理,是spark结构化流会自动生成每一套消息对应的kafka元数据,如消息所在主题,分区,消息对应offset等。val df = dataStreamReader.load().selectExpr("topic as kafka_topic""CAST(partition AS STRING) kafka_partition","cast(timestamp as String) kafka_timestamp","CAST(offset AS STRING) kafka_offset","CAST(key AS STRING) kafka_key","CAST(value AS STRING) kafka_value","current_timestamp() current_time",
).selectExpr("kafka_topic""concat(kafka_partition,'-',kafka_offset) kafka_partition_offset","kafka_offset","kafka_timestamp","kafka_key","kafka_value","substr(current_time,1,10) partition_date")// 创建并启动queryval query = df.writeStream.queryName("demo")..foreachBatch { (batchDF: DataFrame, _: Long) => {batchDF.persist()println(LocalDateTime.now() + "start writing cow table")batchDF.write.format("org.apache.hudi").option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE").option("hoodie.datasource.write.operation","bulk_insert").option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")// 以kafka分区和偏移量作为组合主键.option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")// 以当前日期作为分区.option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")//clustering配置.option("hoodie.parquet.small.file.limit", "0").option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "4").option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").option("hoodie.clustering.plan.strategy.sort.columns", "") //optional, if sorting is needed as part of rewriting data//hive配置.option("hoodie.table.name", "copy_on_write_table").option("hoodie.datasource.write.hive_style_partitioning", "true").option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi").option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_sync").option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,"").option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://ip:port").option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "USERNAME").option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "PASSWORD").option("HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLED", "false")//清理配置.option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").option("hoodie.cleaner.fileversions.retained","1").option("hoodie.cleaner.automatic","true").option("hoodie.clean.async","true").mode(SaveMode.Append).save("/hudi/sparkHudi/hudi表")println(LocalDateTime.now() + "finish")batchDF.unpersist()}}.option("checkpointLocation", "/tmp/sparkHudi/checkpoint/").start()query.awaitTermination()}
}
测试结果
将之前的旧版本hudi-0.6.0替换成hudi-0.7.0,经测试没有报错,符合预期。新版本增加了clustering配置,小文件会合并成大文件,减少了查询引擎需要扫描的文件数,因而提高了查询效率。不过合并过程会降低写入速度。
测试hudi-0.7.0对接spark structure streaming相关推荐
- Spark Structure Streaming(一)之简介
一.Structure Streaming 结构化流是基于Spark SQL引擎构建的可伸缩且容错的流处理引擎.可以像对静态数据进行批处理计算一样,来表示流计算. 当流数据继续到达时,Spark SQ ...
- Hudi(1.0、2.0)简介
文章目录 Hudi(v1.0) 一.Hudi介绍 1.介绍 2.特性 二.Hudi快速构建 1.环境准备 2.Maven安装 3.Git安装 4.构建Hudi 三.通过Spark-shell快速开始 ...
- 从0到1搭建spark集群---企业集群搭建
今天分享一篇从0到1搭建Spark集群的步骤,企业中大家亦可以参照次集群搭建自己的Spark集群. 一.下载Spark安装包 可以从官网下载,本集群选择的版本是spark-1.6.0-bin-hado ...
- Apache Spark 2.2.0 中文文档 - Spark RDD(Resilient Distributed Datasets)
Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD ...
- Rainbond 5.0正式发布, 支持对接管理已有Kubernetes集群...
Rainbond 5.0正式发布, 支持对接管理已有Kubernetes集群 今天非常高兴向大家宣布Rainbond v5.0正式发布,Rainbond是开源的企业应用云操作系统,支撑企业应用开发 ...
- Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成
Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成 一.环境准备 1.1 软件版本 Flink 1.14.4Scala 2.11CDH 6. ...
- Apache Hudi 0.8.0 版本发布,Flink 集成有重大提升以及支持并行写
4月初,Apache Hudi 发布了 0.8 版本,这个版本供解决了 97 个 ISSUES,下面简单介绍一下这个版本的迁移以及重要特性. 迁移指南 •如果从 0.5.3 以下版本迁移,请检查这个版 ...
- Apache Hudi 0.7.0 和 0.8.0 新功能已在 Amazon EMR 中可用
文末限时福利倒计时3天,不要错过! 前言 Apache Hudi 是一个开源事务性数据湖框架,通过提供记录级插入.更新和删除功能,极大地简化了增量数据处理和数据管道开发.如果您要在 Amazon Si ...
- GTX 1080Ti + cuda8.0 + cuDNN6.0 安装及测试
GPU 显卡厂商已经安装好了,直接安装 cuda8.0 + cuDNN6.0 我这里的显卡是 GTX 1080 Ti cuda安装 我下载的是cuda8.0的是deb格式的1.9个G地址:https: ...
最新文章
- 【Pandas库】(6) 索引操作--改、查、高级索引
- 机器人核心:感知与规划
- ORACLE EXP命令
- [video super resolution] ESPCN论文笔记
- 将PostgreSQL PL / Java安装为PostgreSQL扩展
- 【LeetCode笔记】17.电话号码的字母组合(Java、DFS)
- 中断处理程序与中断服务例程
- Maven : [ERROR] Project xxx is duplicated in the reactor @
- 检查图层当中是否存在高程基准(C++)ArcObject
- python Supervisor
- JAVA-ZIP和GZIP压缩实现
- ARG MIN的含义是什么?
- 湿度传感器pcb遇到的问题
- 坑爹的matlab除法
- 计算机专业线性代数教学大纲,线性代数(专业必修课)教学大纲(2018版)
- 数字信号处理——CFAR检测器设计(1)
- AspNetPager 存储过程
- maven 安装配置 - vscode for java
- asdfasdfasdf
- ffmpeg js转换音频_实现纯前端下的音频剪辑处理
热门文章
- 华为云桌面盒子不支持分屏_华为MatePad Pro 5G旗舰平板国内正式发布 售价5299元起...
- ai人工智能的数据服务_从AI数据集中消除无意识的偏见
- 自动化脚本上传图片怎么办_一切都自动化后我们将怎么办?
- 开源软件的安全性风险_开源安全性,Google惊喜等
- 政府 开源软件_为什么不是所有的政府软件都是开源的?
- (16) Node.js 模块的加载逻辑
- (3)JavaScript 的注释
- 前端:HTML/02/排版标记,块元素,行内元素,html字符实体,列表标记,图片标记
- SpringBoot 实现Session共享
- 属性 元素的内容 创建,插入和删除节点 虚拟节点