传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的。

Spark 和 Flink 都是通用的开源大规模处理引擎,目标是在一个系统中支持所有的数据处理以带来效能的提升。两者都有相对比较成熟的生态系统。是下一代大数据引擎最有力的竞争者。

Spark 的生态总体更完善一些,在机器学习的集成和易用性上暂时领先。

Flink 在流计算上有明显优势,核心架构和模型也更透彻和灵活一些。

本文主要通过实例来分析flink的流式处理过程,并通过源码的方式来介绍流式处理的内部机制。

DataStream整体概述

主要分5部分,下面我们来分别介绍:

1.运行环境StreamExecutionEnvironment

StreamExecutionEnvironment是个抽象类,是流式处理的容器,实现类有两个,分别是

LocalStreamEnvironment:
RemoteStreamEnvironment:
/*** The StreamExecutionEnvironment is the context in which a streaming program is executed. A* {@linkLocalStreamEnvironment} will cause execution in the current JVM, a* {@linkRemoteStreamEnvironment} will cause execution on a remote setup.** <p>The environment provides methods to control the job execution (such as setting the parallelism* or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).**@seeorg.apache.flink.streaming.api.environment.LocalStreamEnvironment*@seeorg.apache.flink.streaming.api.environment.RemoteStreamEnvironment*/

2.数据源DataSource数据输入

包含了输入格式InputFormat

    /*** Creates a new data source.**@paramcontext The environment in which the data source gets executed.*@paraminputFormat The input format that the data source executes.*@paramtype The type of the elements produced by this input format.*/public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT>type, String dataSourceLocationName) {super(context, type);this.dataSourceLocationName =dataSourceLocationName;if (inputFormat == null) {throw new IllegalArgumentException("The input format may not be null.");}this.inputFormat =inputFormat;if (inputFormat instanceofNonParallelInput) {this.parallelism = 1;}}

flink将数据源主要分为内置数据源和第三方数据源,内置数据源有 文件,网络socket端口及集合类型数据;第三方数据源实用Connector的方式来连接如kafka Connector,es connector等,自己定义的话,可以实现SourceFunction,封装成Connector来做。

3.DataStream转换

DataStream:同一个类型的流元素,DataStream可以通过transformation转换成另外的DataStream,示例如下

@link DataStream#map

@link DataStream#filter

StreamOperator:流式算子的基本接口,三个实现类

AbstractStreamOperator:

OneInputStreamOperator:

TwoInputStreamOperator:

/*** Basic interface for stream operators. Implementers would implement one of* {@linkorg.apache.flink.streaming.api.operators.OneInputStreamOperator} or* {@linkorg.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators* that process elements.** <p>The class {@linkorg.apache.flink.streaming.api.operators.AbstractStreamOperator}* offers default implementation for the lifecycle and properties methods.** <p>Methods of {@codeStreamOperator} are guaranteed not to be called concurrently. Also, if using* the timer service, timer callbacks are also guaranteed not to be called concurrently with* methods on {@codeStreamOperator}.**@param<OUT> The output type of the operator*/

4.DataStreamSink输出

    /*** Adds the given sink to this DataStream. Only streams with sinks added* will be executed once the {@linkStreamExecutionEnvironment#execute()}* method is called.**@paramsinkFunction*            The object containing the sink's invoke function.*@returnThe closed DataStream.*/public DataStreamSink<T> addSink(SinkFunction<T>sinkFunction) {//read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();//configure the type if neededif (sinkFunction instanceofInputTypeConfigurable) {((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());}StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);getExecutionEnvironment().addOperator(sink.getTransformation());returnsink;}

5.执行

/*** Executes the JobGraph of the on a mini cluster of ClusterUtil with a user* specified name.**@paramjobName*            name of the job*@returnThe result of the job execution, containing elapsed time and accumulators.*/@Overridepublic JobExecutionResult execute(String jobName) throwsException {//transform the streaming program into a JobGraphStreamGraph streamGraph =getStreamGraph();streamGraph.setJobName(jobName);JobGraph jobGraph=streamGraph.getJobGraph();jobGraph.setAllowQueuedScheduling(true);Configuration configuration= newConfiguration();configuration.addAll(jobGraph.getJobConfiguration());configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE,"0");//add (and override) the settings with what the user definedconfiguration.addAll(this.configuration);if (!configuration.contains(RestOptions.BIND_PORT)) {configuration.setString(RestOptions.BIND_PORT,"0");}int numSlotsPerTaskManager =configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());MiniClusterConfiguration cfg= newMiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(numSlotsPerTaskManager).build();if(LOG.isInfoEnabled()) {LOG.info("Running job on local embedded Flink mini cluster");}MiniCluster miniCluster= newMiniCluster(cfg);try{miniCluster.start();configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());returnminiCluster.executeJobBlocking(jobGraph);}finally{transformations.clear();miniCluster.close();}}

6.总结

  Flink的执行方式类似于管道,它借鉴了数据库的一些执行原理,实现了自己独特的执行方式。

7.展望

Stream涉及的内容还包括Watermark,window等概念,因篇幅限制,这篇仅介绍flink DataStream API使用及原理。

下篇将介绍Watermark,下下篇是windows窗口计算。

参考资料

【1】https://baijiahao.baidu.com/s?id=1625545704285534730&wfr=spider&for=pc

【2】https://blog.51cto.com/13654660/2087705

转载于:https://www.cnblogs.com/davidwang456/p/11046857.html

flink DataStream API使用及原理相关推荐

  1. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  2. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  3. [Flink]Flink DataStream API 概览

    目录 什么是 DataStream 什么能被转化为流 流式Flink程序的开发流程 DataStream的数据源 迭代数据流 配置运行时参数 什么是 DataStream Datastream API ...

  4. Flink DataStream API(基础版)

    概述   DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...

  5. Flink DataStream API 中的多面手——Process Function详解

    https://mp.weixin.qq.com/s/SOCAE-t25DPVlQMxuOT0jw 引言 在Flink的时间与watermarks详解这篇文章中,阐述了Flink的时间与水位线的相关内 ...

  6. flink DataStream API(三)事件时间-生成水印

    文章目录 生成水印 水印策略介绍 使用水印策略 处理空闲源 编写 `WatermarkGenerators` 编写周期 WatermarkGenerator 编写标点WatermarkGenerato ...

  7. flink dataset api使用及原理

    随着大数据技术在各行各业的广泛应用,要求能对海量数据进行实时处理的需求越来越多,同时数据处理的业务逻辑也越来越复杂,传统的批处理方式和早期的流式处理框架也越来越难以在延迟性.吞吐量.容错能力以及使用便 ...

  8. 学习笔记Flink(六)—— Flink DataStream API编程

    一.Flink程序构成 获取执行环境: 加载/创建初始数据: 编写对数据的转换操作: 指定计算结果存放的位置: 触发程序执行: 二.数据源 Collection 类型数据源 fromCollectio ...

  9. flink DataStream API(三)事件时间-内置水印生成器

    文章目录 内置水印生成器 单调递增的时间戳 固定的延迟时间 内置水印生成器 如生成水印中所述,Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印.更具体地说,可以通过实现Wate ...

最新文章

  1. 2011百度之星初赛B圆环
  2. 【转】ASP.NET AJAX入门系列
  3. PAT甲级1050 String Subtraction:[C++题解]字符串作差
  4. python读xml文件生成头文件_Python根据指定文件生成XML的方法
  5. xs资料网-冲压模图档下载_伺服冲床能做的精密冲压件都有哪些?!
  6. c语言怎么判定结构体有无数据,C语言中什么是结构体,怎么定义结构体。
  7. Demuxed:编解码器和压缩的未来
  8. 配置树莓派linux的内核和编译并将镜像拷贝至树莓派
  9. 【UDP通过多线程改进,在一个窗口中同时接收又发送】
  10. 刚来公司时我却做了一件最傻的事
  11. html css 圆形按钮 仿uc,10款基于jquery的web前端动画特效
  12. 电商项目数据库设计 | 第五篇:参考京东商城详细讲解商品数据库设计
  13. 十六、 方差分析--使用Python进行双因素方差分析
  14. 与门非门在电子计算机中的应用,与非门电路
  15. 《Python编程无师自通》第20章 融会贯通
  16. 文字,字体,图像,列表
  17. 用 GitHub 搭建静态博客太繁琐?用这个小工具实现「傻瓜式」发布!
  18. JVM: PermGen space
  19. 使用 PyTorch 进行 风格迁移(Neural-Transfer)
  20. 802.11与802.3数据帧转换(即有线和无线数据帧转换)

热门文章

  1. 第二模块_找钱:融资与管理_1
  2. tcp/ip 协议栈Linux内核源码分析八 路由子系统分析三 路由表
  3. soap php 分开类,PHP SoapClient类型映射的行为有所不同
  4. c++ string 头文件_“延期不延学” 第25期 | C++篇 | C/C++常用函数
  5. Android旋转切换条目,Android:当我添加动态新的ListView条目时,ListView中的旋转器会丢失它们的值...
  6. android 关机 流程_Android系统关机的全流程解析
  7. python list 去重_Python中对列表list去重
  8. Transformer用到3D点云分割
  9. mysql8 允许外网访问
  10. cifar-10 cnn 分类