Writing an Apache Beam WordCount Example
Add the Maven dependencies in pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>word-count-beam</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <beam.version>2.26.0</beam.version>
    <bigquery.version>v2-rev20200719-1.30.10</bigquery.version>
    <google-api-client.version>1.30.10</google-api-client.version>
    <guava.version>25.1-jre</guava.version>
    <hamcrest.version>2.1</hamcrest.version>
    <jackson.version>2.10.2</jackson.version>
    <joda.version>2.10.5</joda.version>
    <junit.version>4.13-beta-3</junit.version>
    <libraries-bom.version>13.2.0</libraries-bom.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
    <mockito.version>3.0.0</mockito.version>
    <pubsub.version>v1-rev20200713-1.30.10</pubsub.version>
    <slf4j.version>1.7.30</slf4j.version>
    <spark.version>2.4.7</spark.version>
    <hadoop.version>2.8.5</hadoop.version>
    <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
    <nemo.version>0.1</nemo.version>
    <flink.artifact.name>beam-runners-flink-1.10</flink.artifact.name>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases><enabled>false</enabled></releases>
      <snapshots><enabled>true</enabled></snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
        <configuration>
          <parallel>all</parallel>
          <threadCount>4</threadCount>
          <redirectTestOutputToFile>true</redirectTestOutputToFile>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>surefire-junit47</artifactId>
            <version>${maven-surefire-plugin.version}</version>
          </dependency>
        </dependencies>
      </plugin>
      <!-- Ensure that the Maven jar plugin runs before the Maven shade
           plugin by listing the plugin higher within the file. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>
      <!-- Configures `mvn package` to produce a bundled jar ("fat jar") for
           runners that require this for job submission to a cluster. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation><activeByDefault>true</activeByDefault></activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>portable-runner</id>
      <activation><activeByDefault>true</activeByDefault></activation>
      <!-- Makes the PortableRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-portability-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <!-- Please see the Flink Runner page for an up-to-date list
               of supported Flink versions and their artifact names:
               https://beam.apache.org/documentation/runners/flink/ -->
          <artifactId>${flink.artifact.name}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>spark-runner</id>
      <!-- Makes the SparkRunner available when running a pipeline. Additionally,
           overrides some Spark dependencies to Beam-compatible versions. -->
      <properties>
        <netty.version>4.1.17.Final</netty.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-spark</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.11</artifactId>
          <version>${jackson.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
          <version>${beam.version}</version>
          <exclusions>
            <exclusion>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-netty</artifactId>
            </exclusion>
            <exclusion>
              <groupId>io.netty</groupId>
              <artifactId>netty-handler</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>samza-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-samza</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>twister2-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-twister2</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>nemo-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.nemo</groupId>
          <artifactId>nemo-compiler-frontend-beam</artifactId>
          <version>${nemo.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>jet-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-jet</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-api-client.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
    </dependency>
    <!-- Add slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
    </dependency>
    <!-- Hamcrest and JUnit are required dependencies of PAssert,
         which is used in the main code of DebuggingWordCount example. -->
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>
    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version> <!-- "-jre" for Java 8 or higher -->
      </dependency>
      <!-- GCP libraries BOM sets the version for google http client -->
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>${libraries-bom.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>
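With these profiles in place, a pipeline is run by activating the Maven profile for the chosen runner. A minimal sketch of the commands (the class name WordCount assumes the example class below sits in the default package; adjust -Dexec.mainClass if you use a package):

```shell
# Run with the DirectRunner (the direct-runner profile is active by default):
mvn compile exec:java \
    -Dexec.mainClass=WordCount \
    -Pdirect-runner

# Build a bundled "fat jar" for runners that submit jobs to a cluster, e.g. Flink:
mvn package -Pflink-runner
```

The exec-maven-plugin invoked here is the one pinned in the pom's pluginManagement section.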
Write the Java code example:
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordCount {
    public static void main(String[] args) {
        // Create PipelineOptions to apply the necessary configuration to the Pipeline.
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline. It builds the DAG of processing steps for the
        // data pipeline, along with the transforms that DAG needs.
        Pipeline p = Pipeline.create(options);

        // Read the text data (e.g. gs://apache-beam-samples/shakespeare/*).
        PCollection<String> lines = p.apply(TextIO.read().from("data/words.txt"));

        // The output is still a PCollection, but each element is now a word.
        // The regex [^\p{L}]+ matches runs of non-Unicode-letter characters,
        // so it splits words apart on whitespace, punctuation, and the like.
        PCollection<String> words = lines.apply("ExtractWords",
                FlatMapElements.into(TypeDescriptors.strings())
                        .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))));

        // Use the Count transform provided by the Beam SDK. Count turns any
        // PCollection into key/value pairs: the key is a distinct element of
        // the original PCollection, and the value is its occurrence count.
        PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());

        // Format the output: this transform turns the key/value PCollection
        // into the output format we want, making the word counts easy to write out.
        PCollection<String> formatted = counts.apply("FormatResults",
                MapElements.into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) ->
                                wordCount.getKey() + ": " + wordCount.getValue()));

        // TextIO.Write writes the final PCollection to text files.
        formatted.apply(TextIO.write().to("data/wordcounts.txt"));

        // Run the Pipeline.
        p.run().waitUntilFinish();
    }
}
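To see concretely what the ExtractWords step's regex does, here is a small standalone sketch (plain Java, no Beam required; the input string is made up for illustration):

```java
import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // [^\p{L}]+ matches any run of non-letter characters, so commas,
        // hyphens, and spaces all act as word boundaries.
        String line = "Hello, Beam! word-count demo";
        String[] words = line.split("[^\\p{L}]+");
        System.out.println(Arrays.toString(words));
        // Prints: [Hello, Beam, word, count, demo]
    }
}
```

Note that FlatMapElements then flattens each per-line list of words into a single PCollection of words.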