当两个表需要join时,如果一个是大表,一个是小表,正常的map-reduce流程需要shuffle,这会导致大表数据在节点间网络传输,常见的优化方式是将小表读到内存中并广播到大表处理,避免shuffle+reduce;

在hive中叫mapjoin(map-side join),配置为 hive.auto.convert.join

在spark中叫BroadcastHashJoin (broadcast hash join)

Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side data is below spark.sql.autoBroadcastJoinThreshold.

Broadcast join can be very efficient for joins between a large table (fact) with relatively small tables (dimensions) that could then be used to perform a star-schema join. It can avoid sending all data of the large table over the network.

有几种方式可以触发:

1)sql hint (从spark 2.3版本开始支持)

SELECT /*+ MAPJOIN(b) */ ...SELECT /*+ BROADCASTJOIN(b) */ ...SELECT /*+ BROADCAST(b) */ ...

2)broadcast function:DataFrame.broadcast

testTable3= testTable1.join(broadcast(testTable2), Seq("id"), "right_outer")

3)自动优化

org.apache.spark.sql.execution.SparkStrategies.JoinSelection

    private def canBroadcast(plan: LogicalPlan): Boolean = {plan.statistics.isBroadcastable ||(plan.statistics.sizeInBytes >= 0 &&plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)}

例如:

spark-sql> explain select * from big_table1 a, (select * from big_table2 limit 10) b where a.id = b.id;

18/09/17 18:14:09 339 WARN Utils66: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

== Physical Plan ==

BroadcastHashJoin [id#5], [id#14], Inner, BuildRight

:- *Filter isnotnull(id#5)

:  +- HiveTableScan [name#4, id#5], MetastoreRelation big_table1

+- BroadcastExchange HashedRelationBroadcastMode(List(input[6, string, false]))

+- Filter isnotnull(id#14)

+- GlobalLimit 10

+- Exchange SinglePartition

+- LocalLimit 10

+- HiveTableScan [id#14, ... 187 more fields], MetastoreRelation big_table2

Time taken: 4.216 seconds, Fetched 1 row(s)

BroadcastExchange 执行过程为

org.apache.spark.sql.execution.exchange.BroadcastExchangeExec

  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {ThreadUtils.awaitResultInForkJoinSafely(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]}

其中timeout是指spark.sql.broadcastTimeout,默认300s

  private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {// broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)Future {// This will run in another thread. Set the execution id so that we can connect these jobs// with the correct execution.
SQLExecution.withExecutionId(sparkContext, executionId) {try {val beforeCollect = System.nanoTime()// Note that we use .executeCollect() because we don't want to convert data to Scala types
val input: Array[InternalRow] = child.executeCollect()if (input.length >= 512000000) {throw new SparkException(s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows")}val beforeBuild = System.nanoTime()longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sumlongMetric("dataSize") += dataSizeif (dataSize >= (8L << 30)) {throw new SparkException(s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")}// Construct and broadcast the relation.
val relation = mode.transform(input)val beforeBroadcast = System.nanoTime()longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000val broadcasted = sparkContext.broadcast(relation)longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)broadcasted

对一个表broadcast执行过程为首先计算然后collect,然后通过SparkContext broadcast出去,并且执行过程为线程异步执行,超时时间为spark.sql.broadcastTimeout;

转载于:https://www.cnblogs.com/barneywill/p/10109434.html

【原创】大叔问题定位分享(11)Spark中对大表子查询加limit为什么会报Broadcast超时错误...相关推荐

  1. spark写入oracle 优化,spark读写数据库大表分区性能优化

    spark读写数据库大表分区性能优化:经常会遇到spark读写数据库再做分析,像mysql或oracle. 在数据量很大的情况下,如果只有一个worker一个excutor一个task,那你excut ...

  2. oracle模糊查询中的regexp_like嵌套子查询用法

    oracle模糊查询中的regexp_like嵌套子查询用法 regexp_like一般用于模糊查询某一列时包含多个查询条件 需求1:在用户表中查询出账号包含650000和230000的用户. sel ...

  3. mysql中多表联合查询语句_mysql中的多表联合查询语句是什么

    mysql中的多表联合查询语句是:[select 语句1 union [union 选项] select 语句2 union [union 选项] select 语句n].多表联合查询结果是将多个se ...

  4. 【华为云技术分享】Spark中的文件源(上)

    摘要: 在大数据/数据库领域,数据的存储格式直接影响着系统的读写性能.Spark针对不同的用户/开发者,支持了多种数据文件存储方式.本文的内容主要来自于Spark AI Summit 2019中的一个 ...

  5. Django框架(11.Django中的通过模型类查询数据以及相关函数和条件)

     Django中的查询函数 通过模型类.objects属性可以调用如下函数,实现对模型类对应的数据表的查询.    不管哪个函数注意返回值的类型 函数名 功能 返回值 说明 get 返回表中满足条件的 ...

  6. sql查询三级菜单分类_SQL面试50题——思路解答与分类整理(中)窗口函数与子查询...

    让我们每天都进步一点点 题目快速查找索引 阅读指南 上篇:SQL面试50题--思路解答与分类整理(上)聚合函数与表连接 [第一部分]聚合函数(sum/avg/count/min/max) [第二部分] ...

  7. MySQL中的多表联合查询

    多表联合查询的基本步骤: select 要查询的目标(可以是要查询的列,也可以是聚合函数) from 要查询的目标来自于哪个表 where 查询的条件 group by 分组的依据(分组的依据必须是查 ...

  8. 生产库中遇到mysql的子查询

    使用过oracle或者其他关系数据库的DBA或者开发人员都有这样的经验,在子查询上都认为数据库已经做过优化,能够很好的选择驱动表执行,然后在把该经验移植到mysql数据库上,但是不幸的是,mysql在 ...

  9. mysql语言中子查询是什么_SQL查询语句中,什么叫子查询?

    展开全部 嵌套SELECT语句也叫子查询,一个 SELECT 语句的查询结果能够作为另一个语句的输入值.子查询不但能够出现在Where子句3231313335323631343130323136353 ...

最新文章

  1. 小爱同学app安卓版_这一届的小爱Play,居然连喵都敢撩……
  2. python编程的常用工具_小白学Python(2)——常用Python编程工具,Python IDE
  3. 《Java编程思想》第四版读书笔记 第四章
  4. 43 SD配置-销售凭证设置-定义状态管理授权码
  5. python是值传递还是引用传递_Python里参数是值传递还是引用传递?
  6. 谷歌资助OSTIF审计8个重要开源项目,提升软件供应链安全
  7. Android解析ClassLoader(二)Android中的ClassLoader
  8. python基础:字符串操作
  9. 【学习笔记】Java工程师成神之路-基础篇
  10. 微信小程序分享功能的实现
  11. 11开根号不用计算机,数学开根号有什么方法?不用计算器
  12. 用C语言编程验证 “ 哥德巴赫猜想 ”
  13. 织梦dedecms建站流程
  14. STM32H743 USART1 LL 库
  15. 快速入门——深度学习理论解析与实战应用
  16. php随机给文字拼音,给文字加上拼音_php
  17. 算是我看源码时的一个小技巧吧~
  18. 列表的join方法,类方法formkeys,删除,集合,深浅拷贝赋值,冒泡排序
  19. 通过usb利用adb实现android手机和pc机通信
  20. EVE-ng模拟器安装教程和使用教程

热门文章

  1. HttpClient4.5.2 使用cookie保持会话——连接池的实现结束篇(4)
  2. espcms /public/class_connector.php intval truncation Vul Arbitrary User Login
  3. 学员参观IDC机房及实操实践活动
  4. 修改Android中strings.xml文件, 动态改变数据
  5. Timus 1015. Test the Difference!
  6. 让你在职场游刃有余的10句话
  7. delay 芯片时序output_set_input_delay/set_output_delay
  8. linux lddbus设备,Linux设备驱动程序学习(14)-Linux设备模型(各环节的整合)
  9. 翁恺老师C语言学习笔记(十)指针_运算符取得变量的地址
  10. 在 Microsoft word中插入代码