Basic approach:

  1. First, set up the pom.xml
  2. Create the data source: write the sample data to a file
  3. Batch processing: read the data with org.apache.flink.api.java.DataSet
  4. Stream processing: read the data with org.apache.flink.streaming.api.datastream.DataStream

First, create a new Maven project:

1. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.myorg.quickstart</groupId>
    <artifactId>quickstart</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.2</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These are in 'compile' scope here so the job can run directly from the IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <!-- Example:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
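
Note that the shade plugin's <mainClass> still points at the archetype default, org.myorg.quickstart.StreamingJob. If you package the examples below into a jar, either change it to your own entry point (for this walkthrough that would be mytest.StreamWordCount) or select the class at submission time with flink run -c mytest.StreamWordCount.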

2. Data source

Create a hello.txt file under the resources folder and write the following into it:

hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack
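
The code in the next two steps reads this file through the relative path src/main/resources/hello.txt, which assumes the working directory is the project root (the usual default when running from an IDE).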

3. Batch processing code

package mytest;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data from the file
        DataSet<String> inputDataSet = env.readTextFile("src/main/resources/hello.txt");

        // 3. Split each line on spaces, expand into (word, 1) tuples, and aggregate
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet
                .flatMap(new MyFlatMapper())
                .groupBy(0)  // group by the word in position 0
                .sum(1);     // sum the counts in position 1

        resultSet.print();
    }

    // Custom class implementing the FlatMapFunction interface:
    // the input type is String, the output type is Tuple2<String, Integer>
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.toLowerCase().split(" "); // tokenize
            // Wrap every word into a (word, 1) tuple and emit it
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
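
Running this on the hello.txt above should print one final count per distinct word, along the lines of the following (DataSet.print() gives no ordering guarantee, so the order may differ):

(hello,3)
(world,1)
(flink,1)
(spark,1)
(when,2)
(we,2)
(have,2)
(shuffled,2)
(off,2)
(this,2)
(mortal,2)
(coil,2)
(ack,1)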

4. Stream processing code

package mytest;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // simulate a 4-machine cluster on a single host

        // Read the data from the file
        DataStream<String> inputDataStream = env.readTextFile("src/main/resources/hello.txt");

        // Transform the stream, reusing MyFlatMapper from the batch WordCount class
        // (same package mytest, so no import is needed)
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream
                .flatMap(new WordCount.MyFlatMapper())
                .keyBy(tuple -> tuple.f0)
                .sum(1);

        // Print the results
        resultStream.print();

        // Execute the job
        env.execute();
    }
}
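
Unlike the batch job, the stream job emits an updated cumulative count every time a word arrives, and print() prefixes each record with the index of the parallel subtask (1-4) that produced it. The exact subtask numbers and interleaving vary from run to run, so a run looks roughly like this:

2> (hello,1)
4> (world,1)
2> (hello,2)
1> (flink,1)
3> (spark,1)
2> (hello,3)
...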
