作为延续解剖的-Apache的火花的工作后,我将分享如何利用星火UI调谐工作。 我将继续使用先前文章中使用的相同示例,新的spark应用程序将在以下方面完成工作

–阅读纽约市停车票

–通过“板ID”进行汇总并计算违规日期

–保存结果

此代码的DAG看起来像这样

这是多阶段的工作,因此需要一些数据混洗,因为此示例混洗写入为564mb,输出为461MB。

让我们看看我们可以做些什么来减少这种情况?

让我们从“ Stage2”开始采取自上而下的方法。 首先想到的是探索压缩。
当前代码

aggValue.map {case (key, value) => Array(key, value._1, value._2.mkString(",")).mkString("\t")}.saveAsTextFile(s"/data/output/${now}")

新密码

aggValue.map {case (key, value) => Array(key, value._1, value._2.mkString(",")).mkString("\t")}.saveAsTextFile(s"/data/output/${now}", classOf[GzipCodec])

新代码仅在写入时启用gzip,让我们看看我们在Spark UI上看到的内容

用Gzip保存

只需写入编码器,写入量下降了70%。 现在达到135Mb并加快了工作速度。

让我们先看看还有什么可能,然后再进行更多的内部调整

最终输出如下所示

1RA32   1       05/07/2014
92062KA 2       07/29/2013,07/18/2013
GJJ1410 3       12/07/2016,03/04/2017,04/25/2015
FJZ3486 3       10/21/2013,01/25/2014
FDV7798 7       03/09/2014,01/14/2014,07/25/2014,11/21/2015,12/04/2015,01/16/2015

进攻日期以原始格式存储,可以对此应用少量编码以提高速度。

Java 8添加了LocalDate来简化日期操作,该类带有一些方便的功能,其中之一就是toEpocDay。

此函数将日期转换为1970年的日期,因此这意味着在4个字节(Int)中,我们最多可以存储5K年,与当前格式占用10个字节相比,这似乎可以节省很多。

epocDay的代码段

val issueDate = LocalDate.parse(row(aggFieldsOffset.get("issue date").get), ISSUE_DATE_FORMAT)val issueDateValues = mutable.Set[Int]()issueDateValues.add(issueDate.toEpochDay.toInt)result = (fieldOffset.map(fieldInfo => row(fieldInfo._2)).mkString(","), (1, issueDateValues))

更改后的Spark UI。 我还做了另一项更改以使用KryoSerializer

这是一个巨大的改进,随机写入从564Mb更改为409MB(提高27%),输出从134Mb更改为124 Mb(提高8%)

现在让我们转到Spark UI上的另一部分,该部分显示了执行者端的日志。

以上运行的GC日志显示以下内容

2018-10-28T17:13:35.332+0800: 130.281: [GC (Allocation Failure) [PSYoungGen: 306176K->20608K(327168K)] 456383K->170815K(992768K), 0.0222440 secs] [Times: user=0.09 sys=0.00, real=0.03 secs]
2018-10-28T17:13:35.941+0800: 130.889: [GC (Allocation Failure) [PSYoungGen: 326784K->19408K(327168K)] 476991K->186180K(992768K), 0.0152300 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:36.367+0800: 131.315: [GC (GCLocker Initiated GC) [PSYoungGen: 324560K->18592K(324096K)] 491332K->199904K(989696K), 0.0130390 secs] [Times: user=0.11 sys=0.00, real=0.01 secs]
2018-10-28T17:13:36.771+0800: 131.720: [GC (GCLocker Initiated GC) [PSYoungGen: 323744K->18304K(326656K)] 505058K->215325K(992256K), 0.0152620 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:37.201+0800: 132.149: [GC (Allocation Failure) [PSYoungGen: 323456K->20864K(326656K)] 520481K->233017K(992256K), 0.0199460 secs] [Times: user=0.12 sys=0.00, real=0.02 secs]
2018-10-28T17:13:37.672+0800: 132.620: [GC (Allocation Failure) [PSYoungGen: 326016K->18864K(327168K)] 538169K->245181K(992768K), 0.0237590 secs] [Times: user=0.17 sys=0.00, real=0.03 secs]
2018-10-28T17:13:38.057+0800: 133.005: [GC (GCLocker Initiated GC) [PSYoungGen: 324016K->17728K(327168K)] 550336K->259147K(992768K), 0.0153710 secs] [Times: user=0.09 sys=0.00, real=0.01 secs]
2018-10-28T17:13:38.478+0800: 133.426: [GC (Allocation Failure) [PSYoungGen: 322880K->18656K(326144K)] 564301K->277690K(991744K), 0.0156780 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
2018-10-28T17:13:38.951+0800: 133.899: [GC (Allocation Failure) [PSYoungGen: 323808K->21472K(326656K)] 582842K->294338K(992256K), 0.0157690 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:39.384+0800: 134.332: [GC (Allocation Failure) [PSYoungGen: 326624K->18912K(317440K)] 599490K->305610K(983040K), 0.0126610 secs] [Times: user=0.11 sys=0.00, real=0.02 secs]
2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs] 

让我们专注于一条线

2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K) , 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs] 

次要GC之前的堆为600MB,之后为320MB,总堆大小为987MB。

执行器分配了2GB内存,并且此Spark应用程序未使用所有内存,我们可以通过发送更多任务或更大任务来给执行器增加更多负载。

我将输入分区从270减少到100

带270个输入分区

带100个输入分区

100个输入分区看起来更好,可减少大约10%以上的数据洗牌。

其他技巧

现在,我将分享一些将大大改变GC的东西!

优化前的代码

private def mergeValues(value1: (Int, mutable.Set[Int]), value2: (Int, mutable.Set[Int])): (Int, mutable.Set[Int]) = {val newCount = value1._1 + value2._1val dates = value1._2dates.foreach(d => value2._2.add(d))(newCount, value2._2)}private def saveData(aggValue: RDD[(String, (Int, mutable.Set[Int]))], now: String) = {aggValue.map { case (key, value) => Array(key, value._1, value._2.mkString(",")).mkString("\t") }.coalesce(100).saveAsTextFile(s"/data/output/${now}", classOf[GzipCodec])}

优化后的代码

private def mergeValues(value1: GroupByValue, value2: GroupByValue): GroupByValue = {if (value2.days.size > value1.days.size) {value2.count = value1.count + value2.countvalue1.days.foreach(d => value2.days.add(d))value2}else {value1.count = value1.count + value2.countvalue2.days.foreach(d => value1.days.add(d))value1}}private def saveData(aggValue: RDD[(String, GroupByValue)], now: String) = {aggValue.mapPartitions(rows => {val buffer = new StringBuffer()rows.map {case (key, value) =>buffer.setLength(0)buffer.append(key).append("\t").append(value.count).append("\t").append(value.days.mkString(","))buffer.toString}}).coalesce(100).saveAsTextFile(s"/data/output/${now}", classOf[GzipCodec])}

新代码正在对集合进行优化合并,它向大集合中添加了小集合,并且还引入了Case类。

另一种优化是保存功能,其中它使用mapPartitions通过使用StringBuffer减少对象分配。

我使用http://gceasy.io获得了一些GC统计信息。

更改代码之前

更改代码后

新代码为例如产生更少的垃圾。

总GC 126 GB和122 GB(约提高4%)

最大GC时间720ms与520 ms(约好25%)

优化看起来很有希望。

该博客中使用的所有代码都可以在github repo sparkperformance上找到

请继续关注有关此内容的更多信息。

翻译自: https://www.javacodegeeks.com/2018/11/insights-spark-ui.html

Spark UI的见解相关推荐

  1. spark ui_Spark UI的见解

    spark ui 作为apache-spark-job解剖的后续文章,我将分享您如何使用Spark UI进行作业调整. 我将继续使用先前文章中使用的相同示例,新的spark应用程序将在以下方面完成工作 ...

  2. spark ui 上schedulingDelay理解

    Spark UI页面上的schedulingDelay具体为一批job submit到第一个job开始执行的这段时间. 其中submitTime在JobGenerate中根据定时生成job的事件而触发 ...

  3. Spark UI无法查看到slave节点

    背景信息: Spark两个节点,主机名分别为master和slave,$SPARK_HOMR/conf/slaves配置了两条记录:两行分别为master和slave. 先使用./sbin/start ...

  4. Spark UI在虚拟机中可以打开,但是在宿主机上无法访问

    场景描述: 在笔记本Windows10的VMware Workstation Pro中安装CentOS 7 的VM,并部署了Spark 错误描述: 在Centos中可以访问Spark UI,但是在Wi ...

  5. Spark UI (基于Yarn) 分析与定制

    前言 有时候我们希望能对Spark UI进行一些定制化增强.并且我们希望尽可能不更改Spark的源码.为了达到此目标,我们会从如下三个方面进行阐述: 理解Spark UI的处理流程 现有Executo ...

  6. 手把手带你了解Spark作业“体检报告” --Spark UI

    手把手带你了解Spark作业"体检报告" --Spark UI Spark UI 一级入口 Executors Environment Storage SQL Jobs Stage ...

  7. spark ui job和stage的dag图查看过去运行的任务,查不到,分析源码解决问题

    项目场景: 使用用2.x跑任务,产看耗时的spark job, stage,发现job和stage的dag信息缺失 问题描述: sparkUI 显示dag信息缺失问题: 使用用2.x跑任务,查看spa ...

  8. Spark UI界面原理

    当Spark程序在运行时,会提供一个Web页面查看Application运行状态信息.是否开启UI界面由参数spark.ui.enabled(默认为true)来确定.下面列出Spark UI一些相关配 ...

  9. 漏洞通告 | ​Apache Spark UI命令漏洞;Grails远程代码漏洞;Confluence Questions漏洞

    [漏洞通告]Apache Spark UI 命令注入漏洞 基础信息 CVE CVE-2022-33891 等级 高危 类型 命令注入 漏洞详情 Apache Spark是美国阿帕奇(Apache)软件 ...

最新文章

  1. 基于catalog 创建RMAN存储脚本
  2. 光源时间_天哪!你们居然都错了!D65光源,指的是上午还是下午的太阳光?
  3. UA MATH564 概率论 样本均值的偏度与峰度
  4. sql相同顺序法和一次封锁法_数学专题 | Ep01 隔板法的妙用
  5. go 访问数据库mysql基础
  6. junit单元测试诡异问题
  7. Spring源码阅读 —— 一文看懂AOP的流程
  8. 江苏成人高考考前注意事项
  9. Mstar 方案白板书写加速
  10. 服务器端给客户端发送消息,linux 服务器端给客户端发送消息
  11. python matplotlib 基础练习:画一元二次函数
  12. linux sdl windows.h,SDL入门教程(十):1、多语言支持,Win32下的GetText
  13. 【css】svg修改图标颜色
  14. 安卓源码避坑指南10—蓝牙音乐播放状态和歌曲信息不更新
  15. 恋词题源报刊Unit1
  16. rocksdb 备份backup
  17. cuda矩阵相乘_CUDA计算矩阵相乘
  18. Android备考01 黑马76期-day03 操作数据库
  19. 【转码方式】-Base64
  20. python django 安装虚拟环境创建工程项目

热门文章

  1. 【LCT】洞穴勘测(luogu 2147/金牌导航 LCT-1)
  2. 【期望】关灯游戏(金牌导航 期望-8)
  3. 学习分享会(2019.5.31)
  4. Nacos(十)之Kubernetes Nacos
  5. java实现多文件上传至本地服务器
  6. 厉害了,Servlet3的异步处理机制
  7. 同学们,看看这里吧!!!
  8. 考研英语二大纲22年考研
  9. 查看防火墙状态并关闭防火墙
  10. mybatis的$和#详解分析