初识Flink广播变量broadcast
Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.configuration.Configuration import scala.collection.mutable.ListBufferobject BatchDemoBroadcastScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//1: 准备需要广播的数据val broadData = ListBuffer[Tuple2[String,Int]]()broadData.append(("zs",18))broadData.append(("ls",20))broadData.append(("ww",17))//1.1处理需要广播的数据val tupleData = env.fromCollection(broadData)val toBroadcastData = tupleData.map(tup=>{Map(tup._1->tup._2)})val text = env.fromElements("zs","ls","ww")val result = text.map(new RichMapFunction[String,String] {var listData: java.util.List[Map[String,Int]] = nullvar allMap = Map[String,Int]()override def open(parameters: Configuration): Unit = {super.open(parameters)this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")val it = listData.iterator()while (it.hasNext){val next = it.next()allMap = allMap.++(next)}}override def map(value: String) = {val age = allMap.get(value).getvalue+","+age}}).withBroadcastSet(toBroadcastData,"broadcastMapName")result.print()} }
1、设置广播变量
在某个需要用到该广播变量的算子后调用withBroadcastSet(var1, var2)进行设置,var1为需要广播变量的变量名,var2是自定义变量名,为String类型。注意,被广播的变量只能为DataSet类型,不能为List、Int、String等类型。
2、
获取广播变量
创建该算子对应的富函数类,例如map函数的富函数类是RichMapFunction,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个Traversable[_]接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写open函数,通过getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量,var即为设置广播变量时的自定义变量名,类型为String,open函数在算子生命周期的初始化阶段便会调用;最后在map方法中对获取到的广播变量进行访问及其它操作。
参考:
https://blog.csdn.net/fct2001140269/article/details/84402798
https://blog.csdn.net/qq_34842671/article/details/80746593
转载于:https://www.cnblogs.com/linkmust/p/10901731.html
初识Flink广播变量broadcast相关推荐
- Flink 广播变量
广播变量简介 在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变 ...
- spark共享变量(广播变量Broadcast Variable,累加器Accumulators)
2019独角兽企业重金招聘Python工程师标准>>> 一般来说,当一个函数被传递给Spark操作(例如map和reduce),在一个远程集群上运行,它实际上操作的是这个函数用到的所 ...
- spark中的广播变量broadcast
Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkC ...
- Spark广播变量Broadcast
注意:原文出处https://www.jianshu.com/p/3bd18acd2f7f Broadcast 顾名思义,broadcast 就是将数据从一个节点发送到其他各个节点上去.这样的场景很多 ...
- SparkCore:RDD累加器和广播变量(最详细的介绍)!!!!!!
RDD累加器和广播变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,在每个任务上都生成一个副本.但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控 ...
- Flink的累加器和广播变量、广播流、分布式缓存
1.Accumulator累加器 Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...
- Broadcast Variables广播变量
Flink Broadcast Variables: Broadcast variables允许你创建一个数据集在所有的并行操作节点都能获取到,除了常规的输入操作.针对一些小的依赖数据集,这种方式是非 ...
- Flink实操 : 广播变量/累加器/分布式缓存
. 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...
- Spark广播变量与累加器
在之前的文章中,我介绍了flink广播状态,从而了解了flink广播状态实际上就是将一个流广播到下游所有算子之中.在本文中我将介绍spark中类似的概念,为了方便理解,先放张spark应用程序架构图. ...
最新文章
- 【gradle】问题及解决
- kettle 如何使用java代码
- flex的mxmlc命令行编译as3文件成swf
- oracle+监控索引使用,ORACLE 监控索引的使用
- 第一篇:数据库服务概述
- 【摩天好课推荐】2 Python语言入门
- 经典C语言程序100例之十六
- Objective-C学习笔记--NSLog用法及例子
- matlab与ie交互
- php 日期加减处理函数,php日期加减处理函数示例
- Variant类型转换成CString代码
- Python编程一定要注意的那些“坑”(七)
- Algs4-1.5.11实现加权quick-find算法
- 易语言webservice接口_易语言webservice接口调用助
- 基于深度学习的图像匹配技术一览
- VMware如何安装windows10教程
- 从中医的角度认识感冒
- 腾讯云发送短信验证码
- VUE + CSS画三角形
- 一名IT界“老”技术人关于学习与成长的分享,受益!
热门文章
- sqlite主键会加速吗_股指分歧中创下3107新高,权重主动下蹲蓄势,后市会加速吗?...
- activate tensorflow_“量子固件”来了!利用 TensorFlow 提升量子计算硬件性能
- 如何确定自己是否适合做程序员?
- 【Java 多线程】多线程带来的的风险-线程安全、多线程五个经典案例
- latch.await java有什么作用,Android系统。 Countdownlatch.await不起作用
- php地址选择插件,微信小程序中关于三级联动地址选择器的实例分享
- ddos攻击数据集_ddos攻击和cc攻击有什么区别?他们2个哪个更厉害?
- 接口规范 5. 点播流相关接口
- oracle手注,oracle手注
- word插入visio图显示不完全