Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import scala.collection.mutable.ListBufferobject BatchDemoBroadcastScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//1: 准备需要广播的数据val broadData = ListBuffer[Tuple2[String,Int]]()broadData.append(("zs",18))broadData.append(("ls",20))broadData.append(("ww",17))//1.1处理需要广播的数据val tupleData = env.fromCollection(broadData)val toBroadcastData = tupleData.map(tup=>{Map(tup._1->tup._2)})val text = env.fromElements("zs","ls","ww")val result = text.map(new RichMapFunction[String,String] {var listData: java.util.List[Map[String,Int]] = nullvar allMap  = Map[String,Int]()override def open(parameters: Configuration): Unit = {super.open(parameters)this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")val it = listData.iterator()while (it.hasNext){val next = it.next()allMap = allMap.++(next)}}override def map(value: String) = {val age = allMap.get(value).getvalue+","+age}}).withBroadcastSet(toBroadcastData,"broadcastMapName")result.print()}
}

1、设置广播变量
  在某个需要用到该广播变量的算子后调用withBroadcastSet(var1, var2)进行设置,var1为需要广播变量的变量名,var2是自定义变量名,为String类型。注意,被广播的变量只能为DataSet类型,不能为List、Int、String等类型。
2、

获取广播变量
创建该算子对应的富函数类,例如map函数的富函数类是RichMapFunction,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个Traversable[_]接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写open函数,通过getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量,var即为设置广播变量时的自定义变量名,类型为String,open函数在算子生命周期的初始化阶段便会调用;最后在map方法中对获取到的广播变量进行访问及其它操作。

参考:

https://blog.csdn.net/fct2001140269/article/details/84402798

https://blog.csdn.net/qq_34842671/article/details/80746593

转载于:https://www.cnblogs.com/linkmust/p/10901731.html

初识Flink广播变量broadcast相关推荐

  1. Flink 广播变量

    广播变量简介 在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变 ...

  2. spark共享变量(广播变量Broadcast Variable,累加器Accumulators)

    2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...

  3. spark中的广播变量broadcast

    Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkC ...

  4. Spark广播变量Broadcast

    注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...

  5. SparkCore:RDD累加器和广播变量(最详细的介绍)!!!!!!

    RDD累加器和广播变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,在每个任务上都生成一个副本.但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控 ...

  6. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  7. Broadcast Variables广播变量

    Flink Broadcast Variables: Broadcast variables允许你创建一个数据集在所有的并行操作节点都能获取到,除了常规的输入操作.针对一些小的依赖数据集,这种方式是非 ...

  8. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  9. Spark广播变量与累加器

    在之前的文章中,我介绍了flink广播状态,从而了解了flink广播状态实际上就是将一个流广播到下游所有算子之中.在本文中我将介绍spark中类似的概念,为了方便理解,先放张spark应用程序架构图. ...

最新文章

  1. 【gradle】问题及解决
  2. kettle 如何使用java代码
  3. flex的mxmlc命令行编译as3文件成swf
  4. oracle+监控索引使用,ORACLE 监控索引的使用
  5. 第一篇:数据库服务概述
  6. 【摩天好课推荐】2 Python语言入门
  7. 经典C语言程序100例之十六
  8. Objective-C学习笔记--NSLog用法及例子
  9. matlab与ie交互
  10. php 日期加减处理函数,php日期加减处理函数示例
  11. Variant类型转换成CString代码
  12. Python编程一定要注意的那些“坑”(七)
  13. Algs4-1.5.11实现加权quick-find算法
  14. 易语言webservice接口_易语言webservice接口调用助
  15. 基于深度学习的图像匹配技术一览
  16. VMware如何安装windows10教程
  17. 从中医的角度认识感冒
  18. 腾讯云发送短信验证码
  19. VUE + CSS画三角形
  20. 一名IT界“老”技术人关于学习与成长的分享,受益!

热门文章

  1. sqlite主键会加速吗_股指分歧中创下3107新高,权重主动下蹲蓄势,后市会加速吗?...
  2. activate tensorflow_“量子固件”来了!利用 TensorFlow 提升量子计算硬件性能
  3. 如何确定自己是否适合做程序员?
  4. 【Java 多线程】多线程带来的的风险-线程安全、多线程五个经典案例
  5. latch.await java有什么作用,Android系统。 Countdownlatch.await不起作用
  6. php地址选择插件,微信小程序中关于三级联动地址选择器的实例分享
  7. ddos攻击数据集_ddos攻击和cc攻击有什么区别?他们2个哪个更厉害?
  8. 接口规范 5. 点播流相关接口
  9. oracle手注,oracle手注
  10. word插入visio图显示不完全