Spark中的Broadcast处理

首先先来看一看broadcast的使用代码:

val values = List[Int](1,2,3)

val broadcastValues = sparkContext.broadcast(values)

rdd.mapPartitions(iter => {

broadcastValues.getValue.foreach(println)

})

在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进行广播,

最后在rdd的每个partition的迭代时,使用这个广播变量.

接下来看看广播变量的生成与数据的读取实现部分:

def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {

这里要注意,使用broadcast时,不能直接对RDD进行broadcast的操作.
    // This is a warning instead of an exception in order to avoid breaking

//       user programs that
    // might have created RDD broadcast variables but not used them:
    logWarning("Can not directly broadcast RDDs; instead, call collect() and "
      + "broadcast the result (see SPARK-5063)")
  }

通过broadcastManager中的newBroadcast函数来进行广播.
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}

在BroadcastManager中生成广播变量的函数,这个函数直接使用的broadcastFactory的相应函数.

broadcastFactory的实例通过配置spark.broadcast.factory,

默认是TorrentBroadcastFactory.

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal,

nextBroadcastId.getAndIncrement())
}

在TorrentBroadcastFactory中生成广播变量的函数:

在这里面,直接生成了一个TorrentBroadcast的实例.

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long)

: Broadcast[T] = {
  new TorrentBroadcast[T](value_, id)
}

TorrentBroadcast实例生成时的处理流程:

这里基本的代码部分是直接写入这个要广播的变量,返回的值是这个变量所占用的block的个数.

Broadcast的block的大小通过spark.broadcast.blockSize配置.默认是4MB,

Broadcast的压缩是否通过spark.broadcast.compress配置,默认是true表示启用,默认情况下使用snappy的压缩.

private val broadcastId = BroadcastBlockId(id)
/** Total number of blocks this broadcast variable contains. */
private val numBlocks: Int = writeBlocks(obj)

接下来生成一个lazy的属性,这个属性仅仅有在详细的使用时,才会运行,在实例生成时不运行(上面的演示样例中的getValue.foreach时运行).

@transient private lazy val _value: T = readBroadcastBlock()

override protected def getValue() = {
  _value
}

看看实例生成时的writeBlocks的函数:

private def writeBlocks(value: T): Int = {

这里先把这个广播变量保存一份到当前的task的storage中,这样做是保证在读取时,假设要使用这个广播变量的task就是本地的task时,直接从blockManager中本地读取.
  SparkEnv.get.blockManager.putSingle(broadcastId, value,

StorageLevel.MEMORY_AND_DISK,
    tellMaster = false)

这里依据block的设置大小,对value进行序列化/压缩分块,每个块的大小为blocksize的大小,
  val blocks =
    TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer,

compressionCodec)

这里把序列化并压缩分块后的blocks进行迭代,存储到blockManager中,
  blocks.zipWithIndex.foreach { case (block, i) =>
    SparkEnv.get.blockManager.putBytes(
      BroadcastBlockId(id, "piece" + i),
      block,
      StorageLevel.MEMORY_AND_DISK_SER,
      tellMaster = true)
  }

这个函数的返回值是一个int类型的值,这个值就是序列化压缩存储后block的个数.
  blocks.length
}

在我们的演示样例中,使用getValue时,会运行实例初始化时定义的lazy的函数readBroadcastBlock:

private def readBroadcastBlock(): T = Utils.tryOrIOException {
  TorrentBroadcast.synchronized {
    setConf(SparkEnv.get.conf)

这里先从local端的blockmanager中直接读取storage中相应此广播变量的内容,假设能读取到,表示这个广播变量已经读取过来或者说这个task就是广播的本地executor.
    SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
      case Some(x) =>
        x.asInstanceOf[T]

以下这部分运行时,表示这个广播变量在当前的executor中是第一次读取,通过readBlocks函数去读取这个广播变量的全部的blocks,反序列化后,直接把这个广播变量存储到本地的blockManager中,下次读取时,就能够直接从本地进行读取.
      case None =>
        logInfo("Started reading broadcast variable " + id)
        val startTimeMs = System.currentTimeMillis()
        val blocks = readBlocks()
        logInfo("Reading broadcast variable " + id + " took" +

Utils.getUsedTimeMs(startTimeMs))

val obj = TorrentBroadcast.unBlockifyObject[T](
          blocks, SparkEnv.get.serializer, compressionCodec)
        // Store the merged copy in BlockManager so other tasks on this executor don't
        // need to re-fetch it.
        SparkEnv.get.blockManager.putSingle(
          broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
        obj
    }
  }
}

最后再看看readBlocks函数的处理流程:

private def readBlocks(): Array[ByteBuffer] = {

这里定义的变量用于存储读取到的block的信息,numBlocks是广播变量序列化后所占用的block的个数.
  val blocks = new Array[ByteBuffer](numBlocks)
  val bm = SparkEnv.get.blockManager

这里開始迭代读取每个block的内容,这里的读取是先从local中进行读取,假设local中没有读取到数据时,通过blockManager读取远端的数据,通过读取这个block相应的location从这个location去读取这个block的内容,并存储到本地的blockManager中.最后,这个函数返回读取到的blocks的集合.
  for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
    val pieceId = BroadcastBlockId(id, "piece" + pid)
    logDebug(s"Reading piece $pieceId of $broadcastId")

def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
    def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
      SparkEnv.get.blockManager.putBytes(
        pieceId,
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
      block
    }
    val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
      throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
    blocks(pid) = block
  }
  blocks
}

spark中的广播变量broadcast相关推荐

  1. spark共享变量(广播变量Broadcast Variable,累加器Accumulators)

    2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...

  2. Spark广播变量Broadcast

    注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...

  3. Spark _15 _广播变量和累加器

    广播变量和累加器 广播变量 广播变量理解图 未使用广播变量: package SparkRadioimport org.apache.spark.rdd.RDD import org.apache.s ...

  4. 初识Flink广播变量broadcast

    Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在 ...

  5. android中的广播退出机制broadcast

    当我们在一个anctivity中需要关闭其他已经打开的activity的时候,广播退出机制是一个比较好的办法 下面来看源码,首先,我们需要先写一个父类:BaseActivity.java packag ...

  6. Spark 广播变量和累加器

    Spark 的一个核心功能是创建两种特殊类型的变量:广播变量和累加器 广播变量(groadcast varible)为只读变量,它有运行SparkContext的驱动程序创建后发送给参与计算的节点.对 ...

  7. 【Spark】广播变量和累加器

    文章目录 一.Spark广播变量 二.累加器 Reference 一.Spark广播变量 多进程编程中,不同进程可以通过创建共享内存,进行进程间通信.而在分布式中,Spark通过[广播变量]和[累加器 ...

  8. Spark之cache ,persist ,checkpoint ,广播变量及其案例 : 根据IP地址(浏览器访问日志获取) / 经度纬度定位地理位置案例(7)

    一  cache 和 persist 和 unpersist 1  cache 和 persist 1.1  cache 和 persist 的使用场景 (为什么使用 ?) 一个 applicatio ...

  9. 【原创】大叔问题定位分享(11)Spark中对大表子查询加limit为什么会报Broadcast超时错误...

    当两个表需要join时,如果一个是大表,一个是小表,正常的map-reduce流程需要shuffle,这会导致大表数据在节点间网络传输,常见的优化方式是将小表读到内存中并广播到大表处理,避免shuff ...

最新文章

  1. gRPC学习记录(二)--Hello World
  2. python用random产生验证码,以及random的一些其他用法
  3. ANSYS——相同模型不同创建方式的同载荷同约束下的比较
  4. zookeeper结构和命令详解
  5. Qt 自定义事件的实现
  6. 关于Cococs中的CCActionEase(下)
  7. vscode 乱码_如何使用VS Code 编辑Keil项目(51/STM32)
  8. 诗歌的创作、诗词总结与应用
  9. Python自学记录--steam密码加密逆向
  10. 友华PT939G移动光猫开启telnet获取配置文件
  11. 关于teamviewer不能进行局域网连接的问题
  12. 1024 程序员节狂欢盛会,等了一年终于来了!
  13. 大一新生能从ACM比赛中得到什么?
  14. linux下qt实现计算器,QT实现简单计算器功能
  15. 【嵌入式】基于平头哥内核的W801 WIFI SoC的OTA方案设计
  16. OAuth2客户端springsecurity5 - 神经病的缥缈之旅
  17. php tp3.2 添加表内容,数据创建 · ThinkPHP3.2.3完全开发手册 · 看云
  18. 文本数据“关键词渲染”的高频词可视化——词云图。
  19. 为什么公司不愿意要Java培训班学员
  20. KVM学习(四)windows server半虚拟化驱动virtio

热门文章

  1. 一个复杂系统的拆分改造实践
  2. Mongodb账号密码模式的基本认证
  3. 【极品手机推荐】安卓3G运存16G内存,相机1300+500,三星高画质显示J7109|J7108
  4. SparkSql官方文档中文翻译(java版本)
  5. linux 下mysql安装配置管理以及优化
  6. Javascript实现浏览器菜单命令
  7. Eclipse插件开发入门
  8. php 重载等号,重载运算符
  9. 安卓10省电还是费电_iOS 13省电教程:关掉这8个功能iPhone多用3小时
  10. MySQL子查询介绍