Flink提供三层API,每个API在简洁性和表达之间提供不同的权衡,并针对不同的用例
SQL/Table API(dynamic tables)
DataStream API(streams,windows)
ProcessFunction(event,state,time)

不要跟ProcessWindowFunction混为一谈
ProcessFunction是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块:
事件(event)(流元素)
状态(state)(容错性,一致性,仅在keyed stream中)
定时器(times)(event time和processing time,仅在keyed stream中)

ProcessFunction可以看做是一个具有keyed state和timers访问权的FlatMapFunction
通过FuntimeContext访问keyed state

计时器允许应用程序对处理时间和事件时间中的更改做出响应。对processElement(...)函数每次
调用都获得一个Context对象,该对象可以访问元素的event time timestamp和TimerService

TimerService可用于将来的event/process time 瞬间注册回调。当达到计时器的特定时间时,将调用onTimer(...)方法。在该调用期间,所有状态都再次限定
在创建计时器时使用的键的范围内,从而允许计时器操作键控状态

简单来说:ProcessFunction可以看做是一个具有keyed state和timers访问权限的FlatMapFunction,是一个低阶的流处理操作算子;需要说明的是:ProcessFunction不同于
windowFunction和ProcessWindowFunction,后2者属于DateStream Api的范畴,大家使用的时候可以注意一下:ProcessFunction类可以重谢Open和Close方法。

ProcessFunction:

package Flink_API;import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.Properties;public class TestProcessFunction {public static void main(String[] rags) throws Exception {//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);//读取用户浏览信息Properties consumerProperties=new Properties();consumerProperties.setProperty("bootstrap.servers","page01");consumerProperties.setProperty("groud.id","browsegroup");DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("topic",new SimpleStringSchema(),consumerProperties));//解析数据DataStream<UserBrowseLog> processData=dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {@Overridepublic void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {try{UserBrowseLog browseLog= JSON.parseObject(s,UserBrowseLog.class);if(browseLog!=null){collector.collect(browseLog);}}catch(Exception e){System.out.print("解析Json_UserBrowseLog异常,异常信息是:"+e.getMessage());}}});processData.print();env.execute("TestProcessFunction");}public static class UserBrowseLog implements Serializable {private String userID;private String eventTime;private String eventType;private String productID;private Integer productPrice;public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getProductID() {return productID;}public void setProductID(String productID) {this.productID = productID;}public Integer getProductPrice() {return productPrice;}public void setProductPrice(Integer productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "UserBrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", eventType='" + eventType + '\'' +", productID='" + productID + '\'' +", productPrice=" + productPrice +'}';}}
}

数据处理的核心就是对数据进行各种转化Transformation操作,在Flink上就是通过转换将一个或多个DataStream转换成新的DataStream

Map算子:输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作,一对一转换,即一条转换成另外一条 

package Flink_API;import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.Properties;public class TestMap {public static void main(String[] rags) throws Exception {//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);//读取用户浏览信息Properties consumerProperties=new Properties();consumerProperties.setProperty("bootstrap.servers","page01");consumerProperties.setProperty("groud.id","browsegroup");DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("topic",new SimpleStringSchema(),consumerProperties));//解析数据DataStream<UserBrowseLog> processData=dataStreamSource.map(new MapFunction<String, UserBrowseLog>() {@Overridepublic UserBrowseLog map(String s) throws Exception {UserBrowseLog browseLog=null;try{browseLog= JSON.parseObject(s, UserBrowseLog.class);System.out.print(browseLog==null);}catch(Exception e){System.out.print("解析Json_UserBrowseLog异常,异常信息是:"+e.getMessage());}return browseLog;}});processData.print();env.execute("TestProcessFunction");}public static class UserBrowseLog implements Serializable {private String userID;private String eventTime;private String eventType;private String productID;private Integer productPrice;public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getProductID() {return productID;}public void setProductID(String productID) {this.productID = productID;}public Integer getProductPrice() {return productPrice;}public void setProductPrice(Integer productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "UserBrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", eventType='" + eventType + '\'' +", productID='" + productID + '\'' +", productPrice=" + productPrice +'}';}}}

FlatMap:输入一个元素,可以返回0个,1个活多个元素,即编程0行或者多行

小知识汇总:

Flink提交方式俩种:测试的时候用第一种,部署的时候用第二种
第一种:先创建一个集群,然后把任务提交到集群(yarn-session.sh表示启动yarn集群,-n 2表示启动俩个taskManager,-jm 1024表示jobmanager内存大小,-tm 1024表示taskmanager内存大小)
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
flink run -c xxxx.jar
第二种:提交任务,先创建一个临时集群
flink run -m yarn-cluster -yn 2-yjm 1024 -ytm 1024 -c xxxxx.jar

package Flink_API;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.Properties;public class TestFlatMap {public static void main(String[] rags) throws Exception {//获取Flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);//读取用户浏览信息Properties consumerProperties=new Properties();consumerProperties.setProperty("bootstrap.servers","page01");consumerProperties.setProperty("groud.id","browsegroup");DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("topic",new SimpleStringSchema(),consumerProperties));//解析数据DataStream<Tuple2<String,Integer>> processData=dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String,Integer>> collector) throws Exception {String[] split =s.split("\\w+");for(String word:split){collector.collect(Tuple2.of(word,1));}}}).setParallelism(3);processData.print().setParallelism(1);env.execute("TestFlatMap");}public static class UserBrowseLog implements Serializable {private String userID;private String eventTime;private String eventType;private String productID;private Integer productPrice;public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getProductID() {return productID;}public void setProductID(String productID) {this.productID = productID;}public Integer getProductPrice() {return productPrice;}public void setProductPrice(Integer productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "UserBrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", eventType='" + eventType + '\'' +", productID='" + productID + '\'' +", productPrice=" + productPrice +'}';}}
}

Flink算子(ProcessFunction,map和Flatmap)相关推荐

  1. Flink系列之:Java代码实现深入浅出的理解Flink算子的使用方法

    Flink系列之:Java代码实现深入浅出的理解Flink算子的使用方法 一.Map算子 二.filter算子 三.flatMap算子 四.keyBy算子 五.Reduce算子 六.union算子 七 ...

  2. 【Flink】ProcessFunction:Flink最底层API使用教程

    1.美图 2.概述 之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限.如果想获取数据流中 ...

  3. Flink 算子Function实例化的坑

    问题回顾 关于一段代码: object MySingleObj{// 陷阱:// 单例对象中一个是可变引用,一个是可变数组var str:String = _val list = new ListBu ...

  4. stream map方法_Java Stream中map和flatMap方法

    最近看到一篇讲stream语法的文章,学习Java中map()和flatMap()方法之间的区别. 虽然看起来这两种方法都做同样的事情,都是做的映射操作,但实际上差之毫厘谬以千里. 通过演示Demo中 ...

  5. 谈谈 Swift 中的 map 和 flatMap

    map 和 flatMap 是 Swift 中两个常用的函数,它们体现了 Swift 中很多的特性.对于简单的使用来说,它们的接口并不复杂,但它们内部的机制还是非常值得研究的,能够帮助我们够好的理解 ...

  6. Optional 中的 map 和 flatMap

    在面试的时候,面试官问了我一个问题.你使用过 Optional 吗?Optional 的原理是什么?我一听,这还不简单.Optional是一个枚举. public enum Optional<W ...

  7. RxJava 中的map与flatMap

    1.map和flatMap都是接受一个函数作为参数(Func1) 2.map函数只有一个参数,参数一般是Func1,Func1的<I,O>I,O模版分别为输入和输出值的类型,实现Func1 ...

  8. Java 8 Stream Api 中的 map和 flatMap 操作

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | 公众号「码农小胖哥」 1.前言 Java 8  ...

  9. map and flatmap 区别

    2019独角兽企业重金招聘Python工程师标准>>> map vs flatMap in Spark September 24, 2014Big Dataexample, spar ...

最新文章

  1. mysql正斜杠_MySQL中的正斜杠和反斜杠 | | 数据库系统概论(字符匹配)
  2. 监控HP服务器cpu状态脚本
  3. docker 和挂载文件一起打包成新镜像_Docker文件系统和数据卷
  4. PYG教程【三】对Cora数据集进行半监督节点分类
  5. 本地windows主机无法访问虚拟机里主机解决办法
  6. 到底什么才是人生最大的投资
  7. centerpython_centeros下安装python3
  8. kallsyms 压缩_initrd.img、System.map学习札记
  9. 服务器 交换机的维护,服务器路由器交换机维护设置
  10. linux matplotlib 中文显示乱码
  11. linux vi中字符替换,Linux vi替换字符串
  12. 马尔科夫区制转换matlab,马尔科夫区制转移混频向量自回归(MS-MF-VAR)模型及其Gauss实现...
  13. 抄书——最优化的理论与方法(5)——数学基础(凸集和凸函数)
  14. USB Type C 接口引脚详解
  15. Excel冻结窗口及设置下拉菜单
  16. RocketMQ(八)RocketMQ延时消息
  17. narwal机器人_省时省心才见真章!Narwal云鲸J1智能扫拖机器人国内上市
  18. 索爱E50收款蓝牙音箱,支持各种场合的商用收账音箱
  19. Python Project
  20. Flexsim物流配送中心系统仿真

热门文章

  1. php笔记之-laravel-Redis hash
  2. Laravel Migrate
  3. 《Linux内核分析》期末总结及学习心得
  4. Amazon电商数据分析——数据获取
  5. 不为失败找理由,只为成功找方法
  6. java rowmapper 通用实现_必经之路!各大网站力推Java代码优化:77案例+28技巧
  7. android studio 多个项目管理,Android Studio之同一应用创建多个Activity(一)
  8. oracle查询最近十条数据_Oracle-查询最近更新的前10条数据
  9. python自加1_使用Python如何让里面的某个参数每调用一次程序就自加1
  10. 年味十足的喜庆红色新年春节海报PSD模板