Flink支持广播变量,就是将数据广播到具体的taskmanager上,数据存储在内存中,这样可以减缓大量的shuffle操作;

比如在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataSet广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;

注意:因为广播变量是要把dataset广播到内存中,所以广播的数据量不能太大,否则会出现OOM这样的问题

Broadcast:Broadcast是通过withBroadcastSet(dataset,string)来注册的

Access:通过getRuntimeContext().getBroadcastVariable(String)访问广播变量

代码如下:

package com.yeahmobi.testimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironmentimport scala.collection.mutable
import scala.util.Random
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configurationimport scala.collection.mutable.ArrayBufferobject BroadCastTest {def main(args: Array[String]): Unit = {val env =ExecutionEnvironment.getExecutionEnvironment//TODO data2 join data3 的数据,使用广播变量完成val data2= new mutable.MutableList[(Int,Long,String)]data2.+=((1,1L,"Hi"))data2.+=((2, 2L, "Hello"))data2.+=((3, 2L, "Hello world"))val ds1 = env.fromCollection(Random.shuffle(data2))
//    ds1.print()val data3 = new mutable.MutableList[(Int,Long,Int,String,Long)]data3.+=((1,1L,0,"Hello",1L)) //.+  scala 万物皆函数 +也是函数名data3.+=((2, 2L, 1, "Hallo Welt", 2L))data3.+=((2, 3L, 2, "Hallo Welt wie", 1L))val ds2 = env.fromCollection(Random.shuffle(data3))//todo 使用内部类RichMapFunction,提供open和map,可以完成join的操作//abstract class RichMapFunction<IN, OUT>  in是输入类型,//out是输出类型val result: DataSet[ArrayBuffer[(Int, Long, String, String)]] = ds1.map(mapper = new RichMapFunction[(Int, Long, String), ArrayBuffer[(Int, Long, String, String)]] {var brodCast:mutable.Buffer[(Int, Long, Int, String, Long)] = null//爷爷类的AbstractRichFunction 的open方法override def open(parameters:Configuration):Unit ={
//        import scala.collection.JavaConversions._import scala.collection.JavaConverters._//asScala 需要使用隐式转换//获取放到广播变量中brodCast = this.getRuntimeContext.getBroadcastVariable[(Int,Long,Int,String,Long)]("ds2").asScala}//这里的value就是ds1的每一行数据override def map(value: (Int, Long, String)): ArrayBuffer[(Int, Long, String, String)] = {val toArray: Array[(Int, Long, Int, String, Long)] =brodCast.toArrayval array = new mutable.ArrayBuffer[(Int,Long,String,String)]()var index = 0var a:(Int,Long,String,String)= nullwhile(index < toArray.size){if(value._2 == toArray(index)._5){a = (value._1,value._2,value._3,toArray(index)._4)array += a}index = index + 1}array}}).withBroadcastSet(ds2,"ds2")println(result.collect())}
}

pom文件为:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lin</groupId><artifactId>Flink-Demo</artifactId><version>0.0.1-SNAPSHOT</version><name>${project.artifactId}</name><properties><maven.compiler.source>1.6</maven.compiler.source><maven.compiler.target>1.6</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.11.5</scala.version><scala.compat.version>2.11</scala.compat.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.1.2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><!-- see http://davidb.github.com/scala-maven-plugin --><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><!--arg>-make:transitive</arg--><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><!-- If you have classpath issue like NoDefClassError,... --><!-- useManifestOnlyJar>false</useManifestOnlyJar --><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin></plugins></build></project>

flink boardCast--广播变量相关推荐

  1. flink的广播、累加、缓存

    flink的广播.累加器.分布式缓存 Flink的广播变量 Flink支持广播.可以将数据广播到TaskManager上,数据存储到内存中.数据存储在内存中,这样可以减缓大量的 shuwle操作:比如 ...

  2. Flink 广播变量

    广播变量简介 在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变 ...

  3. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  4. 初识Flink广播变量broadcast

    Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在 ...

  5. Spark中广播变量(boardcast)的使用

    1 使用广播变量的使用步骤 (1)声明广播变量 val board = sc.boardcast(XX) (2)使用 .value 获得广播变量的值 (3)变量只会被发到各个节点一次,且作为" ...

  6. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  7. Spark广播变量与累加器

    在之前的文章中,我介绍了flink广播状态,从而了解了flink广播状态实际上就是将一个流广播到下游所有算子之中.在本文中我将介绍spark中类似的概念,为了方便理解,先放张spark应用程序架构图. ...

  8. Broadcast Variables广播变量

    Flink Broadcast Variables: Broadcast variables允许你创建一个数据集在所有的并行操作节点都能获取到,除了常规的输入操作.针对一些小的依赖数据集,这种方式是非 ...

  9. spark 获取广播变量_Spark流式程序中广播变量和累加器为何使用单例模式

    Spark中广播变量详解以及如何动态更新广播变量​mp.weixin.qq.com 1.广播变量是只读的,使用单例模式可以减少Spark流式程序中每次job生成执行,频繁创建广播变量带来的开销 2. ...

最新文章

  1. 互联网项目开始时需要去谈的产品需求分析:
  2. Python装饰器(一)
  3. angular复习笔记4-模板
  4. PCL学习(三) SAC-IA 估记object pose
  5. Ubuntu中配置虚拟专用网络***
  6. npoi 未将对象引用设置到对象的实例_带你探索JVM的对象
  7. 大气压力换算公式_压力公式换算
  8. 管家婆软件显示服务器连接失败,管家婆软件提示“连接服务器失败”怎么办
  9. python mro_python之理解super及MRO列表
  10. android开发 抽屉,Android自带抽屉布局及NavigationView的使用
  11. 【目标检测】57、Dual Weighting Label Assignment | 专为目标检测设计的双权重标签分配
  12. WEB安全漏洞扫描与处理(下)——安全报告分析和漏洞处理
  13. APP登录功能的Java服务端开发
  14. ip6plus电池测试软件,超雪维修教程(七):iphone6/6 plus更换电池详细图文步骤
  15. (困难)SQL练习25:获取员工其当前的薪水比其manager当前薪水还高的相关信息
  16. 白鹭引擎的微信分享接口
  17. python培训学费多少钱-python培训学费多少钱
  18. 获取联系人的电话号码
  19. [转载] 罗辑思维的「成功」之道
  20. 苹果手机(iPhone)添加outlook邮箱设置

热门文章

  1. c# MODBUS协议源码 上/下位机源码烧写Flash工具
  2. BigDecimal 往左移动两位小数_移动信号灯
  3. Android 抛弃原生MediaPlayer 使用音频框架 StarrySky
  4. 数据库基础知识:概念篇
  5. 王宇阳:六个案例里的SEO启发
  6. windows和linux共用蓝牙鼠标,双系统共用蓝牙键鼠(win10+macOS)
  7. 化痰止咳平喘药题库【1】
  8. matlab 实现信号的微分和积分
  9. Ubuntu下wps英文界面切换成中文界面的方法
  10. 今日头条文章爬虫实战