前言

最近公司有一个生产的小集群,专门用于运行spark作业。但是偶尔会因为nn或dn压力过大而导致作业checkpoint操作失败进而导致spark 流任务失败。本篇记录从应用层面对spark作业进行优化,进而达到优化集群的作用。

集群使用情况

有数据的目录以及使用情况如下:

目录 说明 大小 文件数量 数据数量占比 数据大小占比
/user/root/.sparkStaging/applicationIdxxx spark任务配置以及所需jar包 5G 约1k 约20% 约100%
/tmp/checkpoint/xxx/{commits|metadata|offsets|sources} checkpoint文件,其中commits和offsets频繁变动 2M 约4k 约40% 约0%

对于.sparkStaging目录,不经常变动,只需要优化其大小即可。

对于 checkpoint目录,频繁性增删,从生成周期和保留策略两方面去考虑。

.sparkStaging目录优化

对于/user/hadoop/.sparkStaging下文件,是spark任务依赖文件,可以将jar包上传到指定目录下,避免或减少了jar包的重复上传,进而减少任务的等待时间。

可以在spark的配置文件spark-defaults.conf配置如下内容:

spark.yarn.archive=hdfs://hdfscluster/user/hadoop/jars
spark.yarn.preserve.staging.files=false

参数说明

Property Name Default Meaning
spark.yarn.archive (none)
An archive containing needed Spark jars for distribution to the YARN cache. If set, this configuration replaces spark.yarn.jars and the archive is used in all the application's containers. The archive should contain jar files in its root directory. Like with the previous option, the archive can also be hosted on HDFS to speed up file distribution.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.

checkpoint优化

首先了解一下 checkpoint文件代表的含义。

checkpoint文件说明

  • offsets 目录 - 预先记录日志,记录每个批次中存在的偏移量。为了确保给定的批次将始终包含相同的数据,我们在进行任何处理之前将其写入此日志。因此,该日志中的第N个记录指示当前正在处理的数据,第N-1个条目指示哪些偏移已持久地提交给sink。

  • commits 目录 - 记录已完成的批次ID的日志。这用于检查批处理是否已完全处理,并且其输出已提交给接收器,因此无需再次处理。(例如)在重新启动过程中使用,以帮助识别接下来要运行的批处理。

  • metadata 文件 - 与整个查询关联的元数据,只有一个 StreamingQuery 唯一ID

  • sources目录 - 保存起始offset信息

下面从两个方面来优化checkpoint。

第一,从触发checkpoint机制方面考虑

trigger的机制

Trigger是用于指示 StreamingQuery 多久生成一次结果的策略。

Trigger有三个实现类,分别为:

  • OneTimeTrigger - A Trigger that processes only one batch of data in a streaming query then terminates the query.

  • ProcessingTime - A trigger that runs a query periodically based on the processing time. If interval is 0, the query will run as fast as possible.by default,trigger is ProcessingTime, and interval=0

  • ContinuousTrigger - A Trigger that continuously processes streaming data, asynchronously checkpointing at the specified interval.

可以为 ProcessingTime 指定一个时间 或者使用 指定时间的ContinuousTrigger ,固定生成checkpoint的周期,避免checkpoint生成过于频繁,减轻多任务下小集群的nn的压力

第二,从checkpoint保留机制考虑。

保留机制

spark.sql.streaming.minBatchesToRetain - 必须保留并使其可恢复的最小批次数,默认为 100

可以调小保留的batch的次数,比如调小到 20,这样 checkpoint 小文件数量整体可以减少到原来的 20%

checkpoint 参数验证

主要验证trigger机制保留机制

验证trigger机制

未设置trigger效果

未设置trigger前,spark structured streaming 的查询batch提交的是每次拉取一批数据直到处理完了再拉取下一批数据进行执行。

每一个batch的query任务的提交是有规律可寻的,即每隔5s提交一次代码,即trigger设置生效

注意,如果消息不能马上被消费,消息会有积压,structured streaming 目前并无与spark streaming效果等同的背压机制,为防止单批次query查询的数据源数据量过大,避免程序出现数据倾斜或者无法挽回的OutOfMemory错误,可以通过 maxOffsetsPerTrigger 参数来设置单个批次允许抓取的最大消息条数。

使用案例如下:

spark.readStream.format("kafka").option("kafka.bootstrap.servers", "xxx:9092").option("subscribe", "test-name").option("startingOffsets", "earliest").option("maxOffsetsPerTrigger", 1).option("group.id", "2").option("auto.offset.reset", "earliest").load()

验证保留机制

默认保留机制效果

spark任务提交参数

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
/root/spark-test-1.0-SNAPSHOT.jar

如下图,offsets和commits最终最少各保留100个文件。

修改保留策略

通过修改任务提交参数来进一步修改checkpoint的保留策略。

添加 --conf spark.sql.streaming.minBatchesToRetain=2 ,完整脚本如下:

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
--conf spark.sql.streaming.minBatchesToRetain=2 \
/root/spark-test-1.0-SNAPSHOT.jar

修改后保留策略效果

修改后保留策略截图如下:

即 checkpoint的保留策略参数设置生效

总结

综上,可以通过设置 trigger 来控制每一个batch的query提交的时间间隔,可以通过设置checkpoint文件最少保留batch的大小来减少checkpoint小文件的保留个数。

转载自:http://www.gzywkj.com/post/13405.html

参照

  1. https://github.com/apache/spark/blob/master/docs/running-on-yarn.md
  2. https://blog.csdn.net/lm709409753/article/details/85250859
  3. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
  4. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
  5. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
  6. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
  7. https://github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

spark 集群优化相关推荐

  1. Spark集群资源如何分配

    前言 新来的实习小伙,在公司的一个小集群(14台服务器,用于处理一些数据量较小的月数据)上面提交了一个spark application,然后其他人提交的application都在排队中,一个个的在抱 ...

  2. spark 并行处理_如何使用Spark集群并行处理大数据

    spark 并行处理 by Hari Santanam 通过Hari Santanam 如何使用Spark集群并行处理大数据 (How to use Spark clusters for parall ...

  3. Spark集群安装介绍

    (1)初学者对于spark的几个疑问 http://aperise.iteye.com/blog/2302481 (2)spark开发环境搭建 http://aperise.iteye.com/blo ...

  4. docker下,极速搭建spark集群(含hdfs集群)

    搭建spark和hdfs的集群环境会消耗一些时间和精力,处于学习和开发阶段的同学关注的是spark应用的开发,他们希望整个环境能快速搭建好,从而尽快投入编码和调试,今天咱们就借助docker,极速搭建 ...

  5. HADOOP集群优化——CPU、内存、磁盘IO、YARN监控

    目录 hadoop集群优化指标---CPU.内存.磁盘IO.YARN监控 Linux性能监测:CPU篇 Linux性能监测:内存篇 Linux性能监测:磁盘IO篇 Linux性能监测:YARN篇 写在 ...

  6. docker下的spark集群,调整参数榨干硬件

    本文是<docker下,极速搭建spark集群(含hdfs集群)>的续篇,前文将spark集群搭建成功并进行了简单的验证,但是存在以下几个小问题: spark只有一个work节点,只适合处 ...

  7. Spark-----Spark 与 Hadoop 对比,Spark 集群搭建与示例运行,RDD算子简单入门

    目录 一.Spark 概述 1.1. Spark是什么 1.2. Spark的特点(优点) 1.3. Spark组件 1.4. Spark和Hadoop的异同 二.Spark 集群搭建 2.1. Sp ...

  8. spark 序列化错误 集群提交时_【问题解决】本地提交任务到Spark集群报错:Initial job has not accepted any resources...

    本地提交任务到Spark集群报错:Initial job has not accepted any resources 错误信息如下: 18/04/17 18:18:14 INFO TaskSched ...

  9. spark集群使用hanlp进行分布式分词操作说明

    本篇分享一个使用hanlp分词的操作小案例,即在spark集群中使用hanlp完成分布式分词的操作,文章整理自[qq_33872191]的博客,感谢分享!以下为全文:   分两步: 第一步:实现han ...

  10. GIS+=地理信息+云计算技术——Spark集群部署

    第一步:安装软件         Spark 1.5.4:wget http://www.apache.org/dyn/closer.lua/spark/spark-1.5.2/spark-1.5.2 ...

最新文章

  1. mysql定时备份并上传ftp_Linux下定时任务实现mysql自动备份并上传远程ftp
  2. verilog 浮点转定点_浮点数0.1+0.2为何不等于0.3
  3. USB相关结构体之struct usb_device
  4. 你还在 new 对象吗?Java8 通用 Builder 了解一下?
  5. 机器学习笔记(part1)--Frobenius范数与迹运算
  6. 【分享】华为总裁任正非谈企业管理:正确的方向来自于妥协
  7. java path环境变量_java配置环境变量
  8. 【九天教您南方cass 9.1】 03 编码法绘制地形图
  9. Sqlmap脱库之“你的数据我所见”
  10. 测试用例入门(二) - 使用等价类划分法编写测试用例
  11. Ubuntu挑战绿坝
  12. 计算机基础应用模拟考试软件,全国计算机等级考试全真训练模拟考试软件一级基础及MS-Office应用...
  13. 百世赴美IPO拟募7.5亿美元,“另类”大佬周韶宁迎来新冒险
  14. 深度学习--python 读取并显示图片的方法
  15. 用计算机弹音乐我们一起猫叫,抖音上面我们一起学猫叫一起喵喵喵是什么歌 抖音学猫叫歌曲歌词...
  16. 创建制作SDK的静态库工程
  17. STM32传感器外设集--温湿度模块(DHT11)
  18. 2019年4月中国编程语言排行榜,java占有率一骑绝尘,python工资领先
  19. 聊天室加入用户名查重功能
  20. 手把手教你使用ADSP-21569(七)在线调试程序的详细说明(TDM配置:4进12出)

热门文章

  1. 免费logo设计在线生成(不定时更新)
  2. 什么是OneData?阿里数据中台实施方法论解读
  3. matlab单点弦截法求解,弦截法求根c语言
  4. 基于SuperMap iDesktop制作天地图1--10级详细说明
  5. 日历查询---在线阴阳历转换器
  6. 【小万出生记——第0篇】想做一款机械手
  7. [数据库]-- mysql 获取昨天日期、今天日期、明天日期以及前一个小时和后一个小时的时间
  8. js 打印去掉页眉页脚页码_javascript 打印时去掉页眉页脚
  9. 十月百度,阿里巴巴,迅雷搜狗最新面试五十三题(持续更新中10.16)
  10. python判断一个列表是否包含另一个列表_Python-检查一个列表是否包含在另一个列表中...