测试hudi-0.7.0对接spark structure streaming

测试环境

Hudi version :0.7.0
Spark version :2.4.0
Hive version :2.1.1
Hadoop version :3.0.0
Storage (HDFS/S3/GCS..) :HDFS
Running on Docker? (yes/no) :no

测试内容

表类型:copy on write
插入方式:bulkinsert
控制文件大小:clustering配置
是否同步hive:同步hive测试的参数配置如下://hudi配置.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE").option("hoodie.datasource.write.operation","bulk_insert").option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")// 以kafka分区和偏移量作为组合主键.option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")// 以当前日期作为分区.option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")//clustering配置.option("hoodie.parquet.small.file.limit", "0").option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "4").option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").option("hoodie.clustering.plan.strategy.sort.columns", "") //optional, if sorting is needed as part of rewriting data//hive配置.option("hoodie.table.name", "copy_on_write_table").option("hoodie.datasource.write.hive_style_partitioning", "true").option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi").option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_sync").option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,"").option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://ip:port").option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "USERNAME").option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "PASSWORD").option("HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLED", "false")//清理配置.option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").option("hoodie.cleaner.fileversions.retained","1").option("hoodie.cleaner.automatic","true").option("hoodie.clean.async","true")

测试代码如下:

object SparkHudi {val logger = Logger.getLogger(SparkHudi.getClass)def main(args: Array[String]): Unit = {val spark = SparkSession.builder.appName("SparkHudi")//.master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").config("spark.default.parallelism", 9).config("spark.sql.shuffle.partitions", 9).enableHiveSupport().getOrCreate()// 添加监听器,每一批次处理完成,将该批次的相关信息,如起始offset,抓取记录数量,处理时间打印到控制台spark.streams.addListener(new StreamingQueryListener() {override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {println("Query started: " + queryStarted.id)}override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {println("Query terminated: " + queryTerminated.id)}override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {println("Query made progress: " + queryProgress.progress)}})// 定义kafka流val dataStreamReader = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka IP:9092").option("subscribe", "TopicName").option("startingOffsets", "latest").option("maxOffsetsPerTrigger", 100000)// 加载流数据,这里因为只是测试使用,直接读取kafka消息而不做其他处理,是spark结构化流会自动生成每一套消息对应的kafka元数据,如消息所在主题,分区,消息对应offset等。val df = dataStreamReader.load().selectExpr("topic as kafka_topic""CAST(partition AS STRING) kafka_partition","cast(timestamp as String) kafka_timestamp","CAST(offset AS STRING) kafka_offset","CAST(key AS STRING) kafka_key","CAST(value AS STRING) kafka_value","current_timestamp() current_time",
).selectExpr("kafka_topic""concat(kafka_partition,'-',kafka_offset) kafka_partition_offset","kafka_offset","kafka_timestamp","kafka_key","kafka_value","substr(current_time,1,10) partition_date")// 创建并启动queryval query = df.writeStream.queryName("demo")..foreachBatch { (batchDF: DataFrame, _: Long) => {batchDF.persist()println(LocalDateTime.now() + "start writing cow table")batchDF.write.format("org.apache.hudi").option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE").option("hoodie.datasource.write.operation","bulk_insert").option(PRECOMBINE_FIELD_OPT_KEY, "kafka_timestamp")// 以kafka分区和偏移量作为组合主键.option(RECORDKEY_FIELD_OPT_KEY, "kafka_partition_offset")// 以当前日期作为分区.option(PARTITIONPATH_FIELD_OPT_KEY, "partition_date")//clustering配置.option("hoodie.parquet.small.file.limit", "0").option("hoodie.clustering.inline", "true").option("hoodie.clustering.inline.max.commits", "4").option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").option("hoodie.clustering.plan.strategy.sort.columns", "") //optional, if sorting is needed as part of rewriting data//hive配置.option("hoodie.table.name", "copy_on_write_table").option("hoodie.datasource.write.hive_style_partitioning", "true").option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hudi").option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_hive_sync").option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,"").option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://ip:port").option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "USERNAME").option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "PASSWORD").option("HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLED", "false")//清理配置.option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").option("hoodie.cleaner.fileversions.retained","1").option("hoodie.cleaner.automatic","true").option("hoodie.clean.async","true").mode(SaveMode.Append).save("/hudi/sparkHudi/hudi表")println(LocalDateTime.now() + "finish")batchDF.unpersist()}}.option("checkpointLocation", "/tmp/sparkHudi/checkpoint/").start()query.awaitTermination()}
}

测试结果

将之前的旧版本hudi-0.6.0替换成hudi-0.7.0,经测试没有报错,符合预期。新版本增加了clustering配置,小文件会合并成大文件,减少了查询引擎需要扫描的文件数,因而提高了查询效率。不过合并过程会降低写入速度。

测试hudi-0.7.0对接spark structure streaming相关推荐

  1. Spark Structure Streaming(一)之简介

    一.Structure Streaming 结构化流是基于Spark SQL引擎构建的可伸缩且容错的流处理引擎.可以像对静态数据进行批处理计算一样,来表示流计算. 当流数据继续到达时,Spark SQ ...

  2. Hudi(1.0、2.0)简介

    文章目录 Hudi(v1.0) 一.Hudi介绍 1.介绍 2.特性 二.Hudi快速构建 1.环境准备 2.Maven安装 3.Git安装 4.构建Hudi 三.通过Spark-shell快速开始 ...

  3. 从0到1搭建spark集群---企业集群搭建

    今天分享一篇从0到1搭建Spark集群的步骤,企业中大家亦可以参照次集群搭建自己的Spark集群. 一.下载Spark安装包 可以从官网下载,本集群选择的版本是spark-1.6.0-bin-hado ...

  4. Apache Spark 2.2.0 中文文档 - Spark RDD(Resilient Distributed Datasets)

    Spark RDD(Resilient Distributed Datasets)论文 概要 1: 介绍 2: Resilient Distributed Datasets(RDDs) 2.1 RDD ...

  5. Rainbond 5.0正式发布, 支持对接管理已有Kubernetes集群...

    Rainbond 5.0正式发布, 支持对接管理已有Kubernetes集群 ​ 今天非常高兴向大家宣布Rainbond v5.0正式发布,Rainbond是开源的企业应用云操作系统,支撑企业应用开发 ...

  6. Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成

    Hudi 0.11.0 + Flink1.14.4 + Hive + Flink CDC + Kafka 集成 一.环境准备 1.1 软件版本 Flink 1.14.4Scala 2.11CDH 6. ...

  7. Apache Hudi 0.8.0 版本发布,Flink 集成有重大提升以及支持并行写

    4月初,Apache Hudi 发布了 0.8 版本,这个版本供解决了 97 个 ISSUES,下面简单介绍一下这个版本的迁移以及重要特性. 迁移指南 •如果从 0.5.3 以下版本迁移,请检查这个版 ...

  8. Apache Hudi 0.7.0 和 0.8.0 新功能已在 Amazon EMR 中可用

    文末限时福利倒计时3天,不要错过! 前言 Apache Hudi 是一个开源事务性数据湖框架,通过提供记录级插入.更新和删除功能,极大地简化了增量数据处理和数据管道开发.如果您要在 Amazon Si ...

  9. GTX 1080Ti + cuda8.0 + cuDNN6.0 安装及测试

    GPU 显卡厂商已经安装好了,直接安装 cuda8.0 + cuDNN6.0 我这里的显卡是 GTX 1080 Ti cuda安装 我下载的是cuda8.0的是deb格式的1.9个G地址:https: ...

最新文章

  1. 【Pandas库】(6) 索引操作--改、查、高级索引
  2. 机器人核心:感知与规划
  3. ORACLE EXP命令
  4. [video super resolution] ESPCN论文笔记
  5. 将PostgreSQL PL / Java安装为PostgreSQL扩展
  6. 【LeetCode笔记】17.电话号码的字母组合(Java、DFS)
  7. 中断处理程序与中断服务例程
  8. Maven : [ERROR] Project xxx is duplicated in the reactor @
  9. 检查图层当中是否存在高程基准(C++)ArcObject
  10. python Supervisor
  11. JAVA-ZIP和GZIP压缩实现
  12. ARG MIN的含义是什么?
  13. 湿度传感器pcb遇到的问题
  14. 坑爹的matlab除法
  15. 计算机专业线性代数教学大纲,线性代数(专业必修课)教学大纲(2018版)
  16. 数字信号处理——CFAR检测器设计(1)
  17. AspNetPager 存储过程
  18. maven 安装配置 - vscode for java
  19. asdfasdfasdf
  20. ffmpeg js转换音频_实现纯前端下的音频剪辑处理

热门文章

  1. 华为云桌面盒子不支持分屏_华为MatePad Pro 5G旗舰平板国内正式发布 售价5299元起...
  2. ai人工智能的数据服务_从AI数据集中消除无意识的偏见
  3. 自动化脚本上传图片怎么办_一切都自动化后我们将怎么办?
  4. 开源软件的安全性风险_开源安全性,Google惊喜等
  5. 政府 开源软件_为什么不是所有的政府软件都是开源的?
  6. (16) Node.js 模块的加载逻辑
  7. (3)JavaScript 的注释
  8. 前端:HTML/02/排版标记,块元素,行内元素,html字符实体,列表标记,图片标记
  9. SpringBoot 实现Session共享
  10. 属性 元素的内容 创建,插入和删除节点 虚拟节点