Fold算子:将数据流的每一次输出进行滚动叠加,合并输出结果

示例环境

java.version: 1.8.x
flink.version: 1.11.1

示例数据源(项目码云下载)

Flink 系例 之 搭建开发环境与数据

Fold.java

import com.flink.examples.DataSource;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;/*** @Description Fold算子:将数据流的每一次输出进行滚动叠加,合并输出结果* (与Reduce的区别是,Reduce是拿前一次聚合结果累加后一次的并输出数据流;Fold是直接将当前数据对象追加到前一次叠加结果上并输出数据流)*/
public class Fold {/*** 遍历集合,分区打印每一次滚动叠加的结果(示例:按性别分区,按排序,未位追加输出)* @param args* @throws Exception*/public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);List<Tuple3<String,String,Integer>> tuple3List = DataSource.getTuple3ToList();//注意:使用Integer进行分区时,会导致分区结果不对,转换成String类型输出key即可正确输出KeyedStream<Tuple3<String,String,Integer>, String> keyedStream = env.fromCollection(tuple3List).keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {@Overridepublic String getKey(Tuple3<String, String, Integer> tuple3) throws Exception {//f1为性别字段,以相同f1值(性别)进行分区return String.valueOf(tuple3.f1);}});SingleOutputStreamOperator<String> result = keyedStream.fold("同学:", new FoldFunction<Tuple3<String, String, Integer>, String>() {@Overridepublic String fold(String s, Tuple3<String, String, Integer> tuple3) throws Exception {if (s.startsWith("男") || s.startsWith("女")){return s + tuple3.f0 + "、";} else {return (tuple3.f1.equals("man") ? "男" : "女") + s + tuple3.f0 + "、";}}});result.print();env.execute("flink Fold job");}
}

打印结果

2> 男同学:张三、
2> 男同学:张三、王五、
2> 男同学:张三、王五、吴八、
1> 女同学:李四、
1> 女同学:李四、刘六、
1> 女同学:李四、刘六、伍七、

Flink 系例 之 Fold相关推荐

  1. Flink 系例 之 CountWindowAll

    countWindowAll 数量窗口 (不分区数量滚动窗口[滑动窗口与滚动窗口的区别,在于滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠]) 示例环境 java.version: 1.8.x ...

  2. Flink 系例 之 Connectors 连接 Redis

    通过使用 Flink DataStream Connectors 数据流连接器连接到 Redis 缓存数据库,并提供数据流输入与输出操作: 示例环境 java.version: 1.8.x flink ...

  3. Flink 系例 之 Connectors 连接 Kafka

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  4. Flink 系例 之 Connectors 连接 ElasticSearch

    通过使用 Flink DataStream Connectors 数据流连接器连接到 ElasticSearch 搜索引擎的文档数据库 Index,并提供数据流输入与输出操作: 示例环境 java.v ...

  5. Flink 系例 之 Project

    Project算子:从数据流的元数组中,重新排例参数并指定不同的下标位,返回新的数据流 示例环境 java.version: 1.8.x flink.version: 1.11.1 示例数据源 (项目 ...

  6. Flink 系例 之 Connectors 连接 MySql

    通过使用 Flink DataStream Connectors 数据流连接器连接到 Mysql 数据源,并基于 JDBC 提供数据流输入与输出操作: 示例环境 java.version: 1.8.x ...

  7. Flink 系例 之 DataStream Connectors 与 示例模块

    官方介绍  Flink 中的 API Flink 为流式 / 批式处理应用程序的开发提供了不同级别的抽象. Flink API 最底层的抽象为有状态实时流处理.其抽象实现是 Process Funct ...

  8. java dataset flatmap_Flink 系例 之 FlatMap

    FlatMap算子:将数据流一行按逻辑或规则拆分成0行或多行输出 示例环境 java.version: 1.8.x flink.version: 1.11.1 示例数据源 FlatMap.java i ...

  9. vba 单元格 一系例操作

    1.单元格取值赋值 Private Sub CommandButton1_Click() Range("A1") = "111" Range("A2& ...

  10. python 案例串接_Python基础系例--字典串操作

    字符串 字符串是由数字,字母.下划线组成的一串字符 创建字符串,可以使用单引号和双引号: var1 ='Hello World!' var2="Hello World!" 访问字符 ...

最新文章

  1. PHP Memcached应用实现代码
  2. Middleware课程01-概述
  3. python中 __name__及__main()__的妙处02
  4. 福玛特机器人评测_深度爆料扫地机器人地宝福玛特D820评测好不好?怎么样呢?内幕曝光测评...
  5. 浅谈Python和VC中的编码问题(转)
  6. boost::depth_first_search用法的测试程序
  7. 20155222卢梓杰 实验四 恶意代码分析
  8. Wildfly 8.0通过其JAXRS 2.0实现提供了无缝的JSON支持。
  9. git 的安装以及使用:是一个开源的分布式版本控制系统,可以对项目进行版本管理。 早期是linux之父用来管理linux系统源代码的(linux是和windows一样操作系统 开源免费的操作...
  10. 使用VirtualEnv在Mac安装TensorFlow
  11. Identity Server 4 原理和实战(完结)_----选看 OAuth 2.0 简介(上)
  12. Spring AOP无法拦截Controller中的方法
  13. 怎么将webp格式转换成jpg?详细步骤
  14. Android9一键安全手机
  15. emoji.php,简单的处理emoji的PHP类库
  16. javplayer 使用教程_SAI教程SAI绘画软件下载SAI2上色教程
  17. u盘pe启动盘怎么制作?
  18. win7右键反应特别慢的问题
  19. 出一本自己的书,是一种怎样的体验?
  20. 消失的梯度问题(vanishing gradient problem)

热门文章

  1. 安规电容可靠性试验项目、试验方法及要求
  2. avr单片机c语言程序设计,avr单片机c语言编程风格介绍
  3. 电子密码锁的设计(Verilog HDL实现)
  4. 怎么进计算机更新失败,系统更新失败无法进入系统怎么办?
  5. 线性代数 【23】 概念的深入01 - Points坐标点和Vectors向量
  6. RHCE7 -- systemctl命令
  7. 一台服务器如何做两个站点,一台服务器实现多个Web站点
  8. 蓝兔子现在有一个字符串,如果一个字符串从左向右看和从右向左看是一样的,则称为回文串。请编写程序,帮助蓝兔子判断输入的字符串是否是回文串。
  9. JAVA-CPU飙高问题排查
  10. 初始vue脚手架的项目文件中mian.js文件