Import the Maven dependencies in pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>word-count-beam</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <beam.version>2.26.0</beam.version>
    <bigquery.version>v2-rev20200719-1.30.10</bigquery.version>
    <google-api-client.version>1.30.10</google-api-client.version>
    <guava.version>25.1-jre</guava.version>
    <hamcrest.version>2.1</hamcrest.version>
    <jackson.version>2.10.2</jackson.version>
    <joda.version>2.10.5</joda.version>
    <junit.version>4.13-beta-3</junit.version>
    <libraries-bom.version>13.2.0</libraries-bom.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
    <mockito.version>3.0.0</mockito.version>
    <pubsub.version>v1-rev20200713-1.30.10</pubsub.version>
    <slf4j.version>1.7.30</slf4j.version>
    <spark.version>2.4.7</spark.version>
    <hadoop.version>2.8.5</hadoop.version>
    <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
    <nemo.version>0.1</nemo.version>
    <flink.artifact.name>beam-runners-flink-1.10</flink.artifact.name>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
        <configuration>
          <parallel>all</parallel>
          <threadCount>4</threadCount>
          <redirectTestOutputToFile>true</redirectTestOutputToFile>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>surefire-junit47</artifactId>
            <version>${maven-surefire-plugin.version}</version>
          </dependency>
        </dependencies>
      </plugin>

      <!-- Ensure that the Maven jar plugin runs before the Maven shade plugin
           by listing the plugin higher within the file. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>

      <!-- Configures `mvn package` to produce a bundled jar ("fat jar") for
           runners that require this for job submission to a cluster. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>portable-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the PortableRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-portability-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <!-- Please see the Flink Runner page for an up-to-date list
               of supported Flink versions and their artifact names:
               https://beam.apache.org/documentation/runners/flink/ -->
          <artifactId>${flink.artifact.name}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>spark-runner</id>
      <!-- Makes the SparkRunner available when running a pipeline. Additionally,
           overrides some Spark dependencies to Beam-compatible versions. -->
      <properties>
        <netty.version>4.1.17.Final</netty.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-spark</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.11</artifactId>
          <version>${jackson.version}</version>
          <scope>runtime</scope>
        </dependency>
        <!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing
             conflicts with runners -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
          <version>${beam.version}</version>
          <exclusions>
            <exclusion>
              <groupId>io.grpc</groupId>
              <artifactId>grpc-netty</artifactId>
            </exclusion>
            <exclusion>
              <groupId>io.netty</groupId>
              <artifactId>netty-handler</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>samza-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-samza</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>twister2-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-twister2</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>nemo-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.nemo</groupId>
          <artifactId>nemo-compiler-frontend-beam</artifactId>
          <version>${nemo.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>jet-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-jet</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-api-client.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled in by a
             transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled in by a
             transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled in by a
             transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled in by a
             transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
    </dependency>

    <!-- Add slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
    </dependency>

    <!-- Hamcrest and JUnit are required dependencies of PAssert,
         which is used in the main code of the DebuggingWordCount example. -->
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>

    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version> <!-- "-jre" for Java 8 or higher -->
      </dependency>
      <!-- GCP libraries BOM sets the version for google http client -->
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>${libraries-bom.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>
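With these profiles in place, the pipeline can be compiled and launched straight from Maven, for example with `mvn compile exec:java -Dexec.mainClass=WordCount -Pdirect-runner` (a sketch that assumes the WordCount class below stays in the default package; adjust `-Dexec.mainClass` to your actual package, and swap `-Pdirect-runner` for `-Pflink-runner`, `-Pspark-runner`, and so on to target another runner).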

Now write the Java code for the example:

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WordCount {
  public static void main(String[] args) {
    // Create PipelineOptions to apply the necessary configuration to the pipeline.
    PipelineOptions options = PipelineOptionsFactory.create();

    // Create the Pipeline; it builds the data-processing DAG for the pipeline
    // and holds the transforms that DAG performs.
    Pipeline p = Pipeline.create(options);

    // Read the text data (the public sample set gs://apache-beam-samples/shakespeare/*
    // also works as input).
    PCollection<String> lines = p.apply(TextIO.read().from("data/words.txt"));

    // The output is still a PCollection, but each element is now a single word.
    // The regex [^\p{L}]+ matches runs of non-Unicode-letter characters, so it
    // splits on whitespace, punctuation, and the like.
    PCollection<String> words = lines.apply("ExtractWords",
        FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));

    // Use the Count transform provided by the Beam SDK. Count turns any
    // PCollection into key/value pairs: each key is a distinct element of the
    // input, and the value is the number of times it occurs.
    PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());

    // Format the output: this transform turns the key/value PCollection into
    // the output format we want, making the word frequencies easy to write out.
    PCollection<String> formatted = counts.apply("FormatResults",
        MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));

    // TextIO.Write writes the final PCollection out to text files.
    formatted.apply(TextIO.write().to("data/wordcounts.txt"));

    // Run the pipeline and wait for it to finish.
    p.run().waitUntilFinish();
  }
}
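One practical refinement, sketched below under the assumption that everything else stays as in WordCount above (the class name WordCountFromArgs is purely illustrative): instead of hard-coding the options, Beam's standard PipelineOptionsFactory.fromArgs API can parse them from the command line, so the same bundled jar accepts --runner=FlinkRunner, --jobName=..., and similar flags at submission time.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCountFromArgs {
  public static void main(String[] args) {
    // Parse the standard Beam flags (--runner, --jobName, ...) from the command
    // line; withValidation() fails fast on unknown or malformed flags.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .create();
    Pipeline p = Pipeline.create(options);

    // ... apply the same Read/ExtractWords/Count/FormatResults/Write transforms
    // as in WordCount above, then run:
    p.run().waitUntilFinish();
  }
}

Note also that TextIO.write().to("data/wordcounts.txt") produces a set of sharded files named like data/wordcounts.txt-00000-of-00003 by default rather than a single file; chain .withoutSharding() onto the write if one output file is required.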
