精选30+云产品,助力企业轻松上云!>>>

点击蓝色“大数据每日哔哔”关注我

加个“星标”,第一时间获取大数据架构,实战经验



区别

最近有不少同学问我,Spark 中 foreachRDD、foreachPartition和foreach 的区别,工作中经常会用错或不知道怎么用,今天简单聊聊它们之间的区别:

其实区别它们很简单,首先是作用范围不同,foreachRDD 作用于 DStream中每一个时间间隔的 RDD,foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition,foreach 作用于每一个时间间隔的 RDD 中的每一个元素。

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

SparkStreaming 中对 foreachRDD的说明。

foreach 与 foreachPartition都是在每个partition中对iterator进行操作,不同的是,foreach是直接在每个partition中直接对iterator执行foreach操作,而传入的function只是在foreach内部使用,而foreachPartition是在每个partition中把iterator给传入的function,让function自己对iterator进行处理(可以避免内存溢出)

一个简单的例子

在Spark 官网中,foreachRDD被划分到Output Operations on DStreams中,所有我们首先要明确的是,它是一个输出操作的算子,然后再来看官网对它的含义解释:The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

最常用的输出操作,需要一个函数作为参数,函数作用于DStream中的每一个RDD,函数将RDD中的数据输出到外部系统,如文件、数据库,在driver上执行

函数中通常要有action算子,因为foreachRDD本身是transform算子

官网还给出了开发者常见的错误:

Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example :(中文解析见代码下方)

// ① 这种写法是错误的 ❌dstream.foreachRDD { rdd =>  val connection = createNewConnection()  // executed at the driver  rdd.foreach { record =>    connection.send(record) // executed at the worker  }}

上面说的是我们使用foreachRDD向外部系统输出数据时,通常要创建一个连接对象,如果像上面的代码中创建在 driver 上就是错误的,因为foreach在每个节点上执行时节点上并没有连接对象。driver节点就一个,而worker节点有多个。

所以,我们改成下面这样:

// ② 把创建连接写在 forech 里面,RDD 中的每个元素都会创建一个连接dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection() // executed at the worker    connection.send(record) // executed at the worker    connection.close()  }}

这时不会出现计算节点没有连接对象的情况。但是,这样写会在每次循环RDD的时候都会创建一个连接,创建连接和关闭连接都很频繁,造成系统不必要的开销。

可以通过使用 foreachPartirion 来解决这类问题:

// ③ 使用foreachPartitoin来减少连接的创建,RDD的每个partition创建一个链接dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

上面这种方式还可以优化,虽然连接申请变少了,但是对一每一个partition来说,连接还是没有办法复用,所以我们可以引入静态连接池。官方说明:该连接池必须是静态的、懒加载的。

// ④ 使用静态连接池,可以增加连接的复用、减少连接的创建和关闭。dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    // ConnectionPool is a static, lazily initialized pool of connections    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  }}

这里需要注意的是:使用连接池中的连接应按需创建,如果有一段时间不使用,则应超时,这样实现了向外部系统最有效地发送地数据。

总结

文中所展示的例子也是初学者容易犯的错误,通过分析 RDD 数据输出到第三方系统的例子,我们可以很清晰的理解foreachRDD、foreachPartiton、foreach 的作用范围。

(完)

对了,学累了别忘记起身运动运动

本文分享自微信公众号 - 大数据每日哔哔(bb-bigdata)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

Spark中foreachRDD、foreachPartition和foreach解读相关推荐

  1. Spark中foreachPartition和mapPartitions的区别

    Spark中foreachPartition和mapPartitions的区别 spark的运算操作有两种类型:分别是Transformation和Action,区别如下: Transformatio ...

  2. spark中各类key算子的用法汇总(持续更新中)

    启动方式: spark-shell --master yarn 依赖导入: import org.apache.spark.{SparkConf, SparkContext} 输出rdd的类型举例: ...

  3. java 单例模式打包jar_在 Spark 中实现单例模式的技巧

    单例模式是一种常用的设计模式,但是在集群模式下的 Spark 中使用单例模式会引发一些错误.我们用下面代码作例子,解读在 Spark 中使用单例模式遇到的问题. object Example{ var ...

  4. apache spark_如何将自定义数据源集成到Apache Spark中

    apache spark 如今,流数据是一个热门话题,而Apache Spark是出色的流框架. 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中. Spark Streaming使我 ...

  5. 如何将自定义数据源集成到Apache Spark中

    如今,流数据是一个热门话题,而Apache Spark是出色的流框架. 在此博客文章中,我将向您展示如何将自定义数据源集成到Spark中. Spark Streaming使我们能够从各种来源进行流传输 ...

  6. spark中local模式与cluster模式使用场景_不可不知的Spark调优点

    不可不知的Spark调优点​mp.weixin.qq.com 在利用Spark处理数据时,如果数据量不大,那么Spark的默认配置基本就能满足实际的业务场景.但是当数据量大的时候,就需要做一定的参数配 ...

  7. 理解spark中的job、stage、task

    什么是Spark? Spark是处理大数据常用的计算引擎.Spark是一个用来实现快速而通用的集群计算的平台.扩展了广泛使用的MapReduce计算模型,而且高效地支持更多的计算模式,包括交互式查询和 ...

  8. spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )

    1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...

  9. spark中的广播变量broadcast

    Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkC ...

最新文章

  1. 现在很多技术知识点缺乏来龙去脉的介绍
  2. 初始化JQuery方法与(function(){})(para)匿名方法介绍
  3. 休眠事实:了解刷新操作顺序很重要
  4. mysql 存储引擎作用_MySQL常用存储引擎功能与用法详解
  5. templateref html内容,angular4中的ElemenetRef和TemplateRef之间的区别
  6. 基于序列标注的信息抽取模型(已申请专利)
  7. swift学习笔记《5》- 实用
  8. JQUERY-SELECT 实现下拉框可以搜索、选择
  9. PRM–endRequest事件
  10. Java程序员集合框架面试题
  11. python:TypeError: ‘dict_keys‘ object does not support indexing
  12. Spring Mvc:用MultiPartFile上传单个文件,多个文件
  13. idea 格式化代码时, 不换行
  14. 【Drools】Drools使用入门(一)Drools上手教程(包括动态加载规则文件)
  15. Python 的输出矩阵的一些常用设置
  16. 常见电线电缆电阻的检测方法盘点
  17. java基础—输入/输出
  18. 第三方微信授权登录的iOS代码分析
  19. 一个好的科技公司logo长这样
  20. 带你玩转区块链--以太坊基础、发币、基于智能合约实现彩票项目-第二章-第一节【以太坊篇】

热门文章

  1. WinForm下屏幕截图程序的实现
  2. Deno + Oak 构建酷炫的 Todo API
  3. 使用PostgREST的RestAPI操作之相关软件生态系统
  4. 容器编排技术 -- Kubernetes入门概述
  5. Hadoop教程(三)HDFS文件系统Shell命令
  6. ZooKeeper管理员指南
  7. Homebrew命令详解
  8. 虚拟机网络连接三种方式(桥接、NAT、主机)
  9. 从厕所排队引发的产品设计方案思考
  10. 【C语言】一堆数组中存放了10个小于100的整数,请编程对所有数据按照从小到大的顺序进行排序,若个位数相等,则按照十位从小到大的顺序排序,输出排序后的结果...