.

  • 一 .前言
  • 二 .广播变量使用
    • 2.1.前言
    • 2.2. 使用
  • 三 .累加器
    • 3.1. 前言
    • 3.2. 使用
  • 四 .分布式缓存
    • 4.1. 前言
    • 4.2.使用

一 .前言

二 .广播变量使用

2.1.前言

Flink支持广播。可以将数据广播到TaskManager上,数据存储到内存中。

数据存储在内存中,这样可以减缓大量的shuffle操作;比如在数据join阶段,不可避免的就是大量的shuffle操作,我们可以把其中一个dataStream广播出去,一直加载到taskManager的内存中,可以直接在内存中拿数据,避免了大量的shuffle,导致集群性能下降;

广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。
另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。

  • 可以理解广播就是一个公共的共享变量
  • 将一个数据集广播后,不同的Task都可以在节点上获取到
  • 每个节点只存一份
  • 如果不使用广播,每一个Task都会拷贝一份数据集,造成内存资源浪费

2.2. 使用

  • 在需要使用广播的操作后,使用withBroadcastSet创建广播
  • 在操作中,使用getRuntimeContext.getBroadcastVariable[广播数据类型](广播名)获取广播变量
package com.boyi.broadcastimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configurationobject BroadCastDemo {def main(args: Array[String]): Unit = {// 1. 获取`ExecutionEnvironment`运行环境val env = ExecutionEnvironment.getExecutionEnvironment// 1. 分别创建两个数据集val studentDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))// 1. 使用`RichMapFunction`对`成绩`数据集进行map转换// 将成绩数据(学生ID,学科,成绩) -> (学生姓名,学科,成绩)val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var bc_studentList: List[(Int, String)] = null// - 重写`open`方法中,获取广播数据override def open(parameters: Configuration): Unit = {import scala.collection.JavaConverters._bc_studentList = getRuntimeContext.getBroadcastVariable[(Int, String)]("bc_student").asScala.toList}//   - 在`map`方法中使用广播进行转换override def map(value: (Int, String, Int)): (String, String, Int) = {// 获取学生IDval studentId: Int = value._1// 过滤出和学生ID相同的内容val tuples: List[(Int, String)] = bc_studentList.filter((x: (Int, String)) => x._1 == studentId)// 构建元组(tuples(0)._2,value._2,value._3)}}).withBroadcastSet(studentDataSet, "bc_student")// 3. 打印测试resultDataSet.print()}}

三 .累加器

3.1. 前言

Accumulator 即累加器,与 MapReduce counter 的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

3.2. 使用

遍历下列数据, 打印出单词的总数

"a","b","c","d"

开发步骤:

  1. 获取批处理环境
  2. 加载本地集合
  3. map转换
    1. 定义累加器
    2. 注册累加器
    3. 累加数据
  4. 数据写入到文件中
  5. 执行任务,获取任务执行结果对象(JobExecutionResult)
  6. 获取累加器数值
  7. 打印数值
package com.boyi.broadcastimport org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** counter 累加器*/
object BatchDemoCounter {def main(args: Array[String]): Unit = {//获取执行环境val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val data = env.fromElements("a","b","c","d")val res = data.map(new RichMapFunction[String,String] {//1:定义累加器val numLines = new IntCounteroverride def open(parameters: Configuration): Unit = {super.open(parameters)//2:注册累加器getRuntimeContext.addAccumulator("num-lines",this.numLines)}var sum = 0;override def map(value: String) = {//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了sum += 1;System.out.println("sum:"+sum);this.numLines.add(1)value}}).setParallelism(1)res.writeAsText("/opt/a/tmp/BatchDemoCounter")val jobResult = env.execute("BatchDemoCounterScala")//    //3:获取累加器val num = jobResult.getAccumulatorResult[Int]("num-lines")println("num:"+num)}
}

四 .分布式缓存

4.1. 前言

Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。
这个功能可以被使用来分享外部静态的数据.

缓存的使用流程:

使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!

注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上

4.2.使用

遍历下列数据, 并在open方法中获取缓存的文件

a,b,c,d

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** 分布式缓存*/
object BatchDemoDisCache {def main(args: Array[String]): Unit = {//获取执行环境val env = ExecutionEnvironment.getExecutionEnvironment//隐式转换import org.apache.flink.api.scala._//1:注册文件env.registerCachedFile("/opt/a/tmp/BatchDemoDisCache.txt","BatchDemoDisCache.txt")//读取数据val data = env.fromElements("a","b","c","d")val result = data.map(new RichMapFunction[String,String] {override def open(parameters: Configuration): Unit = {super.open(parameters)//访问数据val myFile = getRuntimeContext.getDistributedCache.getFile("BatchDemoDisCache.txt")val lines = FileUtils.readLines(myFile)val it = lines.iterator()while (it.hasNext){val line = it.next();println("line:"+line)}}override def map(value: String) = {value}})result.print()}
}

地址: https://github.com/BoYiZhang/flink-demo

Flink实操 : 广播变量/累加器/分布式缓存相关推荐

  1. Flink的累加器和广播变量、广播流、分布式缓存

    1.Accumulator累加器  Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化.可以在Flink job任务中的算 ...

  2. 实操|特征变量多重共线性的分析与检验(含代码)

    对于Linear回归.Logistic回归等线性模型来讲,特征变量的多重共线性是衡量模型性能的一个重要维度.因此,如何有效识别并解决模型特征的多重共线性问题,是实际业务场景建立线性模型过程的必要环节. ...

  3. Flink实操 : 状态管理

    . 一 .概念 1.1. 什么是有状态的计算? 1.2. 传统的流计算系统缺少对于程序状态的有效支持 1.3. Flink丰富的状态访问和高效的容错机制 二 .Keyed State 2.1.保存st ...

  4. Flink实操 : DataSource操作

    . 一 .前言 二 .四种读取类型 2.1. 基于本地集合的source(Collection-based-source) 2.2. 基于文件的source(File-based-source) 2. ...

  5. Flink实操 : 算子操作

    . 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...

  6. Flink实操 : Sink操作

    . 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...

  7. flink的广播、累加、缓存

    flink的广播.累加器.分布式缓存 Flink的广播变量 Flink支持广播.可以将数据广播到TaskManager上,数据存储到内存中.数据存储在内存中,这样可以减缓大量的 shuwle操作:比如 ...

  8. Spark编程指引(四)----共享变量(广播变量和累加器)

    转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...

  9. SparkCore:RDD累加器和广播变量(最详细的介绍)!!!!!!

    RDD累加器和广播变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,在每个任务上都生成一个副本.但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控 ...

最新文章

  1. ios开发两个简单的错误提示和原因
  2. C#中判断某软件是否已安装
  3. Java并发程序设计(八)设计模式与并发之单例模式
  4. 字符串的回文子序列个数_计算给定字符串中回文子序列的数量
  5. Java主要处理哪些类型的异常_Java技术高效处理异常有哪些呢?
  6. php 生成图片 打印,php 生成水印图片
  7. 前端 学习笔记day47 其他标签
  8. python集合和序列解包
  9. 基于ArcGIS模型构建器工具的土地利用现状重分类流程及常见问题
  10. 如何用Python画一只机器猫?
  11. 容联七陌云客服通话超强稳定,今通国际客户服务更加便捷
  12. xx-Pixiv Spider
  13. 程序员职业技能编写_程序员不需要的不需要编写代码的技能
  14. mos管的rc吸收电路计算_RC吸收电路
  15. 关于服务器端和客户端的区别
  16. python confluent_kafka 关于消费者消费时间过长,导致的leave group
  17. ngrock内网穿透(Ngrok 和 Sunny-Ngrok )
  18. 折线图 和如何在图上写字
  19. Mac最佳视频编辑器推荐
  20. 在Wifi网络中嗅探明文密码(HTTP POST请求、POP等)

热门文章

  1. 华为云CDN加速,让你告别网速慢的烦恼
  2. matlab画一个放大图中图
  3. Elasticsearch集成(二)
  4. 利用pandas对在链家网爬取的租房数据进行清洗
  5. 搭建Android上的服务器
  6. 树莓派 USB麦克风 录音
  7. 企业服务总线Enterprise service bus介绍
  8. 不起眼却有大作用的 .NET功能集(转发)
  9. HttpReques
  10. XML中PCDATA与CDATA的区别