1.13.Flink 支持的DataType和序列化
1.13.1.Flink支持的DataType
1.13.2.Flink的序列化
1.14.Flink Broadcast & Accumulators & Counters &Distributed Cache
1.14.1.DataStreaming中的Broadcast
1.14.2.Flink Broadcast(广播变量)
1.14.3.Flink Accumulators & Counters
1.14.4.Flink Broadcast和Accumulators的区别
1.14.5.Flink Distributed Cache(分布式缓存)

1.13.Flink 支持的DataType和序列化

1.13.1.Flink支持的DataType

Java Tuple 和 Scala case class
Java POJOs:java实体类
Primitive Types
默认支持java和scala基本类型
General Class Types
默认支持大多数java和scala class
Hadoop Writables
支持hadoop中实现了org.apache.hadoop.Writable的数据类型。
Special Types
例如scala中的Either Option和Try

1.13.2.Flink的序列化

Flink自带了针对诸如int,long,String等标准类型的序列化器
针对Flink无法实现序列化的数据类型,我们可以交给Avro和Kryo
使用方法:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
使用avro序列化:env.getConfig().enableForceAvro();
使用kryo序列化:env.getConfig().enableForceKryo();
使用自定义序列化:env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

1.14.Flink Broadcast & Accumulators & Counters &Distributed Cache

1.14.1.DataStreaming中的Broadcast

把元素广播给所有的分区,数据会被重复处理
一、类似于storm中的allGrouping
二、dataStream.broadcast()

1.14.2.Flink Broadcast(广播变量)

广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks

广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。

一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

用法:
1:初始化数据
DataSet toBroadcast = env.fromElements(1, 2, 3)
2:广播数据
.withBroadcastSet(toBroadcast,”broadcastSetName”)
3:获取数据
Collection broadcastSet = getRuntimeContext().getBroadcastVariable(“broadcastSetName”);
注意:
1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。
2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;/*** broadcast广播变量** 需求:* flink会从数据源中获取到用户的姓名** 最终需要把用户的姓名和年龄信息打印出来** 分析:* 所以就需要在中间的map处理的时候获取用户的年龄信息** 建议吧用户的关系数据集使用广播变量进行处理** 注意:如果多个算子需要使用同一份数据集,那么需要在对应的多个算子后面分别注册广播变量* Created by xxx.xxx on 2018/10/8*/
public class BatchDemoBroadcast {public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1:准备需要广播的数据ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();broadData.add(new Tuple2<>("zs",18));broadData.add(new Tuple2<>("ls",20));broadData.add(new Tuple2<>("ww",17));DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);//1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {@Overridepublic HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {HashMap<String, Integer> res = new HashMap<>();res.put(value.f0, value.f1);return res;}});//源数据DataSource<String> data = env.fromElements("zs", "ls", "ww");//注意:在这里需要使用到RichMapFunction获取广播变量DataSet<String> result = data.map(new RichMapFunction<String, String>() {List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();HashMap<String, Integer> allMap = new HashMap<String, Integer>();/*** 这个方法只会执行一次* 可以在这里实现一些初始化的功能** 所以,就可以在open方法中获取广播变量数据*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//3:获取广播数据this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");for (HashMap map : broadCastMap) {allMap.putAll(map);}}@Overridepublic String map(String value) throws Exception {Integer age = allMap.get(value);return value + "," + age;}}).withBroadcastSet(toBroadcast, "broadCastMapName");//2:执行广播数据的操作result.print();}}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configurationimport scala.collection.mutable.ListBuffer/*** broadcast 广播变量* Created by xxxx on 2020/10/09 on 2018/10/30.*/
object BatchDemoBroadcastScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//1: 准备需要广播的数据val broadData = ListBuffer[Tuple2[String,Int]]()broadData.append(("zs",18))broadData.append(("ls",20))broadData.append(("ww",17))//1.1处理需要广播的数据val tupleData = env.fromCollection(broadData)val toBroadcastData = tupleData.map(tup=>{Map(tup._1->tup._2)})val text = env.fromElements("zs","ls","ww")val result = text.map(new RichMapFunction[String,String] {var listData: java.util.List[Map[String,Int]] = nullvar allMap  = Map[String,Int]()override def open(parameters: Configuration): Unit = {super.open(parameters)this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")val it = listData.iterator()while (it.hasNext){val next = it.next()allMap = allMap.++(next)}}override def map(value: String) = {val age = allMap.get(value).getvalue+","+age}}).withBroadcastSet(toBroadcastData,"broadcastMapName")result.print()}}

1.14.3.Flink Accumulators & Counters

Accumulator即累加器,与Mapreduce counter的应用场景差不多,都能很好地观察task在运行期间的数据变化。
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter是一个具体的累加器(Accumulator)实现
IntCounter, LongCounter 和 DoubleCounter
用法:
1:创建累加器
private IntCounter numLines = new IntCounter();

2:注册累加器
getRuntimeContext().addAccummulator(“num-lines”,this.numLines);

3:使用累加器
this.numLines.add(1);

4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult(“num-lines”)

案例:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;/*** 全局累加器** counter 计数器** 需求:* 计算map函数中处理了多少数据** 注意:只有在任务执行结束后,才能获取到累加器的值** Created by xxx.xxx on 2018/10/8.*/
public class BatchDemoCounter {public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> data = env.fromElements("a", "b", "c", "d", "e");DataSet<String> result = data.map(new RichMapFunction<String, String>() {//1:创建累加器private IntCounter numLines = new IntCounter();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//2:注册累加器getRuntimeContext().addAccumulator("num-lines",this.numLines);}//int sum = 0;@Overridepublic String map(String value) throws Exception {//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了//sum++;//System.out.println("sum:"+sum);this.numLines.add(1);return value;}}).setParallelism(8);//result.print();result.writeAsText("d:\\data\\count10");JobExecutionResult jobResult = env.execute("counter");//3:获取累加器int num = jobResult.getAccumulatorResult("num-lines");System.out.println("num:"+num);}}
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** counter 累加器* Created by xxxx on 2020/10/09 on 2018/10/30.*/
object BatchDemoCounterScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._val data = env.fromElements("a","b","c","d")val res = data.map(new RichMapFunction[String,String] {//1:定义累加器val numLines = new IntCounteroverride def open(parameters: Configuration): Unit = {super.open(parameters)//2:注册累加器getRuntimeContext.addAccumulator("num-lines",this.numLines)}override def map(value: String) = {this.numLines.add(1)value}}).setParallelism(4)res.writeAsText("d:\\data\\count21")val jobResult = env.execute("BatchDemoCounterScala")//3:获取累加器val num = jobResult.getAccumulatorResult[Int]("num-lines")println("num:"+num)}}

1.14.4.Flink Broadcast和Accumulators的区别

Broadcast(广播变量)允许程序将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改。
Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。

1.14.5.Flink Distributed Cache(分布式缓存)

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件
此缓存的工作机制如下:程序注册一个文件或者目录(本地或远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名字。当程序执行,Flink自动将文件或目录复制到所有taskmanager节点的本地文件系统,用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它
用户:
1:注册一个文件
env.registerCachedFile(“hdfs:///path/to/your/file”, “hdfsFile”)
2、访问数据
File myFile = getRuntimeContext().getDistributedCache().getFile(“hdfsFile”);

案例:

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;import java.io.File;
import java.util.ArrayList;
import java.util.List;/*** Distributed Cache** Created by xxxx on 2020/10/09 .*/
public class BatchDemoDisCache {public static void main(String[] args) throws Exception{//获取运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1:注册一个文件,可以使用hdfs或者s3上的文件env.registerCachedFile("d:\\data\\file\\a.txt","a.txt");DataSource<String> data = env.fromElements("a", "b", "c", "d");DataSet<String> result = data.map(new RichMapFunction<String, String>() {private ArrayList<String> dataList = new ArrayList<String>();@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//2:使用文件File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");List<String> lines = FileUtils.readLines(myFile);for (String line : lines) {this.dataList.add(line);System.out.println("line:" + line);}}@Overridepublic String map(String value) throws Exception {//在这里就可以使用dataListreturn value;}});result.print();}
}
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration/*** Distributed Cache* Created by xxxx on 2020/10/09*/
object BatchDemoDisCacheScala {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._//1:注册文件env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")val data = env.fromElements("a","b","c","d")val result = data.map(new RichMapFunction[String,String] {override def open(parameters: Configuration): Unit = {super.open(parameters)val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")val lines = FileUtils.readLines(myFile)val it = lines.iterator()while (it.hasNext){val line = it.next();println("line:"+line)}}override def map(value: String) = {value}})result.print()}
}

1.13.、1.14.Flink 支持的DataType和序列化、Flink Broadcast Accumulators Counters Distributed Cache相关推荐

  1. Linux 4.13/4.14内核中带来的ULP(Upper Layer Protocol)

    序 过了一个很爽的国庆假期,跟小小的小男朋友家长一起回其老家尝到了潮汕美食,南澳岛捕鱼捕虾,海鲜撑到爆,回到深圳次日小小另一个小朋友家长又带我们到东莞长安尝到了正宗的恩施土家菜,几天下来喝了几顿爽酒, ...

  2. Thinkpad W520 + Ubuntu 12.04LTS, 13.10, 14.04LTS安装Nvidia显卡驱动设置

    Thinkpad W520 + Ubuntu 12.04LTS, 13.10, 14.04LTS安装Nvidia显卡驱动设置 http://henzhai.com/tech/2012/07/w520- ...

  3. Xamarin图表开发基础教程(13)OxyPlot框架支持的其它图表

    Xamarin图表开发基础教程(13)OxyPlot框架支持的其它图表 除了以上提到的图表外,OxyPlot组件还包含了6种类型的其它图表,分别为等高线图.箱线图.饼图.热图.散点图和散点误差图,如图 ...

  4. Node.js 15 正式发布,14 将支持到 2023 年

    来源 | https://www.oschina.net/news/119346/node-js-15-released JavaScript 运行时 Node.js 已经更新到了 15 版本.Nod ...

  5. Spring Framework 5.2.5 发布,增加对 Java 14 的支持

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | https://www.oschina.net ...

  6. Apache Flink vs Apache Spark——感觉二者是互相抄袭啊 看谁的好就抄过来 Flink支持在runtime中的有环数据流,这样表示机器学习算法更有效而且更有效率...

    Apache Flink是什么 Flink是一款新的大数据处理引擎,目标是统一不同来源的数据处理.这个目标看起来和Spark和类似.没错,Flink也在尝试解决 Spark在解决的问题.这两套系统都在 ...

  7. node 更新_Node.js 15 正式发布,14 将支持到 2023 年

    来源 | https://www.oschina.net/news/119346/node-js-15-releasedJavaScript 运行时 Node.js 已经更新到了 15 版本.Node ...

  8. Apache Flink 零基础入门(九)Flink支持哪些数据类型

    Flink有7种数据类型分别是: Java Tuples and Scala Case Classes Java POJOs Primitive Types Regular Classes Value ...

  9. 通话录音_iOS 14将支持通话录音功能,但有隐性使用条件

    6月9日消息,近日苹果已经正式宣布将于本月20日举行线上WWDC 2020大会,届时iOS 14将正式和大家见面.今日有外媒根据泄露的代码发现苹果或将在iOS 14中添加一项大家期待已久的新功能--支 ...

最新文章

  1. 在GridView中添加按钮后,如何触发按钮的各种事件?
  2. 吊打一切现有开源OCR项目:效果再升7%,速度提升220%
  3. Prepare for Mac App Store Submission--为提交到Mac 应用商店做准备
  4. android时间戳字体,Android获取当前时间戳?
  5. windows操作系统,python环境下django的自动安装
  6. 【Flink】Flink 写入 kafka 报错 Failed to send data to Kafka: Expiring 4 record(s) for 20001 ms has passed
  7. 2021-2025年中国专业灯具行业市场供需与战略研究报告
  8. linux 分区 LVM 挂载
  9. Java设计模式------工厂模式-------工厂方法模式
  10. Delphi XE组件开发技术
  11. 游戏动作3d模型素材推荐 精品 小众
  12. 阿里云邮箱推送发送邮件 25端口禁用 问题
  13. React Native常用三方组件库大全
  14. linux 下载百度网盘资源 centos安装aria2
  15. android动态开场,看得见的数据结构Android版之开篇前言+完篇总结
  16. 区块链靠什么开启下一个互联网传奇?迅雷链:回归技术
  17. pdf编辑软件哪个好用?这个办法值得一试
  18. 现代电子计算机音乐制作,现代电子音乐制作利器——Alesis VI61 MIDI键盘
  19. 目标管理是项目管理的核心思想之
  20. 北京SAP-AGS CoE support consultant intern 面试总结

热门文章

  1. java满天星星代码_满天星空的代码实现
  2. opencv学习笔记3:像素处理
  3. VTK:正态估计用法实战
  4. VTK:外部轮廓用法实战
  5. wxWidgets 示例展示了 wxSecretStore 类的使用
  6. boost::signals2::trackable相关的测试程序
  7. select_arg_from_python相关的测试程序
  8. boost::geometry::compress_variant用法的测试程序
  9. boost::geometry模块自定义指针到点示例
  10. boost::to_address用法实例