Flink java 自定义reduce函数,以wordcount为例
maven项目的文本文件与pom.xml
配置请参考:https://blog.csdn.net/weixin_35757704/article/details/120555968
同样以wordcount为例
package transform;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountKeyBy {public static void main(String[] args) throws Exception {// 1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.从文件中读取数据DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");// 执行环境并行度设置3env.setParallelism(3);// 3.按照空格分词DataStream<Tuple2<String, Integer>> sensorStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] wordString = value.split(" ");for (String wordLine : wordString) {out.collect(new Tuple2<>(wordLine, 1));}}});// 4.分组KeyedStream<Tuple2<String, Integer>, Object> key = sensorStream.keyBy(tuple -> tuple.f0);// 5.聚合SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = key.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<String, Integer>(value1.f0, value1.f0.length() + value1.f1);}});resultStream.print();//执行env.execute();}
}
在上面的第5步为自定义的聚合操作,其中:reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2)
中value1
为旧有的状态,value2
为新输入的状态;
上面的代码中Tuple
第一个位置返回原有的单词,而Tuple
第二个位置每次都加一次当前单词的长度
Flink java 自定义reduce函数,以wordcount为例相关推荐
- python写mapreduce_用python写MapReduce函数——以WordCount为例
使用 python 写 MapReduce 的 " 诀窍 " 是利用 Hadoop 流的 API ,通过 STDIN( 标准输入 ) . STDOUT( 标准输出 ) 在 Map ...
- 013 Mapreduce相关概念WordCount框架搭建WordCount的map和reduce函数实现 WordCount的驱动类编写WordCount测试MapReduce数学案例运算 AWK
Mapreduce的相关概念 分布式并行离线计算框架MapReduce 即 如果文件里有三句话 hadoop is nice hadoop good hadoop is better 那么Map做的工 ...
- Flink SQL自定义聚合函数
<2021年最新版大数据面试题全面开启更新> 本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法.撤回定义以及与源码结合分析每个方法的调用位置. 基本使用 F ...
- java自定义equals函数和hashCode函数
所有类都继承自Object类,他所有的非final方法:equals,hashCode, toString, clone 和 finalize,它们都有通用约定. 我们在覆盖这些方法的时候需要遵循这些 ...
- java自定义排序函数_JAVA中sort函数的 自定义排序 cmp函数的写法 【java】【cmp】...
查了很多博客,了解挺多有关cmp函数的问题.所以在这里来总结一下写法.. 我们通常可以用这个cmp函数来改变Arrays.sort()函数的默认排序方法.这里我们以升序排序为例. 基本方法 int c ...
- java 自定义函数_jxTMS--java与python的协作
jxTMS:低成本快速定制的业务系统个人开发平台. java与python的协作 在jxTMS中,有两种java和python的协作模式: 叠加:python运行于java之上,即java准备好环境, ...
- java 自定义函数的调用_Java/Android中的函数调用回调函数自定义回调函数
在做Android自定义控件时遇到要自定义回调函数的问题,想想自己还暂时没有那么精深的技术,赶紧返过头回来再重新研究Java中回调函数的问题.然而不幸的是,网上太多杂乱的帖子和博客都是转来转去,而且都 ...
- jsp 将java对象转json对象 (自定义EL函数)
功能:在 jsp 页面将 java对象 转换为 json对象 (使用自定义EL函数实现) 步骤: 1. 创建 JSP EL 工具类 2. 创建 tld 文件 3. 配置 web.xml 文件 4. 完 ...
- java调mongodb自定义函数,自定义UDF函数,从hive保存到mongodb
(可以通过idea工具调试UDF函数,第二步中会提供参考) 一.自定义UDF函数: 1.首先是pom.xml文件 xmlns:xsi="http://www.w3.org/2001/XMLS ...
最新文章
- 【Qt】Qt中调用python接口
- Python 之父立 Flag:明年要把 Python 速度提高 2 倍!
- linux+添加git+ssh+keys,为github帐号添加SSH keys(Linux和Windows)(示例代码)
- CTFshow 命令执行 web124
- APR-Util 1.5.1 发布
- matlab在linux下面的相对路径的写法
- 企业级业务系统开发实战-序言
- 045 Android Studio 常用应用
- 图像处理自学(五):CAMERA驱动软件硬件架构V4L2
- 使用cardview和recycleview时碰到的一些问题
- VC++ 源码实现通达信公式管理器
- proj编译linux,安装OpenProj配置中文显示
- XSS靶场练习 https://xss.haozi.me
- halcon19.11深度学习关于分类入门案例
- Windows自带虚拟机的使用方法
- Creo 工程图 尺寸 消失
- win7截屏快捷键未在计算机上运行,修复win7“截图工具当前未在计算机上运行”的方法...
- WinRAR分卷压缩与解压缩
- ZooKeeper 的 Watch 机制是什么?
- C/C++程序员面试指南