网上看了别人都在谈Beam,你说咱们作为技术人员技术也得紧跟着时代不是,所以也开始利用业余时间研究Beam。咱不是大神,不能啥都一看就会,所以一天一天来,这个也就作为笔记吧。废话不多说,进入主题,按照老规矩,从官网入手。

其实Beam官网目前做的不是很丰满,不过好在按照步骤进行,可以接受。

Beam是什么呢?英文中Beam是光束的意思,官方对Beam的解释是:Apache Beam是一个开源的统一的编程模型(记住,他是个模型而已),我们可以使用它来创建数据处理管道(核心是管道)。我们首先要定义一个程序,使用开源的BeamSDK来定义管道。然后管道由Beam支持的分布式处理后端之一执行:Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow。

Beam对于尴尬并行数据处理任务特别有用,其中问题可以被分解为可以独立和并行处理的许多较小的数据束。我们同样可以使用Beam来提取,变换和加载(ETL)任务和纯数据集成。这些任务对于不同存储介质和数据源之间移动数据,将数据转化成理想格式,或将数据加载到新的系统上有很大的好处。

Beam管道运行器将我们定义的处理管道和程序转化为与我们选择的分布式处理后端兼容的API。当我们运行Beam程序的时候,我们需要为执行管道的后端指定适当的运行器(Runner)。

好了,上面就是简短的理论基础,下面开始我们经典的wordcount环节。不过我打算绕过官方的QuickStart环节,因为这个真的没啥意思,我们直接自己手动创建项目然后开始学习。

我们从Minimal WordCount开始说起,下面我简称:MW。MW演示了一个可以从文本中读取的管道。应用转换来对单词进行标记和计数,并将结果写入到输出文件中。下面是详细步骤:

首先我们创建一个maven项目,如图:

然后在pom文件中加入我们的依赖:

<dependency><groupId>org.apache.beam</groupId><artifactId>beam-sdks-java-core</artifactId><version>0.4.0</version>
</dependency>

接着创建我们的第一个类:Day01,然后在其中创建main方法,那么到此我们的准备工作完毕。下面开始编写代码:

Creating the Pipeline

创建Beam管道的第一步是创建一个PipelineOptions对象,这个对象让我们对我们的管道设置各种选项,例如将要执行我们管道的管道线程以及所选择的线程所需的任何指定配置。我们可以为我们的Pipeline指定一个Runner。比如DataflowRunner或者SparkRunner。当我们不指定的时候,将会默认调用本地的DirectRunner。这里我跟官网不同,我使用最为简单的本地读取。

所以我可以直接创建Pipeline对象:

PipelineOptions pipe = PipelineOptionsFactory.create();
// 当我们不指定的时候,会默认使用DirectRunner这种类型
//  pipe.setRunner(DirectRunner.class);

Pipeline p = Pipeline.create(pipe);

【注意】如果这里直接运行会报错,说本地找不到DirectRunner类(能导入的那个不是我们需要的),因为缺少依赖,在pom中增加:

<dependency><groupId>org.apache.beam</groupId><artifactId>beam-runners-direct-java</artifactId><version>0.4.0</version>
</dependency>

就可以成功解决问题。创建了管道,我们就可以对管道进行转化了。

每个转换采取某种输入,然后产生一些输出数据。输入和输出数据被SDK类:PCollection所表示。PCollection是一个特殊的类,他由Beam的SDK提供,我们可以用来代表几乎任何大小的数据集。流程图如下:

文本文件读取操作被用于Pipeline本身,他生成PCollection作为输出,输出PCollection中的每个元素表示输入文件中的一行文本。那么我们首先创建一个文件:

demo.txt为我们新创建的文件,里面内容:

tom
cat
hello

然后我们开始进行读取:

p.apply(TextIO.Read.from("D:\\JavaProject\\Beam_Demo\\src\\main\\resources\\demo.txt"))

读取完毕我们需要对内容进行处理,在每个元素上调用DoFn方法的ParDo转换,将文本行标记为单个单词,该文本的输入是由前一个TextIO.Read转换生成的文本行的PCollection。ParDo同样转换输出为新的PCollection,其中每个元素表示文本中的单个词:

.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {@ProcessElement
    public void processElement(ProcessContext c) {for (String word : c.element().split(" ")) {if (!word.isEmpty()) {c.output(word);}}}
}))

接下来我们需要对每个单词进行统计,SDK提供的count变换是一种通用的转换,它采用任何类型的PCollection,并返回key/value类型的PCollection。每个key表示来自于集合的唯一元素,每个value表示key出现的总次数。

.apply(Count.<String>perElement())

下面的转换将唯一word和出现次数的每个key/value对格式化为适用于写入输出文件的可打印字符串。MapElements是一个更高级别的复合变换,它封装了一个简单的ParDo。对于PCollection中的每个元素,MapElements应用只产生一个元素的函数。在本例中,MapElements调用执行格式化的simpleFunction(匿名内部类),作为输入,

MapElements获得由count生成的key/value对的PCollection。并产生可打印字符串的新PCollection。

.apply("FormatResult", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {@Override
    public String apply(KV<String, Long> input) {return input.getKey() + ": " + input.getValue();}
})).apply(TextIO.Write.to("D:\\JavaProject\\Beam_Demo\\src\\main\\resources\\wordcounts"));

然后我们开始运行:

p.run().waitUntilFinish();

运行后我们得到结果:

每个里面是word的统计结果,应该是hash分区,所以出现三个文件:

tom: 1
hello: 1
cat: 1

那么计算结束,这里仅仅是一个简单的入门,后面还会继续深入。

感谢开源。

Beam从零开始(一)相关推荐

  1. java 与大数据学习较好的网站

    C# C#中 Thread,Task,Async/Await,IAsyncResult 的那些事儿! https://www.cnblogs.com/doforfuture/p/6293926.htm ...

  2. CSDN日报20170327——《如何平衡工作与生活?真相在此》

    [程序人生]如何平衡工作与生活?真相在此 作者:foruok 经常有人问我"当公司要求我加班时该怎样拒绝又不给领导留下不好的印象"或者"能不能比上司早下班"之类 ...

  3. Greedy search与beam search

    最近在看End-to-end Relation Extraction using LSTMs on Sequences and Tree Structures这篇文章时,看到此文在Entity det ...

  4. 从零开始的linux_manjaro+vim+fish shell+i3酷炫操作和配置(持续更新中......)

    本博客最新更新于 2021年11月18日 一.前言 笔者此前几乎没有vim使用经验,也没有安装过linux虚拟机,偶然在b站上看到TheCW的视频,遂惊异于linux和vim配合的巧妙与vim配置和f ...

  5. 关于新技术的引入原则 ——从零开始学架构

    不以解决实际问题引入的技术都耍流氓. 新技术的引入不是为了证明自己,而是为了解决实际项目中遇到的问题.希望诸位能够控制住自己的心魔. 新技术的引入要求应该是解决的问题大于带来的问题. 再引入新技术,请 ...

  6. Beam Search

    Q: 什么是Beam Search? 它在NLP中的什么场景里会⽤到? 传统的广度优先策略能够找到最优的路径,但是在搜索空间非常大的情况下,内存占用是指数级增长,很容易造成内存溢出,因此提出了beam ...

  7. 从零开始用 Flask 搭建一个网站(二)

    从零开始用 Flask 搭建一个网站(一) 介绍了如何搭建 Python 环境,以及 Flask 应用基本项目结构.我们要搭建的网站是管理第三方集成的控制台,类似于 Slack. 本篇主要讲解数据如何 ...

  8. 从零开始编写自己的C#框架(16)——Web层后端父类

    从零开始编写自己的C#框架(16)--Web层后端父类 原文:从零开始编写自己的C#框架(16)--Web层后端父类 本章节讲述的各个类是后端系统的核心之一,涉及到系统安全验证.操作日志记录.页面与按 ...

  9. ssm radis mysql_从零开始搭建框架SSM+Redis+Mysql(一)之摘要

    从零开始搭建框架SSM+Redis+Mysql(一)之摘要 本文章为本人实际的操作后的回忆笔记,如果有步骤错漏,希望来信307793969@qq.com或者评论指出. 本文章只体现过程,仅体现操作流程 ...

最新文章

  1. 微信小程序 html css xml,微信小程序 使用towxml解析html流程及踩坑记录
  2. 【Python】PAT-1024-科学计数法
  3. 90后CV男神Workshop | 祥雨带你畅聊Model设计新视角
  4. ZOJ-3494 BCD Code (ac自动机+数位dp)
  5. TypeScript—语法简介
  6. 填坑-十万个为什么?(18)
  7. 一.oracle的SQL中group by使用的情况(与聚合函数的关系)
  8. 神器!Alibaba Sentinel,功能真心强大!
  9. ass字幕导入Premiere的另一种思路 用ffmpeg把ass转换成颜色通道和透明度通道两个视频
  10. 关于获得淘宝商品评论的那些事
  11. 高防服务器,可以防多大的ddos攻击
  12. intel服务器芯片组历史,Intel主板芯片组发展史.doc
  13. uni-app小程序,实现根据中文首字母排序功能
  14. Mongodb入门(CRUD与安装)
  15. 酷开系统壁纸模式,百变画作颠覆想象
  16. 拼多多商家一件代发,一键打单有什么软件?
  17. hadoop错误org.apache.hadoop.yarn.exceptions.YarnException Unauthorized request to start container
  18. 火狐、chrome浏览器过滤网页广告设置过程
  19. 【wp7】简易的语音报时懒人闹钟
  20. 新版骗分导论(最少骗到省级三等奖)

热门文章

  1. 关于SpringAop中@within的使用踩坑指南
  2. KindEditor富文编辑器集成kityformula数学公式
  3. 01_Windows系统下 在qt中 对OpenCV下载配置并简单使用
  4. 读书笔记之呼吸 [美] 特德·姜
  5. 罗永浩是怎样买到 T.TT 域名的?
  6. 基于Layui的登录注册页面模板
  7. FPGA串口回环实验
  8. 网优谷说明域名后缀对SEO有影响吗?
  9. 剧情介绍:“热血教师”
  10. JAVA设计模式十七--Composite(组合模式)