Flink实操 : 广播变量/累加器/分布式缓存
.
- 一 .前言
- 二 .广播变量使用
- 2.1.前言
- 2.2. 使用
- 三 .累加器
- 3.1. 前言
- 3.2. 使用
- 四 .分布式缓存
- 4.1. 前言
- 4.2.使用
一 .前言
二 .广播变量使用
2.1.前言
Flink支持广播。可以将数据广播到TaskManager上,数据存储到内存中。
数据存储在内存中,这样可以减缓大量的shuffle操作;比如在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataStream广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;
广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
- 可以理解广播就是一个公共的共享变量
- 将一个数据集广播后,不同的Task都可以在节点上获取到
- 每个节点
只存一份
- 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费
2.2. 使用
- 在需要使用广播的操作后,使用
withBroadcastSet
创建广播 - 在操作中,使用getRuntimeContext.getBroadcastVariable
[广播数据类型]
(广播名
)获取广播变量
package com.boyi.broadcastimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configurationobject BroadCastDemo {def main(args: Array[String]): Unit = {// 1. 获取`ExecutionEnvironment`运行环境val env = ExecutionEnvironment.getExecutionEnvironment// 1. 分别创建两个数据集val studentDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))// 1. 使用`RichMapFunction`对`成绩`数据集进行map转换// 将成绩数据(学生ID,学科,成绩) -> (学生姓名,学科,成绩)val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var bc_studentList: List[(Int, String)] = null// - 重写`open`方法中,获取广播数据override def open(parameters: Configuration): Unit = {import scala.collection.JavaConverters._bc_studentList = getRuntimeContext.getBroadcastVariable[(Int, String)]("bc_student").asScala.toList}// - 在`map`方法中使用广播进行转换override def map(value: (Int, String, Int)): (String, String, Int) = {// 获取学生IDval studentId: Int = value._1// 过滤出和学生ID相同的内容val tuples: List[(Int, String)] = bc_studentList.filter((x: (Int, String)) => x._1 == studentId)// 构建元组(tuples(0)._2,value._2,value._3)}}).withBroadcastSet(studentDataSet, "bc_student")// 3. 打印测试resultDataSet.print()}}
三 .累加器
3.1. 前言
Accumulator 即累加器,与 MapReduce counter 的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。
- IntCounter
- LongCounter
- DoubleCounter
3.2. 使用
遍历下列数据, 打印出单词的总数
"a","b","c","d"
开发步骤:
- 获取批处理环境
- 加载本地集合
- map转换
- 定义累加器
- 注册累加器
- 累加数据
- 数据写入到文件中
- 执行任务,获取任务执行结果对象(JobExecutionResult)
- 获取累加器数值
- 打印数值
package com.boyi.broadcastimport org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** counter 累加器*/
object BatchDemoCounter {def main(args: Array[String]): Unit = {//获取执行环境val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val data = env.fromElements("a","b","c","d")val res = data.map(new RichMapFunction[String,String] {//1:定义累加器val numLines = new IntCounteroverride def open(parameters: Configuration): Unit = {super.open(parameters)//2:注册累加器getRuntimeContext.addAccumulator("num-lines",this.numLines)}var sum = 0;override def map(value: String) = {//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了sum += 1;System.out.println("sum:"+sum);this.numLines.add(1)value}}).setParallelism(1)res.writeAsText("/opt/a/tmp/BatchDemoCounter")val jobResult = env.execute("BatchDemoCounterScala")// //3:获取累加器val num = jobResult.getAccumulatorResult[Int]("num-lines")println("num:"+num)}
}
四 .分布式缓存
4.1. 前言
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。
这个功能可以被使用来分享外部静态的数据.
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上
4.2.使用
遍历下列数据, 并在open方法中获取缓存的文件
a,b,c,d
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 分布式缓存*/
object BatchDemoDisCache {def main(args: Array[String]): Unit = {//获取执行环境val env = ExecutionEnvironment.getExecutionEnvironment//隐式转换import org.apache.flink.api.scala._//1:注册文件env.registerCachedFile("/opt/a/tmp/BatchDemoDisCache.txt","BatchDemoDisCache.txt")//读取数据val data = env.fromElements("a","b","c","d")val result = data.map(new RichMapFunction[String,String] {override def open(parameters: Configuration): Unit = {super.open(parameters)//访问数据val myFile = getRuntimeContext.getDistributedCache.getFile("BatchDemoDisCache.txt")val lines = FileUtils.readLines(myFile)val it = lines.iterator()while (it.hasNext){val line = it.next();println("line:"+line)}}override def map(value: String) = {value}})result.print()}
}
地址: https://github.com/BoYiZhang/flink-demo
Flink实操 : 广播变量/累加器/分布式缓存相关推荐
- Flink的累加器和广播变量、广播流、分布式缓存
1.Accumulator累加器 Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...
- 实操|特征变量多重共线性的分析与检验(含代码)
对于Linear回归.Logistic回归等线性模型来讲,特征变量的多重共线性是衡量模型性能的一个重要维度.因此,如何有效识别并解决模型特征的多重共线性问题,是实际业务场景建立线性模型过程的必要环节. ...
- Flink实操 : 状态管理
. 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...
- Flink实操 : DataSource操作
. 一 .前言 二 .四种读取类型 2.1. 基于本地集合的source(Collection-based-source) 2.2. 基于文件的source(File-based-source) 2. ...
- Flink实操 : 算子操作
. 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...
- Flink实操 : Sink操作
. 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...
- flink的广播、累加、缓存
flink的广播.累加器.分布式缓存 Flink的广播变量 Flink支持广播.可以将数据广播到TaskManager上,数据存储到内存中.数据存储在内存中,这样可以减缓大量的 shuwle操作:比如 ...
- Spark编程指引(四)----共享变量(广播变量和累加器)
转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...
- SparkCore:RDD累加器和广播变量(最详细的介绍)!!!!!!
RDD累加器和广播变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,在每个任务上都生成一个副本.但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控 ...
最新文章
- ios开发两个简单的错误提示和原因
- C#中判断某软件是否已安装
- Java并发程序设计(八)设计模式与并发之单例模式
- 字符串的回文子序列个数_计算给定字符串中回文子序列的数量
- Java主要处理哪些类型的异常_Java技术高效处理异常有哪些呢?
- php 生成图片 打印,php 生成水印图片
- 前端 学习笔记day47 其他标签
- python集合和序列解包
- 基于ArcGIS模型构建器工具的土地利用现状重分类流程及常见问题
- 如何用Python画一只机器猫?
- 容联七陌云客服通话超强稳定,今通国际客户服务更加便捷
- xx-Pixiv Spider
- 程序员职业技能编写_程序员不需要的不需要编写代码的技能
- mos管的rc吸收电路计算_RC吸收电路
- 关于服务器端和客户端的区别
- python confluent_kafka 关于消费者消费时间过长,导致的leave group
- ngrock内网穿透(Ngrok 和 Sunny-Ngrok )
- 折线图 和如何在图上写字
- Mac最佳视频编辑器推荐
- 在Wifi网络中嗅探明文密码(HTTP POST请求、POP等)