1、Transform

1.1 map

val streamMap = stream.map { x => x * 2 }

1.2 flatmap

flatMap的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]

例如: flatMap(List(1,2,3))(i ⇒ List(i,i))

结果是List(1,1,2,2,3,3)

而List("a b", "c d").flatMap(line ⇒ line.split(" "))

结果是List(a, b, c, d)

val streamFlatMap = stream.flatMap{

x => x.split(" ")

}

1.3 filter

val streamFilter = stream.filter{

x => x == 1

}

1.4 keyby

DataStream→ KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

val streamKeyby = stream.keyBy(0)

1.5 滚动聚合算子(Rolling Aggregation)

这些算子可以针对KeyedStream的每一个支流做聚合。

sum()

min()

max()

minBy()

maxBy()

1.6 reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment

val dataDS: DataStream[String]= env.readTextFile("input/data.txt")

val ds: DataStream[WaterSensor]=dataDS.map(

s=>{

val datas= s.split(",")

WaterSensor(datas(0), datas(1).toLong, datas(2).toDouble)

}

).keyBy(0).reduce(

(s1, s2)=>{

println(s"${s1.vc} <==> ${s2.vc}")

WaterSensor(s1.id, s1.ts, math.max(s1.vc, s2.vc))

}

)

ds.print()

env.execute("sensor")

1.7 split & select

Split算子

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

val split =someDataStream.split(

(num: Int)=>(num% 2) match {case 0 => List("even")case 1 => List("odd")

}

)

Select算子

SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

val even = split select("even")

val odd= split select("odd")

val all= split.select("even","odd")

1.8 Connect & CoMap

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

someStream : DataStream[Int] =...

otherStream : DataStream[String]=...

val connectedStreams= someStream.connect(otherStream)

connectedStreams.map(

(_ : Int)=> true,

(_ : String)=> false)

connectedStreams.flatMap(

(_ : Int)=> true,

(_ : String)=> false)

1.9 Union

DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。

dataStream.union(otherStream1, otherStream2, ...)

Connect与Union区别:

Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。

Connect只能操作两个流,Union可以操作多个。

2、支持的数据类型

Flink 会尽力推断有关数据类型的大量信息,这些数据会在分布式计算期间被网络交换或存储。 可以把它想象成一个推断表结构的数据库。在大多数情况下,Flink 可以依赖自身透明的推断出所有需要的类型信息。 掌握这些类型信息可以帮助 Flink 实现很多意想不到的特性:

对于使用 POJOs 类型的数据,可以通过指定字段名(比如 dataSet.keyBy("username") )进行 grouping 、joining、aggregating 操作。 类型信息可以帮助 Flink 在运行前做一些拼写错误以及类型兼容方面的检查,而不是等到运行时才暴露这些问题。

Flink 对数据类型了解的越多,序列化和数据布局方案就越好。 这对 Flink 中的内存使用范式尤为重要(可以尽可能处理堆上或者堆外的序列化数据并且使序列化操作很廉价)。

最后,它还使用户在大多数情况下免于担心序列化框架以及类型注册。

通常在应用运行之前的阶段 (pre-flight phase),需要数据的类型信息 - 也就是在程序对 DataStream 或者 DataSet 的操作调用之后,在 execute()、print()、count()、collect() 调用之前。

Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。

2.0 Flink 的 TypeInformation 类

类 TypeInformation 是所有类型描述符的基类。该类表示类型的基本属性,并且可以生成序列化器,在一些特殊情况下可以生成类型的比较器。 (请注意,Flink 中的比较器不仅仅是定义顺序 - 它们是处理键的基础工具)

Flink 内部对类型做了如下区分:

基础类型:所有的 Java 主类型(primitive)以及他们的包装类,再加上 void、String、Date、BigDecimal 以及 BigInteger。

主类型数组(primitive array)以及对象数组

复合类型

Flink 中的 Java元组 (Tuples)(元组是 Flink Java API 的一部分):最多支持25个字段,null 是不支持的。

Scala 中的 case classes (包括 Scala 元组):null 是不支持的。

Row:具有任意数量字段的元组并且支持 null 字段。。

POJOs: 遵循某种类似 bean 模式的类。

辅助类型 (Option、Either、Lists、Maps 等)

泛型类型:这些不是由 Flink 本身序列化的,而是由 Kryo 序列化的。

POJOs 是特别有趣的,因为他们支持复杂类型的创建以及在键的定义中直接使用字段名: dataSet.join(another).where("name").equalTo("personName") 它们对运行时也是透明的,并且可以由 Flink 非常高效地处理。

TypeInformation 支持以下几种类型:

BasicTypeInfo: 任意Java 基本类型或 String 类型

BasicArrayTypeInfo: 任意Java基本类型数组或 String 数组

WritableTypeInfo: 任意 Hadoop Writable 接口的实现类

TupleTypeInfo: 任意的 Flink Tuple 类型(支持Tuple1 to Tuple25)。Flink tuples 是固定长度固定类型的Java Tuple实现

CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)

PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法

GenericTypeInfo: 任意无法匹配之前几种类型的类

针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。

2.1 基础数据类型

Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …​

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)

numbers.map( n=> n + 1 )

2.2 Java和Scala元组(Tuples)

val persons: DataStream[(String, Integer)] =env.fromElements(

("Adam", 17),

("Sarah", 23) )

persons.filter(p=> p._2 > 18)

2.3 Scala样例类(case classes)

case classPerson(name: String, age: Int)

val persons: DataStream[Person]=env.fromElements(

Person("Adam", 17),

Person("Sarah", 23) )

persons.filter(p=> p.age > 18)

2.4 Java简单对象(POJOs)

public classPerson {publicString name;public intage;publicPerson() {}public Person(String name, intage) {this.name =name;this.age =age;

}

}

DataStream persons =env.fromElements(new Person("Alex", 42),new Person("Wendy", 23));

2.5 其它(Arrays, Lists, Maps, Enums, 等等)

Flink对Java和Scala中的一些特殊目的的类型也都是支持的,比如Java的ArrayList,HashMap,Enum等等。

3、实现UDF函数----更细粒度的控制流

Flink在使用各种不同算子的同时,为了能更细粒度的控制数据和操作数据,给开发者提供了对现有函数进行扩展的能力

3.1 函数类(Function Classes)

Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。

自定义函数类实现MapFunction接口:

def main(args: Array[String]): Unit ={//TODO 从文件中获取数据源

val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment;

env.setParallelism(1)

val list=List(

WaterSensor("sensor_1", 150000L, 25),

WaterSensor("sensor_1", 150001L, 27),

WaterSensor("sensor_1", 150005L, 30),

WaterSensor("sensor_1", 150007L, 40)

)

val waterSensorDS: DataStream[WaterSensor]=env.fromCollection(list)//UDF函数:自定义函数进行数据的处理//waterSensorDS.map(ws=>(ws.id, ws.vc))//也可以使用函数类来代替匿名函数

val mapFunctionDS: DataStream[(String, Int)] = waterSensorDS.map( newMyMapFunction )

mapFunctionDS.print("mapfun>>>")

env.execute()

}//自定义UDF函数。来实现映射转换功能//1. 继承MapFunction//2. 重写方法

classMyMapFunction extends MapFunction[WaterSensor, (String, Int)]{override def map(ws: WaterSensor): (String, Int) ={

(ws.id, ws.vc)

}

}

3.2 匿名函数(Lambda Functions)

val tweets: DataStream[String] =...

val flinkTweets= tweets.filter(_.contains("flink"))

3.3 富函数(Rich Functions)

“富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能

RichMapFunction

RichFlatMapFunction

RichFilterFunction

...

Rich Function有一个生命周期的概念。典型的生命周期方法有:

open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。

close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态

def main(args: Array[String]): Unit ={//TODO 从文件中获取数据源

val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment;

env.setParallelism(1)

val list=List(

WaterSensor("sensor_1", 150000L, 25),

WaterSensor("sensor_1", 150001L, 27),

WaterSensor("sensor_1", 150005L, 30),

WaterSensor("sensor_1", 150007L, 40)

)

val waterSensorDS: DataStream[WaterSensor]=env.fromCollection(list)//UDF函数:自定义函数进行数据的处理//waterSensorDS.map(ws=>(ws.id, ws.vc))//也可以使用函数类来代替匿名函数

val mapFunctionDS: DataStream[(String, Int)] = waterSensorDS.map( newMyMapRichFunction )

mapFunctionDS.print("mapfun>>>")

env.execute()

}//自定义UDF 富函数。来实现映射转换功能//1. 继承RichMapFunction//2. 重写方法

classMyMapRichFunction extends RichMapFunction[WaterSensor, (String, Int)] {override def open(parameters: Configuration): Unit =super.open(parameters)override def map(ws: WaterSensor): (String, Int) ={//getRuntimeContext.

(ws.id, getRuntimeContext.getIndexOfThisSubtask)

}override def close(): Unit =super.close()

}

flink的java api_Flink 流处理API之二相关推荐

  1. java 有多少api_Java常用API(二)

    API 正则表达式 正则表达式的概念 正则表达式(英语:Regular Expression,在代码中常简写为regex) 正则表达式是一个字符串,使用单个字符串来描述.用来定义匹配规则,匹配一系列符 ...

  2. flink读取不到文件_Flink流处理API——Source

    本文主要从以下几个方面介绍Flink的流处理API--Source 一.从集合中读取数据 二.从文件中读取数据 三.从Kafka中读取数据 四.自定义Source 数据处理的过程基本可以分为三个阶段分 ...

  3. Flink教程(06)- Flink批流一体API(Source示例)

    文章目录 01 引言 02 Source 2.1 基于集合的Source 2.2 基于文件的Source 2.3 基于Socket的Source 2.4 自定义Source 2.4.1 案例 - 随机 ...

  4. java流式api,Java 8 中流式API性能基准测试

    测试代码 package hello.test; import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.runner.Runner; ...

  5. Flink教程(10)- Flink批流一体API(其它)

    文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...

  6. Flink教程(09)- Flink批流一体API(Connectors示例)

    文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...

  7. Flink教程(07)- Flink批流一体API(Transformation示例)

    文章目录 01 引言 02 Transformation 2.1 基本操作 2.1.1 API 解析 2.1.2 示例代码 2.2 合并 2.2.1 union 2.2.2 connect 2.2.3 ...

  8. Flink之Java入门

    介绍 Flink是一个处理流数据的组件,在实时计算等场景下可以发挥巨大的作用. 流数据一般分为: 有界数据流(知道数据的起点和终点,例如一个txt文件的数据) 无界数据流(不知道数据的终点,例如kaf ...

  9. Java IO流学习总结三:缓冲流-BufferedInputStream、BufferedOutputStream

    Java IO流学习总结三:缓冲流-BufferedInputStream.BufferedOutputStream 转载请标明出处:http://blog.csdn.net/zhaoyanjun6/ ...

最新文章

  1. 赖江山:生态学研究都在用哪些R包?
  2. 车辆检测--DAVE: A Unified Framework for Fast Vehicle Detection and Annotation
  3. python中md5_Python的md5是什么意思
  4. NumericUpDown 控件输入限制小数位
  5. python 有效的字母异位词
  6. 多生产者多消费者问题
  7. ksd文件怎么打开_文件KSDStore这是个什么文件 – 手机爱问
  8. 那些年职场老鸟都踩过哪些坑?送给后来人的一些职场建议
  9. 什么是JavaScript中的回调函数?
  10. Python sqrt() 函数
  11. Python学习笔记:演示多根继承
  12. VS和IIS的一些问题
  13. java 对象排重_现代化的 Java (八)——重说对象序列化
  14. Java抽象类(Abstract Class)与接口(Interface)区别
  15. TabLayout的自定义
  16. sudo spctl --master-disable_2020推荐聊城灭火器检测--正规企业--【聊城市久安消防】...
  17. lopatkin俄大神精简中文系统 Windows 10 Pro 10240.16393.150717-1719.th1_st1 x86-x64 CN Tablet PC FINAL...
  18. 如何做外链成为有效外链之做有效问答外链
  19. 使用apache的ftpserver搭建ftp服务器
  20. TI-Davinci开发系列之七DVSDK-4.03目录介绍

热门文章

  1. 瑞士名表排名介绍及手表品牌识别
  2. Deepin系统应用商店 不可用 解决办法
  3. 本周大新闻|华为发布BB观影眼镜,Geenee AR试穿加入AI生成玩法
  4. 电脑剪辑视频用什么工具?好用的视频剪辑工具推荐
  5. 工程光学第一、二、六章学习总结
  6. 5.7.1 使用向导创建交叉表查询
  7. 12306自动抢票及自动识别验证码功能(二)
  8. 关于水平集函数的重新初始化过程
  9. imToken 投资 imKey 并推出通用硬件钱包方案
  10. 创客集结号:3D打印技术原理