Flink Java 使用map reduce实现wordcount
整体思路
- 首先数据源是流式读取文件内容
- 对每行句子按照空格切分
- 将每个单词都构造为一个
Tuple
,第一个位置是单词,第二个位置是词频 - 按照key(单词)分组,对每个组做聚合(reduce)操作
- 将结果输出
文本文件
在maven项目的resources下新建一个文件hello.txt
,内容如下:
hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack
代码如下
package transform;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCountKeyBy {public static void main(String[] args) throws Exception {// 1.创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.从文件中读取数据DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");// 执行环境并行度设置3env.setParallelism(3);// 3.按照空格分词DataStream<Tuple2<String, Integer>> sensorStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] wordString = value.split(" ");for (String wordLine : wordString) {out.collect(new Tuple2<>(wordLine, 1));}}});// 4.分组KeyedStream<Tuple2<String, Integer>, Object> key = sensorStream.keyBy(tuple -> tuple.f0);// 5.聚合SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = key.sum(1);resultStream.print();//执行env.execute();}
}
附:pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.myorg.quickstart</groupId><artifactId>quickstart</artifactId><version>0.1</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.13.2</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.myorg.quickstart.StreamingJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>
Flink Java 使用map reduce实现wordcount相关推荐
- Spark 实现常用的map reduce功能 (Java版本)
记录利用spark core的函数,完成一些map reduce功能的练习,spark core有Transformation和Action两种算子,Transformation完成中间转变过程,不会 ...
- Hadoop完全分布式搭建过程、maven和eclipse配置hadoop开发环境、配置Map/Reduce Locations、简单wordcount测试!
Hadoop完全分布式搭建及测试 项目开始前准备工作 1.下载并安装VM workstation pro 15安装包,这里选择: VMware-workstation-full-15.1.0-1359 ...
- 使用Mongo Shell和Java驱动程序的MongoDB Map Reduce示例
Map Reduce is a data processing technique that condenses large volumes of data into aggregated resul ...
- 在eclipse使用map reduce编写word count程序生成jar包并在虚拟机运行的步骤
---恢复内容开始--- 1.首先准备一个需要统计的单词文件 word.txt,我们的单词是以空格分开的,统计时按照空格分隔即可 hello hadoop hello yarn hello zooke ...
- Hadoop Map/Reduce教程
Hadoop Map/Reduce教程 目的 先决条件 概述 输入与输出 例子:WordCount v1.0 源代码 用法 ...
- 一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)
Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解M ...
- [ZZ]Map/Reduce hadoop 细节
转自:Venus神庙原文:http://www.cnblogs.com/duguguiyu/archive/2009/02/28/1400278.html 分布式计算(Map/Reduce) 分布式计 ...
- Hadoop简介(1):什么是Map/Reduce
看这篇文章请出去跑两圈,然后泡一壶茶,边喝茶,边看,看完你就对hadoop整体有所了解了. Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Red ...
- 用通俗易懂的大白话讲解Map/Reduce原理
Hadoop简介 Hadoop就是一个实现了Google云计算系统的开源系统,包括并行计算模型Map/Reduce,分布式文件系统HDFS,以及分布式数据库Hbase,同时Hadoop的相关项目也很丰 ...
最新文章
- 如何学习:自考小组学习
- SAP QM初阶之取样策略如何确定检验批Sample Size?
- oracle的listagg函数
- Webservice soap wsdl区别之个人见解
- Java并发编程的基础-Thread.interrupted
- 你好,同学!在云端学习最潮的技术吧!
- LeetCode 933.最近的请求次数
- Moment.js常见用法总结 1
- 卢伟冰暗示Redmi K40轻奢版:搭载联发科天玑1100
- 《数据科学概论》教材介绍
- 【第135期】游戏策划:给@蒙蒙水雾的简历分析
- 广告机-开机自动播放-视频-电影-图片-竖屏-分屏-展示机
- github提交时报错:remote: Support for password authentication was removed on August 13, 2021问题解决方案
- python开发问卷系统_哪个开源的问卷调查系统最好用?
- Java 战国大富翁,中国历史上二十大富豪 个个富可敌国
- 啤酒每罐2.3元,饮料每罐1.9元
- deli考勤机3960操作手册
- RTK ? PPK ?到底该选啥
- 这几个算法可视化网站,太牛了!
- 【标准文件免费下载】国家标准和行业标准 良心网站推荐
热门文章
- 苹果7plus专用计算机,iphone7plus怎么用 iphone7plus使用技巧【详解】
- RealFlow翻译教程——海洋波浪
- 如何提高Unity Gear VR游戏性能
- (C/C++学习)15.C语言字符串和字符数组
- Spring Cloud云架构 - commonservice-sso服务搭建(一)
- 数据库存入表情符报错问题
- 树莓派进阶之路 (029) - 语音识别模块 LD3320(原创)
- java - 抽象类、接口、内部类
- 运维之我的docker-Dockerfile构建镜像详情
- ise和modelsim联合仿真的一些准备