目录

共享变量

广播变量

累加器

​​​​​​​案例演示


共享变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark提供了两种类型的变量:

 1)、广播变量Broadcast Variables

广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本;

  2)、累加器Accumulators

累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

官方文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#shared-variables

​​​​​​​广播变量

广播变量允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时Spark还使用高效的广播算法分发这些变量,从而减少通信的开销。

可以通过调用sc.broadcast(v)创建一个广播变量,该广播变量的值封装在v变量中,可使用获取该变量value的方法进行访问。

​​​​​​​累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素

当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。实现自定义累加器的步骤:

第一步、继承AccumulatorV2,实现相关方法;

第二步、创建自定义Accumulator的实例,然后在SparkContext上注册它;

官方提供实例如下:

​​​​​​​案例演示

以词频统计WordCount程序为例,假设处理的数据如下所示,包括非单词符合,统计数据词频时过滤非单词的特殊符号并且统计总的格式。

实现功能:

 第一、过滤特殊字符

非单词符合存储列表List中

使用广播变量广播列表

 第二、累计统计非单词符号出现次数

定义一个LongAccumulator累加器,进行计数

示例代码:

package cn.itcast.coreimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}/*** 基于Spark框架使用Scala语言编程实现词频统计WordCount程序,将符号数据过滤,并统计出现的次数* -a. 过滤标点符号数据* 使用广播变量* -b. 统计出标点符号数据出现次数* 使用累加器*/
object SparkSharedVariableTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 读取文件数据val datasRDD: RDD[String] = sc.textFile("data/input/words2.txt", minPartitions = 2)// 字典数据,只要有这些单词就过滤: 特殊字符存储列表List中val list: List[String] = List(",", ".", "!", "#", "$", "%")// 通过广播变量 将列表list广播到各个Executor内存中,便于多个Task使用val listBroadcast: Broadcast[List[String]] = sc.broadcast(list)// 定义累加器,记录单词为符号数据的个数val accumulator: LongAccumulator = sc.longAccumulator("mycounter")// 分割单词,过滤数据val wordsRDD = datasRDD// 1)、过滤数据,去除空行数据.filter(line => line != null && line.trim.length > 0)// 2)、分割单词.flatMap(_.trim.split("\\s+"))// 3)、过滤字典数据:符号数据.filter(word => {// 获取符合列表 ,从广播变量中获取列表list的值val listValue = listBroadcast.value// 判断单词是否为符号数据,如果是就过滤掉val isCharacter = listValue.contains(word)if (isCharacter) {// 如果单词为符号数据,累加器加1accumulator.add(1L)}!isCharacter})val resultRDD: RDD[(String, Int)] = wordsRDD// 转换为二元组.mapPartitions(iter => {iter.map((_, 1))})// 按照单词聚合统计.reduceByKey(_+_)resultRDD.foreach(println)println(s"过滤符合数据的个数:${accumulator.value}")// 应用程序运行结束,关闭资源sc.stop()}
}

也可以通过WEB UI查看累加器的值

2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量相关推荐

  1. 2021年大数据Kafka(九):kafka消息存储及查询机制原理

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 kafka消息存储及查询机制原理 一.Kafka数据存储机制 ...

  2. 2021年大数据HBase(九):Apache Phoenix的安装

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 前言 系列历史文章 安装Phoenix 一.下载 二.安装 1.上传安装包 ...

  3. 2021年大数据Hive(九):Hive的数据压缩

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive的数据压缩 一.MR支持的压缩编码 二.压缩配置 ...

  4. 2021年大数据发展十大趋势:抓准一个,就能掌握先机!

    导读:如何激活数据价值.真正从大数据中"淘金",成为2021年大数据的重中之重.大数据究竟如何持续不断地影响组织和机构,以及它给这个世界带来了何种影响,本文特别梳理出2021年大数 ...

  5. 2021年大数据Hadoop(九):HDFS的高级使用命令

    2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 HDFS的高级使用命令 ...

  6. 2021年大数据ELK(九):使用VSCode测试分词器

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 使用VSCode测试分词器 一.准备VSCode开发环境 1.打开VSCode ...

  7. 2021年大数据Flink(九):Flink原理初探

    Flink原理初探 Flink角色分工 在实际生产中,Flink 都是以集群在运行,在运行的过程中包含了两类进程. JobManager: 它扮演的是集群管理者的角色,负责调度任务.协调 checkp ...

  8. 2021年大数据Hadoop(二十九):​​​​​​​关于YARN常用参数设置

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 关于yarn常用参数设置 设置container分配最小内 ...

  9. 2021年大数据Hadoop(三十):Hadoop3.x的介绍

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 Hadoop3.x的介绍 介绍 Hadoop 3.0新特性 ...

  10. 2021年大数据Hadoop(二十五):YARN通俗介绍和基本架构

    全网最详细的Hadoop文章系列,强烈建议收藏加关注! 后面更新文章都会列出历史文章目录,帮助大家回顾知识重点. 目录 本系列历史文章 前言 YARN通俗介绍和基本架构 Yarn通俗介绍 Yarn基本 ...

最新文章

  1. Linux 中SeLinux 的禁用方式
  2. pdo php分页6,php pdo自动分页类代码与例子
  3. svn教程----svn简介
  4. [改善Java代码]性能考虑,数组是首选
  5. javaone_JavaOne 2012:JavaOne技术主题演讲
  6. Linux 安装 jdk ( 两种方式 )
  7. 《Programming WPF》翻译 第7章 4.转换
  8. apiclod 上传图片_Apicloud——关于上传图片、视频(二)
  9. gromacs 安装_GROMACS:粗粒化力场建立和模拟上线!
  10. cas client 更新ticket_有人知道 cas单点登录系统是怎么样取得proxyticket的?
  11. 电子计算机的速度快精确度,都21世纪了,还不来了解了解什么叫compute?
  12. 深入Node.js的模块机制
  13. PHPSTORM 6.0.3 Xdebug 配置9
  14. 二维条码 PDF417 zxing-cpp解码详细过程
  15. HBuilderX安装教程
  16. 信号失真度测量装置(A 题)--2021 年全国大学生电子设计竞赛试题
  17. web基础(一)——初识HTML5
  18. React Native之携程Moles框架
  19. 广东专升本C语言程序设计(完结)
  20. 在实体引用中,实体名称必须紧跟在 ‘’后面

热门文章

  1. JS Uncaught SyntaxError:Unexpected identifier异常报错原因及其解决方法
  2. mount 挂载磁盘报错 mount: wrong fs type, bad option, bad superblock on
  3. 路由器远程登陆配置:01一个人登陆多台设备
  4. Redis常见面试问题汇总及解析
  5. pycharm配置远程调试docker
  6. LeetCode简单题之整数的各位积和之差
  7. 驾驶员行为监控系统:需要它来管理车队
  8. Google Pixel 超分辨率--Super Resolution Zoom
  9. Android json请求格式与from表单格式
  10. 微信小程序根据后台返回值设置自己想要的结果