Overall approach

  1. The source reads the file contents as a stream
  2. Each line of text is split on spaces
  3. Each word is wrapped in a Tuple, with the word in the first position and its count in the second (see the sketch after this list)
  4. The tuples are grouped by key (the word), and each group is aggregated with a reduce operation
  5. The result is printed
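
To make step 3 concrete, here is a minimal standalone sketch of splitting one line and building the (word, 1) tuples; the class name TupleSketch is made up for illustration, and Tuple2 comes from the flink-java dependency:

import org.apache.flink.api.java.tuple.Tuple2;

public class TupleSketch {
    public static void main(String[] args) {
        String line = "hello world";
        // Split on spaces and pair every word with an initial count of 1
        for (String word : line.split(" ")) {
            Tuple2<String, Integer> wordAndOne = new Tuple2<>(word, 1);
            System.out.println(wordAndOne); // prints (hello,1) then (world,1)
        }
    }
}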

The text file

Under the Maven project's resources directory, create a file named hello.txt with the following content:

hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack

The code is as follows

package transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountKeyBy {

    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file as a stream of lines
        DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");

        // Set the parallelism of the execution environment to 3
        env.setParallelism(3);

        // 3. Split each line on spaces and emit a (word, 1) tuple per word
        DataStream<Tuple2<String, Integer>> wordAndOneStream =
                dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        // 4. Group by key: the word in field f0
        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                wordAndOneStream.keyBy(tuple -> tuple.f0);

        // 5. Aggregate: sum the counts in field f1
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = keyedStream.sum(1);
        resultStream.print();

        // Launch the job
        env.execute();
    }
}
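
Running the job prints a running total for each key: keyBy plus sum(1) emits an updated partial count every time another (word, 1) tuple arrives for that word, and with parallelism 3 each line is prefixed by the subtask that produced it. The exact prefixes and interleaving vary from run to run, but the output looks roughly like this (abbreviated):

3> (hello,1)
3> (hello,2)
3> (hello,3)
1> (world,1)
2> (When,1)
2> (When,2)
...
1> (ack,1)

The last value printed for a word is its final count, e.g. (hello,3) and (coil,2).

Since the approach above describes step 5 as a reduce, note that sum(1) is just a built-in shorthand; the same aggregation can be written with an explicit reduce. A minimal sketch, as a drop-in replacement for step 5:

        // Equivalent to keyedStream.sum(1): merge two partial results for the
        // same word by keeping the word and adding the counts.
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream =
                keyedStream.reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1));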

Appendix: pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.myorg.quickstart</groupId>
    <artifactId>quickstart</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.2</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
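
One thing to watch in this pom: the shade plugin's <mainClass> still points at the quickstart default, org.myorg.quickstart.StreamingJob. If you package this example into a fat jar with mvn package, either change that value to transform.WordCountKeyBy (the class above) or select the entry point at submission time with flink run -c transform.WordCountKeyBy.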
