maven项目的文本文件与pom.xml配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120555968
同样以wordcount为例

package transform;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountKeyBy {public static void main(String[] args) throws Exception {// 1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.从文件中读取数据DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");// 执行环境并行度设置3env.setParallelism(3);// 3.按照空格分词DataStream<Tuple2<String, Integer>> sensorStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] wordString = value.split(" ");for (String wordLine : wordString) {out.collect(new Tuple2<>(wordLine, 1));}}});// 4.分组KeyedStream<Tuple2<String, Integer>, Object> key = sensorStream.keyBy(tuple -> tuple.f0);// 5.聚合SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = key.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String, Integer>(value1.f0, value1.f0.length() + value1.f1);}});resultStream.print();//执行env.execute();}
}

在上面的第5步为自定义的聚合操作,其中:reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)value1为旧有的状态,value2为新输入的状态;

上面的代码中Tuple第一个位置返回原有的单词,而Tuple第二个位置每次都加一次当前单词的长度

Flink java 自定义reduce函数,以wordcount为例相关推荐

  1. python写mapreduce_用python写MapReduce函数——以WordCount为例

    使用 python 写 MapReduce 的 " 诀窍 " 是利用 Hadoop 流的 API ,通过 STDIN( 标准输入 ) . STDOUT( 标准输出 ) 在 Map ...

  2. 013 Mapreduce相关概念WordCount框架搭建WordCount的map和reduce函数实现 WordCount的驱动类编写WordCount测试MapReduce数学案例运算 AWK

    Mapreduce的相关概念 分布式并行离线计算框架MapReduce 即 如果文件里有三句话 hadoop is nice hadoop good hadoop is better 那么Map做的工 ...

  3. Flink SQL自定义聚合函数

    <2021年最新版大数据面试题全面开启更新> 本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法.撤回定义以及与源码结合分析每个方法的调用位置. 基本使用 F ...

  4. java自定义equals函数和hashCode函数

    所有类都继承自Object类,他所有的非final方法:equals,hashCode, toString, clone 和 finalize,它们都有通用约定. 我们在覆盖这些方法的时候需要遵循这些 ...

  5. java自定义排序函数_JAVA中sort函数的 自定义排序 cmp函数的写法 【java】【cmp】...

    查了很多博客,了解挺多有关cmp函数的问题.所以在这里来总结一下写法.. 我们通常可以用这个cmp函数来改变Arrays.sort()函数的默认排序方法.这里我们以升序排序为例. 基本方法 int c ...

  6. java 自定义函数_jxTMS--java与python的协作

    jxTMS:低成本快速定制的业务系统个人开发平台. java与python的协作 在jxTMS中,有两种java和python的协作模式: 叠加:python运行于java之上,即java准备好环境, ...

  7. java 自定义函数的调用_Java/Android中的函数调用回调函数自定义回调函数

    在做Android自定义控件时遇到要自定义回调函数的问题,想想自己还暂时没有那么精深的技术,赶紧返过头回来再重新研究Java中回调函数的问题.然而不幸的是,网上太多杂乱的帖子和博客都是转来转去,而且都 ...

  8. jsp 将java对象转json对象 (自定义EL函数)

    功能:在 jsp 页面将 java对象 转换为 json对象 (使用自定义EL函数实现) 步骤: 1. 创建 JSP EL 工具类 2. 创建 tld 文件 3. 配置 web.xml 文件 4. 完 ...

  9. java调mongodb自定义函数,自定义UDF函数,从hive保存到mongodb

    (可以通过idea工具调试UDF函数,第二步中会提供参考) 一.自定义UDF函数: 1.首先是pom.xml文件 xmlns:xsi="http://www.w3.org/2001/XMLS ...

最新文章

  1. 【Qt】Qt中调用python接口
  2. Python 之父立 Flag:明年要把 Python 速度提高 2 倍!
  3. linux+添加git+ssh+keys,为github帐号添加SSH keys(Linux和Windows)(示例代码)
  4. CTFshow 命令执行 web124
  5. APR-Util 1.5.1 发布
  6. matlab在linux下面的相对路径的写法
  7. 企业级业务系统开发实战-序言
  8. 045 Android Studio 常用应用
  9. 图像处理自学(五):CAMERA驱动软件硬件架构V4L2
  10. 使用cardview和recycleview时碰到的一些问题
  11. VC++ 源码实现通达信公式管理器
  12. proj编译linux,安装OpenProj配置中文显示
  13. XSS靶场练习 https://xss.haozi.me
  14. halcon19.11深度学习关于分类入门案例
  15. Windows自带虚拟机的使用方法
  16. Creo 工程图 尺寸 消失
  17. win7截屏快捷键未在计算机上运行,修复win7“截图工具当前未在计算机上运行”的方法...
  18. WinRAR分卷压缩与解压缩
  19. ZooKeeper 的 Watch 机制是什么?
  20. C/C++程序员面试指南

热门文章

  1. 安卓平台OpenGL ES的调用
  2. 如何学好python基础_零基础如何学好Python开发?
  3. python基础的学习
  4. 嗖嗖移动业务大厅代码_移动云应用性能监控,掌控云时代的多变应用
  5. 用html编写勾股定理,一种勾股定理演示器的制作方法
  6. hset php,HSET命令_视频讲解_用法示例-redis编程词典-php中文网
  7. HLSL bytecode to GLSL编译器的第一步
  8. Early Z Culling 优化
  9. windows 2008 r2 AD域控服务器部署
  10. XTU 1252 Defense Tower