Filter operator: filtering

The filter operator takes a filter function that keeps only the data you need: every incoming element is evaluated, and if the function returns true the element continues downstream, while if it returns false the element is dropped. For example: if the incoming price is greater than 100 we print the record, otherwise we do not.
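A minimal sketch of the same predicate written as a lambda (assuming a DataStream<UserBrowseLog> named processData like the one built in the full example below):

// Keep only records whose product price is greater than 100; all other records are dropped.
DataStream<UserBrowseLog> expensive = processData.filter(log -> log.getProductPrice() > 100);
expensive.print();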

package Flink_API;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestFilter {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the user browse events from Kafka
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("topic", new SimpleStringSchema(), consumerProperties));

        // Parse the JSON records
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON, error: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        // Keep only records whose product price is greater than 100
        DataStream<UserBrowseLog> filter = processData.filter(new FilterFunction<UserBrowseLog>() {
            @Override
            public boolean filter(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.getProductPrice() > 100;
            }
        });

        filter.print();
        env.execute("TestFilter");
    }

    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}

KeyBy operator: how keyBy is used. Suppose we want to compute per-user statistics of product browsing from Kafka: how do we split the statistics up by user? Flink does not route records by key on its own (that is what keyBy is for), whereas MapReduce shuffles records with the same key to the same reducer automatically. Because the data arriving from Kafka is unordered, the approach is to first gather the products each user has browsed and then aggregate the records belonging to the same user.
Records of the same user must be sent to the same task for processing; MapReduce does this for you, Flink only does it once you apply keyBy.

1. The keyBy operator only partitions the data; you cannot call setParallelism() after it, so it can be regarded as not a true (transformation) operator.
2. The partitioning result is strongly tied to the parallelism of the operator downstream of keyBy; if the downstream operator has a parallelism of 1, everything ends up in the same subtask no matter how the data is partitioned.
3. For POJO types, keyBy(fieldName) partitions on the named field of the object.
4. For Tuple types, keyBy(fieldPosition) partitions on the element at that position in the Tuple.
5. For general types, keyBy(new KeySelector{...}) partitions on the key the selector extracts (all three forms are illustrated in the sketch after the word-count snippet below).
Note: the following types cannot be used as keys:
1. A POJO that does not override hashCode() and relies on Object's hashCode();
2. Array types;
3. Basic primitive types such as int and long.
// Group by word (from the SocketWindowWordCountJava example)
.keyBy("word")   // returns KeyedStream<SocketWindowWordCountJava.WordWithCount, Tuple>
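A minimal sketch of the three keyBy forms listed above (assuming org.apache.flink.api.java.tuple.Tuple and Tuple2 are imported, and that browseLogs is a DataStream<UserBrowseLog> and wordCounts a DataStream<Tuple2<String, Integer>>; both stream names are made up for illustration):

// 1) POJO type: partition by field name
KeyedStream<UserBrowseLog, Tuple> byUserField = browseLogs.keyBy("userID");

// 2) Tuple type: partition by field position (position 0 is the word in a (word, count) pair)
KeyedStream<Tuple2<String, Integer>, Tuple> byWord = wordCounts.keyBy(0);

// 3) Any type: partition with a KeySelector
KeyedStream<UserBrowseLog, String> byUserId = browseLogs.keyBy(new KeySelector<UserBrowseLog, String>() {
    @Override
    public String getKey(UserBrowseLog log) throws Exception {
        return log.getUserID();
    }
});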

package Flink_API;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestKeyBy {
    // How many times does a single user click within one minute?
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the user browse events from Kafka
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("topic", new SimpleStringSchema(), consumerProperties));

        // Parse the JSON records
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON, error: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        // Partition the stream by userID using a KeySelector
        KeyedStream<UserBrowseLog, String> keyBy = processData.keyBy(new KeySelector<UserBrowseLog, String>() {
            @Override
            public String getKey(UserBrowseLog userBrowseLog) throws Exception {
                return userBrowseLog.getUserID();
            }
        });

        // Processed once per incoming record
        keyBy.print();
        env.execute("TestKeyBy");
    }

    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}

Reduce operator:

Performs a rolling aggregation over records that share the same key: the current element is combined with the result of the previous rolling reduce, a new value is returned, and each rolling result is emitted to the downstream operator.
Example: rolling aggregation of the product prices browsed by each user.
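A minimal sketch of the rolling behaviour (assuming the processData stream and the five-argument UserBrowseLog constructor from the TestReduce class below): if user u1 browses products priced 10, 20 and 30, the reduce emits rolling sums 10, then 30, then 60.

// Rolling sum of product prices per user; one result is emitted for every incoming record.
DataStream<UserBrowseLog> rollingSum = processData
        .keyBy("userID")
        .reduce((previous, current) -> {
            int sum = previous.getProductPrice() + current.getProductPrice();
            return new UserBrowseLog(previous.getUserID(), "", "", "", sum);
        });
rollingSum.print();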

package Flink_API;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestReduce {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the user browse events from Kafka
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("topic", new SimpleStringSchema(), consumerProperties));

        // Parse the JSON records
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON, error: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        // Rolling aggregation of product prices per user
        DataStream<UserBrowseLog> reduceMap = processData.keyBy("userID").reduce(new ReduceFunction<UserBrowseLog>() {
            @Override
            public UserBrowseLog reduce(UserBrowseLog t1, UserBrowseLog t2) throws Exception {
                int i = t1.getProductPrice() + t2.getProductPrice();
                return new UserBrowseLog(t1.getUserID(), "", "", "", i);
            }
        });

        // Computes once per incoming record and emits each rolling result
        // (output would only be held back until the window fires if a window were used)
        reduceMap.print();
        env.execute("TestReduce");
    }

    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public UserBrowseLog(String userID, String eventTime, String eventType, String productID, Integer productPrice) {
            this.userID = userID;
            this.eventTime = eventTime;
            this.eventType = eventType;
            this.productID = productID;
            this.productPrice = productPrice;
        }

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}

Aggregate operator:

Performs a rolling aggregation on a specified field over records that share the same key: the current element is combined with the previous rolling result, a new value is returned, and each rolling result is emitted downstream. The difference between min and minBy: min returns the minimum value of the specified field (that value is written into the first record seen, and that first record is returned), whereas minBy returns the original record that contains the minimum value; max/maxBy behave analogously.
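A small illustration of the min/minBy difference (the input values are made up): suppose the keyed stream for user u1 first receives a record with productID=p1, productPrice=80 and then a record with productID=p2, productPrice=50.

// min: the productPrice field carries the minimum, the other fields still come from the first record seen
keyedStream.min("productPrice");   // second output: ..., productID='p1', productPrice=50
// minBy: the whole record that contains the minimum price is emitted
keyedStream.minBy("productPrice"); // second output: ..., productID='p2', productPrice=50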

    // Sum the first field of the elements in keyStream
    DataStream<String> dataStream = keyStream.sum(0);
    // Sum the "count" field of the elements in keyedStream
    keyedStream.sum("count");
    // Minimum value of the first field in keyedStream
    keyedStream.min(0);
    // The element holding the minimum "count" field in keyedStream
    keyedStream.minBy("count");
    keyedStream.max("count");
    keyedStream.maxBy(0);
package Flink_API;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.Properties;

public class TestAggregate {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the user browse events from Kafka
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "page01");
        consumerProperties.setProperty("group.id", "browsegroup");
        DataStreamSource<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>("topic", new SimpleStringSchema(), consumerProperties));

        // Parse the JSON records
        DataStream<UserBrowseLog> processData = dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {
            @Override
            public void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {
                try {
                    UserBrowseLog browseLog = JSON.parseObject(s, UserBrowseLog.class);
                    if (browseLog != null) {
                        collector.collect(browseLog);
                    }
                } catch (Exception e) {
                    System.out.print("Failed to parse UserBrowseLog JSON, error: " + e.getMessage());
                }
            }
        }).setParallelism(2);

        // Without a window: rolling aggregation over product prices
        DataStream<UserBrowseLog> minByData = processData.keyBy("userID").maxBy("productPrice");
        minByData.print();
        env.execute("TestAggregate");
    }

    public static class UserBrowseLog implements Serializable {
        private String userID;
        private String eventTime;
        private String eventType;
        private String productID;
        private Integer productPrice;

        public String getUserID() { return userID; }
        public void setUserID(String userID) { this.userID = userID; }
        public String getEventTime() { return eventTime; }
        public void setEventTime(String eventTime) { this.eventTime = eventTime; }
        public String getEventType() { return eventType; }
        public void setEventType(String eventType) { this.eventType = eventType; }
        public String getProductID() { return productID; }
        public void setProductID(String productID) { this.productID = productID; }
        public Integer getProductPrice() { return productPrice; }
        public void setProductPrice(Integer productPrice) { this.productPrice = productPrice; }

        @Override
        public String toString() {
            return "UserBrowseLog{" +
                    "userID='" + userID + '\'' +
                    ", eventTime='" + eventTime + '\'' +
                    ", eventType='" + eventType + '\'' +
                    ", productID='" + productID + '\'' +
                    ", productPrice=" + productPrice +
                    '}';
        }
    }
}
