​编辑执行环境

创建执行环境

执行模式

触发程序执行

源算子(Source)

读取有界数据流

读取无界数据

读取自定义数据源(源算子)


DataStream是一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几 部分构成:

  • 获取执行环境(Execution Environment)
  • 读取数据源(Source)
  • 定义基于数据的转换操作(Transformations)
  • 定义计算结果的输出位置(Sink)
  • 触发程序执行(Execute)

 执行环境

   创建执行环境

编写Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是 StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的 方式,就是调用这个类的静态方法,具体有以下三种:

1、getExecutionEnvironment(智能)

就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 直接得到正确的结果;也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的 运行环境。

val env = StreamExecutionEnvironment.getExecutionEnvironment

这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行 环境的方式。

2、createLocalEnvironment(本地)

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 不传入,则默认并行度就是本地的 CPU 核心数。

val localEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

3.、createRemoteEnvironment(远程)

这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 要在集群中运行的 Jar 包。

val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
)

执行模式

在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment 的静态方法,并返回它的对象:

// 批处理环境
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
// 流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
  1. 流执行模式(STREAMING) 这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情 况下,程序使用的就是 STREAMING 执行模式。
  2. 批执行模式(BATCH) 专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据,我们用这种模式处理会更方便。
  3. 自动模式(AUTOMATIC) 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。 由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。

 BATCH (批处理)模式的配置,主要有两种方式:

        1、通过命令行配置

在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。

bin/flink run -Dexecution.runtime-mode=BATCH ...

2、通过代码配置(不推荐)

在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)

触发程序执行

我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等 待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute()

源算子(Source)

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理 程序的输入端。

Flink 代码中通用的添加 Source 的方式,是调用执行环境的 addSource()方法,方法传入一个对象参数,需要实现 SourceFunction 接口,返回一个 DataStream。

//通过调用 addSource()方法可以获取 DataStream 对象
val stream = env.addSource(...)

读取有界数据流

        案例需求:

为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个 三元组(用户名,用户访问的 url,用户访问 url 的时间戳),所以在这里,我们可以创建一个 类 Event,将用户行为包装成它的一个对象。Event 包含了以下一些字段

 定义样例类

case class Event(user: String, url: String, timestamp: Long)

1、从元素中读取数据

//从元素中读取数据val stream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6) //指定元素数据//创建当前样例类Event的对象val value1: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))

 2、从集合中读取数据

 //从集合中读取数据//定义一个集合val e = List(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))val value2: DataStream[Event] = env.fromCollection(e)

3、从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中 获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

val value3: DataStream[String] = env.readTextFile("datas\\wc.txt")

  读取无界数据

1、从 Socket 读取数据 

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无 界的。一个简单的例子,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差,一般也是用于测试。

val stream = env.socketTextStream("localhost", 7777)

2、从 Kafka 读取数据

Flink 官方提供了连接工具 flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。

        添加连接器依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例即可:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//使用Properties对象保存kafa连接的相关配置val properties: Properties = new Properties()properties.setProperty("bootstrap.servers","master:9000")properties.setProperty("group.id","consumer-group")val stream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]("clicks", new SimpleStringSchema(),properties))stream.print()env.execute()}

读取自定义数据源(源算子)

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法: run()和 cancel()。

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

创建自定义读取数据源类 

class f4 extends SourceFunction[ Event ]{ //实现SourceFunction接口 泛型为之前定义好的样例类Event//标志位var running = true//重写抽象方法override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {//随机数生成器val random = new Random ()//定义数据随机选择的范围val user = Array ("张三", "李四", "王五")val url = Array ("02", "01", "03", "04")//用标志位作为循环判断条件,不停的发出数据while (running) {val event = Event (user (random.nextInt (user.length) ), url (random.nextInt (url.length) ), Calendar.getInstance.getTimeInMillis)//调用ctx的方法向下游发送数据sourceContext.collect (event)//每隔1秒发送一条数据Thread.sleep (1000)}}override def cancel(): Unit = running = false
}

测试类运行 

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//全局并行度设置为1env.setParallelism(1)//读取自定义的数据源val ste: DataStream[Event] = env.addSource(new f4)//输出ste.print()//执行env.execute()}

执行效果:每隔1秒自动输出一行数据

Flink-DataStream执行环境和数据读取相关推荐

  1. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  2. JavaScript执行环境 + 变量对象 + 作用域链 + 闭包

    闭包真的是一个谈烂掉的内容.说到闭包,自然就涉及到执行环境.变量对象以及作用域链.汤姆大叔翻译的<深入理解JavaScript系列>很好,帮我解决了一直以来似懂非懂的很多问题,包括闭包.下 ...

  3. 隐私计算技术解读:可信执行环境(TEE)概要及应用

    本文阐释梳理了可信执行环境(TEE)的概念定义及发展脉络,剖析 TEE 与基于密码学的隐私保护技术的对比及其在联邦学习中的应用,最后介绍 TEE 的现有框架和相关应用. 作者 | 深圳市洞见智慧科技有 ...

  4. 隐私计算中可信执行环境的一知半解

    隐私计算是使数据"可用不可见"的技术,它包括了密码学.人工智能.安全硬件等众多领域交叉的学科体系.对于隐私计算而言,业界通常分为三大路径技术:以安全多方计算为代表的密码学路径.以可 ...

  5. java读取sh脚本_linux环境下java读取sh脚本并执行

    linux环境下java读取sh脚本并执行 作者: CSDN博客 更新时间:2013-09-27 11:49:16 原文链接 Process process; String cmd = "/ ...

  6. SpringBoot:yaml配置及语法、yml数据读取、多环境开发控制

    yaml 配置 YAML(YAML Ain't Markup Language),一种数据序列化格式 优点: 容易阅读 容易与脚本语言交互 以数据为核心,重数据轻格式 YAML文件扩展名 .yml(主 ...

  7. mysql executereader_C# 操作MySQL数据库, ExecuteReader()方法参数化执行T-SQL语句, 游标读取数据...

    C# 操作My SQL数据库需要引用"MySql.Data", 可通过两种方式获取. 1.从NuGet下载"Install-Package MySql.Data -Ver ...

  8. 冲量在线亮相英特尔隐私计算研讨会,基于可信执行环境技术构建更安全的数据流通方案

    近日,冲量在线受邀参与由英特尔(中国)联合上海国创科技产业创新发展中心于上海召开的"隐私计算技术研讨会--安全与效率"会议,同各位隐私计算领域的前沿技术专家共同畅谈数据智能时代隐私 ...

  9. 冲量在线受邀参加可信执行环境技术沙龙:持续深耕数据流通,构建数据生态

    在大数据和互联网技术迅速发展的今天,隐私的重要性不言而喻,每次有大公司的客户隐私泄露的情况出现,都会在网络中激起千层浪.据<中国网民权益保护调查报告2020>调查显示,82.3%的网民亲身 ...

最新文章

  1. oracle中睡眠,sql - ORACLE中的睡眠功能 - 堆栈内存溢出
  2. Android程序员如何有效提升学习效率?帮你突破瓶颈
  3. 巧用句柄函数:闪烁窗体,做提示功能时很有用哦
  4. 谷歌Edge TPU:将机器学习引入边缘,撬动边缘计算/IOT大“地球”
  5. 计算机一级excel如何选择2个,2017年计算机一级excel操作题(2)
  6. 设置背景图时防止图片拉伸的解决方法
  7. 【高校宿舍管理系统】第一章 建立数据库以及项目框架搭建
  8. python是什么 自学-自学python用什么系统好
  9. UBUNTU安装后的root/su密码问题
  10. maven 项目搭建
  11. 如何刷访问量 的详细介绍
  12. ## Android Studio 开发(四)--蓝牙通信
  13. 接口管理工具Rap的安装
  14. 我学ERP 之 金蝶ERP-K3_第4章 销售管理
  15. linux sftp与ftp,Linux ftp和sftp命令
  16. fedora下载中的kde、xfce、workstation区别
  17. 廖雪峰讲python高阶函数求导_廖雪峰python课程笔记
  18. Linux安装MySQL(使用yum)
  19. 谁在唱衰PC?说出你的理由
  20. processing判断一个点(鼠标事件)是否在三角形、圆、椭圆、矩形内之第二章(超详细鼠标交互)

热门文章

  1. 计算机组成原理实验信号cpu,计算机组成原理实验43_实验报告
  2. 现代诗与古典传统的关系
  3. 前端MUI+H5+HBuilderX开发APP(IOS,android),后台Springboot,java学习与实践文章,更新中(二)
  4. python|简单实现英文单词大小写转化
  5. MySQL数据库的核心MVCC详解
  6. [ java ] 坦克大战 5.0 ~ 最终完整版
  7. electron使用下载监听接口(will-download)出现不触发done
  8. SOLIDWORKS怎样做填充阵列
  9. 火星超大nasa开源全景图分享
  10. 晨读一年的复盘会议_一年后,在家中远程办公,网真和视频会议