点击上方“zhisheng”,选择“设为星标”

后台回复”ffa“可以查看 Flink 资料

前言

在实时计算作业中,往往需要动态改变一些配置,举几个栗子:

  • 实时日志ETL服务,需要在日志的格式、字段发生变化时保证正常解析;

  • 实时NLP服务,需要及时识别新添加的领域词与停用词;

  • 实时风控服务,需要根据业务情况调整触发警告的规则。

那么问题来了:配置每次变化都得手动修改代码,再重启作业吗?答案显然是否定的,毕竟实时任务的终极目标就是7 x 24无间断运行。Spark Streaming和Flink的广播机制都能做到这点,本文分别来简单说明一下。

Spark Streaming的场合

Spark Core内部的广播机制: 广播变量(broadcast variable)的设计初衷是简单地作为只读缓存,在Driver与Executor间共享数据,Spark文档中的原话如下:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

也就是说原生并未支持广播变量的更新,所以我们得自己稍微hack一下。直接贴代码吧。

public class BroadcastStringPeriodicUpdater {  private static final int PERIOD = 60 * 1000;  private static volatile BroadcastStringPeriodicUpdater instance;  private Broadcast<String> broadcast;  private long lastUpdate = 0L;  private BroadcastStringPeriodicUpdater() {}  public static BroadcastStringPeriodicUpdater getInstance() {    if (instance == null) {      synchronized (BroadcastStringPeriodicUpdater.class) {        if (instance == null) {          instance = new BroadcastStringPeriodicUpdater();        }      }    }    return instance;  }  public String updateAndGet(SparkContext sc) {    long now = System.currentTimeMillis();    long offset = now - lastUpdate;    if (offset > PERIOD || broadcast == null) {      if (broadcast != null) {        broadcast.unpersist();      }      lastUpdate = now;      String value = fetchBroadcastValue();      broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);    }    return broadcast.getValue();  }  private String fetchBroadcastValue() {  }}

这段代码将字符串型广播变量的更新包装成了一个单例类,更新周期是60秒。在Streaming主程序中,就可以这样使用了:

  dStream.transform(rdd -> {String broadcastValue = BroadcastStringPeriodicUpdater.getInstance().updateAndGet(rdd.context());    rdd.mapPartitions(records -> {});  });

这种方法基本上解决了问题,但不是十全十美的,因为广播数据的更新始终是周期性的,并且周期不能太短(得考虑外部存储的压力),从根本上讲还是受Spark Streaming微批次的设计理念限制的。接下来看看Flink是怎样做的。

Flink的场合

Flink中也有与Spark类似的广播变量,用法也几乎相同。但是Flink在1.5版本引入了更加灵活的广播状态(broadcast state),可以视为operator state的一种特殊情况。它能够将一个流中的数据(通常是较少量的数据)广播到下游算子的所有并发实例中,实现真正的低延迟动态更新。

下图来自Data Artisans(被阿里收购了的Flink母公司)的PPT,其中流A是普通的数据流,流B就是含有配置信息的广播流(broadcast stream),也可以叫控制流(control stream)。流A的数据按照keyBy()算子的规则发往下游,而流B的数据会广播,最后再将这两个流的数据连接到一起进行处理。

既然它的名字叫“广播状态”,那么就一定要有与它对应的状态描述符StateDescriptor。Flink直接使用了MapStateDescriptor作为广播的状态描述符,方便存储多种不同的广播数据。示例:

    MapStateDescriptor<String, String> broadcastStateDesc = new MapStateDescriptor<>(      "broadcast-state-desc",      String.class,      String.class    );

接下来在控制流controlStream上调用broadcast()方法,将它转换成广播流BroadcastStream。controlStream的产生方法与正常数据流没什么不同,一般是从消息队列的某个特定topic读取。

BroadcastStream<String> broadcastStream = controlStream  .setParallelism(1)  .broadcast(broadcastStateDesc);

然后在DataStream上调用connect()方法,将它与广播流连接起来,生成BroadcastConnectedStream。

BroadcastConnectedStream<String, String> connectedStream = sourceStream.connect(broadcastStream);
最后就要调用process()方法对连接起来的流进行处理了。如果DataStream是一个普通的流,
需要定义BroadcastProcessFunction,反之,如果该DataStream是一个KeyedStream,
就需要定义KeyedBroadcastProcessFunction。
并且与之前我们常见的ProcessFunction不同的是,它们都多了一个专门处理广播数据的方法
processBroadcastElement()。类图如下所示。

下面给出一个说明性的代码示例。

    connectedStream.process(new BroadcastProcessFunction<String, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);for (Entry<String, String> entry : state.immutableEntries()) {String bKey = entry.getKey();String bValue = entry.getValue();// 根据广播数据进行原数据流的各种处理}out.collect(value);}@Overridepublic void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {BroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);// 如果需要的话,对广播数据进行转换,最后写入状态state.put("some_key", value);}});

可见,BroadcastProcessFunction的行为与RichCoFlatMapFunction、CoProcessFunction非常相像。其基本思路是processBroadcastElement()方法从广播流中获取数据,进行必要的转换之后将其以键值对形式写入BroadcastState。而processElement()方法从BroadcastState获取广播数据,再将其与原流中的数据结合处理。也就是说,BroadcastState起到了两个流之间的桥梁作用。

最后还有一点需要注意,processElement()方法获取的Context实例是ReadOnlyContext,说明只有在广播流一侧才能修改BroadcastState,而数据流一侧只能读取BroadcastState。这提供了非常重要的一致性保证:假如数据流一侧也能修改BroadcastState的话,不同的operator实例有可能产生截然不同的结果,对下游处理造成困扰。

来源:jianshu/p/97dae75c266c

作者:LittleMagic

如果觉得文章对你有帮助,请转发朋友圈、点在看,让更多人获益,感谢您的支持!

END

关注我

公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。

Flink 实战

1、《从0到1学习Flink》—— Apache Flink 介绍
2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、《从0到1学习Flink》—— Flink 配置文件详解
4、《从0到1学习Flink》—— Data Source 介绍
5、《从0到1学习Flink》—— 如何自定义 Data Source ?
6、《从0到1学习Flink》—— Data Sink 介绍
7、《从0到1学习Flink》—— 如何自定义 Data Sink ?
8、《从0到1学习Flink》—— Flink Data transformation(转换)
9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows
10、《从0到1学习Flink》—— Flink 中的几种 Time 详解
11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch
12、《从0到1学习Flink》—— Flink 项目如何运行?
13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka
14、《从0到1学习Flink》—— Flink JobManager 高可用性配置
15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了
19、大数据“重磅炸弹”——实时计算框架 Flink
20、《Flink 源码解析》—— 源码编译运行
21、为什么说流处理即未来?
22、OPPO数据中台之基石:基于Flink SQL构建实时数据仓库
23、流计算框架 Flink 与 Storm 的性能对比
24、Flink状态管理和容错机制介绍
25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
26、Apache Flink 是如何管理好内存的?
27、《从0到1学习Flink》——Flink 中这样管理配置,你知道?
28、《从0到1学习Flink》——Flink 不可以连续 Split(分流)?
29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文
30、360深度实践:Flink与Storm协议级对比
31、Apache Flink 1.9 重大特性提前解读
32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
33、美团点评基于 Flink 的实时数仓建设实践
34、Flink 灵魂两百问,这谁顶得住?
35、一文搞懂 Flink 的 Exactly Once 和 At Least Once
36、你公司到底需不需要引入实时计算引擎?
37、Flink 从0到1学习 —— 如何使用 Side Output 来分流?
38、一文让你彻底了解大数据实时计算引擎 Flink
39、基于 Flink 实现的商品实时推荐系统(附源码)
40、如何使用 Flink 每天实时处理百亿条日志?
41、Flink 在趣头条的应用与实践
42、Flink Connector 深度解析
43、滴滴实时计算发展之路及平台架构实践
44、Flink Back Pressure(背压)是怎么实现的?有什么绝妙之处?
45、Flink 实战 | 贝壳找房基于Flink的实时平台建设
46、如何使用 Kubernetes 部署 Flink 应用
47、一文彻底搞懂 Flink 网络流控与反压机制
48、Flink中资源管理机制解读与展望
49、Flink 实时写入数据到 ElasticSearch 性能调优
50、深入理解 Flink 容错机制
51、吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

Flink 源码解析

知识星球里面可以看到下面文章

文章不错?点个【在看】吧! ????


http://www.taodudu.cc/news/show-5579862.html

相关文章:

  • 修改4.1-5.0 5.0-6.0 6.0及以上状态栏文字图标的颜色设置(不包括小米,魅族,oppo,后续会更新)
  • 安全公司发现38款手机预装恶意软件,三星、小米、OPPO等悉数在列
  • OPPO百万级高并发MongoDB集群性能数十倍提升优化实践
  • 记录一次java集成华为,oppo,vivo推送的经历
  • 分析手机所拍照片中间会发红的原因
  • EXCEL 编写公式后再编辑单元格格式会自动变成文本解决方法
  • Excel:让表格中所有数据进行一次加/减/乘/除/求余
  • Python办公自动化之 openpyxl 操作 Excel
  • python 自动填excel_Python读写Excel自动填表
  • Java数据库篇_01 数据库设计基础(华为云学习笔记)
  • 华为机考Java版
  • 4.使用Office Open XML SDK访问Excel2007数据表
  • XML通用操作类二
  • Spring学习3 - Bean的属性、DI依赖注入三种方式
  • 如何在Google文档中创建连字符,连字符和Em连字符
  • 女程序员的键盘,你一定没见过!
  • 电脑键盘快捷键自定义_如何自定义OS X键盘和添加快捷方式
  • SQL sever基础代码
  • ffmpeg去除视频水印之计算水印位置 —— 筑梦之路
  • Autodesk 3ds max 2017 无法保存导出fbx的预设(FBX export does not save presets - Autodesk)
  • 3D美术 18——max中运用运动捕捉,解决导入BIP时所有骨骼变成塌陷针(黑帧)的问题
  • Autodesk 3ds Max 2017 中文正确注册的姿势
  • 日志mysql报错100_Mysql10055 | 林熊熊的小站
  • 解决socket交互的10048和10055错误的总结
  • activeMQ的consumer添加MessageListener运行没效果
  • springboot表单提交不支持put,delete
  • rocketmq 提示不支持sql
  • Failed to download metadata for repo ‘docker-ce-stable‘: Cannot download repomd.xml
  • Jenkins+Gitee 配合实现持续集成踩坑点
  • SpringBoot多模块项目初始化搭建

Spark/Flink广播实现作业配置动态更新相关推荐

  1. Flink从入门到精通100篇(十七)-Spark/Flink广播如何实现作业配置动态更新?

    前言 在实时计算作业中,往往需要动态改变一些配置,举几个栗子: 实时日志ETL服务,需要在日志的格式.字段发生变化时保证正常解析: 实时NLP服务,需要及时识别新添加的领域词与停用词: 实时风控服务, ...

  2. Spring Boot配置动态更新

    解释 配置动态更新在本文中指当更改应用的配置项后,无需要重启应用新配置即可生效. 概述 配置动态更新是应用的一种通用性需求,很实现 的方式有很多种,如监听配置文件内容变化.使用配置中心等等.Sprin ...

  3. 【flink】Flink-Cep实现规则动态更新

    1.概述 我们是用processfunction实现的cep动态更新,然后看到这个是原生api感觉有趣,研究一下 原文:https://mp.weixin.qq.com/s/mh–wQvAWQq2tD ...

  4. springcloud配置动态更新

    在实现springcloud的配置中心后,我们需要考虑的就是动态刷新配置.如果考虑只有一个客户端,我们可以在配置文件发生push操作的时候添加webhook,使用webhook发送刷新的post请求到 ...

  5. zookeeper 网关_多图,5000 字分享,API 网关如何实现配置动态更新?

    前言 网关是流量请求的入口,在微服务架构中承担了非常重要的角色,网关高可用的重要性不言而喻.在使用网关的过程中,为了满足业务诉求,经常需要变更配置,比如流控规则.路由规则等等.因此,网关动态配置是保障 ...

  6. ribbon 配置 动态更新_Netflix开源工具:在SpringBoot实现动态路由

    前言 假设你有一个服务A,要调用服务B(有三个实例,B1.B2.B3),如何只调用其中的B1和B2,屏蔽掉B3?实际上解决方法大致分为两类. 一种是外部路由,就是通过网关等组件,在请求链路上进行路由选 ...

  7. ribbon 配置 动态更新_SpringCloud实战三-Ribbon

    负载均衡 是一种计算机技术,用来在多个计算机(计算机集群).网络连接.CPU.磁盘驱动器或其他资源中分配负载,以达到最优化资源使用.最大化吞吐率.最小化响应时间.同时避免过载的目的. 传统项目,都使用 ...

  8. 机顶盒ttl无法输入_一个作业,多个TTL——Flink SQL 细粒度TTL配置的实现(二)

    ​在系列文前篇<FlinkSQL细粒度TTL配置的实现(一)>中,我们介绍了实现Flink SQL 细粒度TTL配置的基本原理:通过将原来一段SQL按照TTL的不同拆分为多段子SQL,然后 ...

  9. 【Spark】SparkStreaming-流处理-规则动态更新-解决方案

    SparkStreaming-流处理-规则动态更新-解决方案 image2017-10-27_11-10-53.png (1067×738)elasticsearch-headElasticsearc ...

最新文章

  1. 面试彩蛋1:斐波那契数列用递归函数、循环函数实现
  2. 特殊字符、Date、JS应用
  3. SAP物料主数据创建时间和创建个数的函数关系
  4. Ubuntu 16.04 安装mysql5.7
  5. 关闭子窗口 父窗口自动刷新
  6. opengl png图片 qt_Qt资源文件的格式,并用CMake添加Qt资源文件
  7. vue ui框架_Vue移动端UI框架指南
  8. Python Day47索引
  9. 如何在cocoapods中使用更新版本的pod
  10. 备份文件时,添加时间戳
  11. MSDE 下载安装、创建管理数据库
  12. python爬数据实例_Python实例教程爬虫爬取NBA数据功能示例
  13. 判断两个单词是否互为变位词,如“book”,“koob”,代码如下
  14. 4种“附近的人”实现方式
  15. qt项目在Linux平台上面发布成可执行程序.run
  16. 如何排版 微信公众号「代码块」之 MarkEditor
  17. 蓝桥杯 2014-5 圆周率
  18. 计算机软件蒋勇,西南科技大学考研研究生导师简介-蒋勇
  19. linux(以ubuntu为例)下Android利用ant自动编译、修改配置文件、批量多渠道,打包生...
  20. Java多线程案例——线程池

热门文章

  1. dota2地形皮肤制作教程
  2. 如何解决异步回调地狱
  3. byte byt[] = new byte[1024]的含义
  4. 企业做3A有什么用?
  5. [贝聊科技]如何将 iOS 项目的编译速度提高5倍
  6. Java调用long的最大值和最小值
  7. 计算机音乐数字乐谱生僻字,生僻字钢琴简谱-数字双手-陈柯宇
  8. idea全局搜索class文件或者字符串
  9. 愤怒的小鸟4只编外鸟_愤怒的小鸟思黛拉四种小鸟攻略 技能详细解析
  10. 将属性值导出为 AutoCAD 块属性