flink DataStream API使用及原理
传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的。
Spark 和 Flink 都是通用的开源大规模处理引擎,目标是在一个系统中支持所有的数据处理以带来效能的提升。两者都有相对比较成熟的生态系统。是下一代大数据引擎最有力的竞争者。
Spark 的生态总体更完善一些,在机器学习的集成和易用性上暂时领先。
Flink 在流计算上有明显优势,核心架构和模型也更透彻和灵活一些。
本文主要通过实例来分析flink的流式处理过程,并通过源码的方式来介绍流式处理的内部机制。
DataStream整体概述
主要分5部分,下面我们来分别介绍:
1.运行环境StreamExecutionEnvironment
StreamExecutionEnvironment是个抽象类,是流式处理的容器,实现类有两个,分别是
LocalStreamEnvironment:
RemoteStreamEnvironment:
/*** The StreamExecutionEnvironment is the context in which a streaming program is executed. A* {@linkLocalStreamEnvironment} will cause execution in the current JVM, a* {@linkRemoteStreamEnvironment} will cause execution on a remote setup.** <p>The environment provides methods to control the job execution (such as setting the parallelism* or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).**@seeorg.apache.flink.streaming.api.environment.LocalStreamEnvironment*@seeorg.apache.flink.streaming.api.environment.RemoteStreamEnvironment*/
2.数据源DataSource数据输入
包含了输入格式InputFormat
/*** Creates a new data source.**@paramcontext The environment in which the data source gets executed.*@paraminputFormat The input format that the data source executes.*@paramtype The type of the elements produced by this input format.*/public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT>type, String dataSourceLocationName) {super(context, type);this.dataSourceLocationName =dataSourceLocationName;if (inputFormat == null) {throw new IllegalArgumentException("The input format may not be null.");}this.inputFormat =inputFormat;if (inputFormat instanceofNonParallelInput) {this.parallelism = 1;}}
flink将数据源主要分为内置数据源和第三方数据源,内置数据源有 文件,网络socket端口及集合类型数据;第三方数据源实用Connector的方式来连接如kafka Connector,es connector等,自己定义的话,可以实现SourceFunction,封装成Connector来做。
3.DataStream转换
DataStream:同一个类型的流元素,DataStream可以通过transformation转换成另外的DataStream,示例如下
@link DataStream#map
@link DataStream#filter
StreamOperator:流式算子的基本接口,三个实现类
AbstractStreamOperator:
OneInputStreamOperator:
TwoInputStreamOperator:
/*** Basic interface for stream operators. Implementers would implement one of* {@linkorg.apache.flink.streaming.api.operators.OneInputStreamOperator} or* {@linkorg.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators* that process elements.** <p>The class {@linkorg.apache.flink.streaming.api.operators.AbstractStreamOperator}* offers default implementation for the lifecycle and properties methods.** <p>Methods of {@codeStreamOperator} are guaranteed not to be called concurrently. Also, if using* the timer service, timer callbacks are also guaranteed not to be called concurrently with* methods on {@codeStreamOperator}.**@param<OUT> The output type of the operator*/
4.DataStreamSink输出
/*** Adds the given sink to this DataStream. Only streams with sinks added* will be executed once the {@linkStreamExecutionEnvironment#execute()}* method is called.**@paramsinkFunction* The object containing the sink's invoke function.*@returnThe closed DataStream.*/public DataStreamSink<T> addSink(SinkFunction<T>sinkFunction) {//read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType();//configure the type if neededif (sinkFunction instanceofInputTypeConfigurable) {((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());}StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);getExecutionEnvironment().addOperator(sink.getTransformation());returnsink;}
5.执行
/*** Executes the JobGraph of the on a mini cluster of ClusterUtil with a user* specified name.**@paramjobName* name of the job*@returnThe result of the job execution, containing elapsed time and accumulators.*/@Overridepublic JobExecutionResult execute(String jobName) throwsException {//transform the streaming program into a JobGraphStreamGraph streamGraph =getStreamGraph();streamGraph.setJobName(jobName);JobGraph jobGraph=streamGraph.getJobGraph();jobGraph.setAllowQueuedScheduling(true);Configuration configuration= newConfiguration();configuration.addAll(jobGraph.getJobConfiguration());configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE,"0");//add (and override) the settings with what the user definedconfiguration.addAll(this.configuration);if (!configuration.contains(RestOptions.BIND_PORT)) {configuration.setString(RestOptions.BIND_PORT,"0");}int numSlotsPerTaskManager =configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());MiniClusterConfiguration cfg= newMiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(numSlotsPerTaskManager).build();if(LOG.isInfoEnabled()) {LOG.info("Running job on local embedded Flink mini cluster");}MiniCluster miniCluster= newMiniCluster(cfg);try{miniCluster.start();configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());returnminiCluster.executeJobBlocking(jobGraph);}finally{transformations.clear();miniCluster.close();}}
6.总结
Flink的执行方式类似于管道,它借鉴了数据库的一些执行原理,实现了自己独特的执行方式。
7.展望
Stream涉及的内容还包括Watermark,window等概念,因篇幅限制,这篇仅介绍flink DataStream API使用及原理。
下篇将介绍Watermark,下下篇是windows窗口计算。
参考资料
【1】https://baijiahao.baidu.com/s?id=1625545704285534730&wfr=spider&for=pc
【2】https://blog.51cto.com/13654660/2087705
转载于:https://www.cnblogs.com/davidwang456/p/11046857.html
flink DataStream API使用及原理相关推荐
- Flink DataStream API 介绍
Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...
- 【基础】Flink -- DataStream API
Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...
- [Flink]Flink DataStream API 概览
目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...
- Flink DataStream API(基础版)
概述 DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...
- Flink DataStream API 中的多面手——Process Function详解
https://mp.weixin.qq.com/s/SOCAE-t25DPVlQMxuOT0jw 引言 在Flink的时间与watermarks详解这篇文章中,阐述了Flink的时间与水位线的相关内 ...
- flink DataStream API(三)事件时间-生成水印
文章目录 生成水印 水印策略介绍 使用水印策略 处理空闲源 编写 `WatermarkGenerators` 编写周期 WatermarkGenerator 编写标点WatermarkGenerato ...
- flink dataset api使用及原理
随着大数据技术在各行各业的广泛应用,要求能对海量数据进行实时处理的需求越来越多,同时数据处理的业务逻辑也越来越复杂,传统的批处理方式和早期的流式处理框架也越来越难以在延迟性.吞吐量.容错能力以及使用便 ...
- 学习笔记Flink(六)—— Flink DataStream API编程
一.Flink程序构成 获取执行环境: 加载/创建初始数据: 编写对数据的转换操作: 指定计算结果存放的位置: 触发程序执行: 二.数据源 Collection 类型数据源 fromCollectio ...
- flink DataStream API(三)事件时间-内置水印生成器
文章目录 内置水印生成器 单调递增的时间戳 固定的延迟时间 内置水印生成器 如生成水印中所述,Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印.更具体地说,可以通过实现Wate ...
最新文章
- 2011百度之星初赛B圆环
- 【转】ASP.NET AJAX入门系列
- PAT甲级1050 String Subtraction:[C++题解]字符串作差
- python读xml文件生成头文件_Python根据指定文件生成XML的方法
- xs资料网-冲压模图档下载_伺服冲床能做的精密冲压件都有哪些?!
- c语言怎么判定结构体有无数据,C语言中什么是结构体,怎么定义结构体。
- Demuxed:编解码器和压缩的未来
- 配置树莓派linux的内核和编译并将镜像拷贝至树莓派
- 【UDP通过多线程改进,在一个窗口中同时接收又发送】
- 刚来公司时我却做了一件最傻的事
- html css 圆形按钮 仿uc,10款基于jquery的web前端动画特效
- 电商项目数据库设计 | 第五篇:参考京东商城详细讲解商品数据库设计
- 十六、 方差分析--使用Python进行双因素方差分析
- 与门非门在电子计算机中的应用,与非门电路
- 《Python编程无师自通》第20章 融会贯通
- 文字,字体,图像,列表
- 用 GitHub 搭建静态博客太繁琐?用这个小工具实现「傻瓜式」发布!
- JVM: PermGen space
- 使用 PyTorch 进行 风格迁移(Neural-Transfer)
- 802.11与802.3数据帧转换(即有线和无线数据帧转换)
热门文章
- 第二模块_找钱:融资与管理_1
- tcp/ip 协议栈Linux内核源码分析八 路由子系统分析三 路由表
- soap php 分开类,PHP SoapClient类型映射的行为有所不同
- c++ string 头文件_“延期不延学” 第25期 | C++篇 | C/C++常用函数
- Android旋转切换条目,Android:当我添加动态新的ListView条目时,ListView中的旋转器会丢失它们的值...
- android 关机 流程_Android系统关机的全流程解析
- python list 去重_Python中对列表list去重
- Transformer用到3D点云分割
- mysql8 允许外网访问
- cifar-10 cnn 分类