beam入门宝典之初次使用

咱们不多废话,先直接来如何简单使用beam框架。
这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言
这个例子里我们会初步学到:

  1. 如何建立管道
  2. 如何手动生成数据
  3. 如何转换
  4. 如何查看输出

首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:

<dependency><groupId>org.apache.beam</groupId><artifactId>beam-runners-direct-java</artifactId><version>${beam.version}</version>
</dependency>

beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0

接着我们新建1个HowToCreateAndShowData类,然后开始例子

建立管道

任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道

// 建立选项
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
// 建立管道
Pipeline pipeline = Pipeline.create(pipelineOptions);

关于选项option和pipeline的更多用法,后面的章节会继续介绍

手动生成数据

我们有了pipeline之后,就要往里面塞入数据
beam里提供了手动输入数据的方式,如下:

// 生成初始的输入数据
// 相当于往管道里塞入了3个自己写的字符串元素
PCollection<String> pcStart = pipeline.apply(
Create.of(
"HELLO!",
"THIS IS BEAM DEMO!",
"HAPPY STUDY!"));

我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素
并且返回1个PCollection的对象,我们称之为数据集。
<String>指的是数据集中数据的类型

如何转换

要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:

// 把字符串转成小写的转换方法类
// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
static class StrToLowerCaseFn extends DoFn<String, String> {/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 转成大写String outputStr = inputStr.toLowerCase();// 输出结果context.output(outputStr);}
}

接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中

// 组装小写转换
PCollection<String> pcMid =
pcStart.apply(ParDo.of(new StrToLowerCaseFn()));

如何输出

输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台

// 打印结果方法类
// 因为不需要再往下输出,所以
static class PrintStrFn extends DoFn<String, Void> {/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 输出System.out.println(inputStr);}
}

然后组装

// 组装输出操作
pcMid.apply(ParDo.of(new PrintStrFn()));

运行

刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。
真正开始计算需要调用下面的代码:

// 运行结果
pipeline.run().waitUntilFinish();

执行main方法,输出如下结果:
image.png

完整代码

/*** The howToCreateAndShowData** */
public class HowToCreateAndShowData {public static void main(String[] args) {PipelineOptions pipelineOptions = PipelineOptionsFactory.create();Pipeline pipeline = Pipeline.create(pipelineOptions);// 生成初始的输入数据// 相当于往管道里塞入了3个自己写的字符串元素PCollection<String> pcStart = pipeline.apply(Create.of("HELLO!","THIS IS BEAM DEMO!","HAPPY STUDY!"));// 组装小写转换PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));// 组装输出操作pcMid.apply(ParDo.of(new PrintStrFn()));// 运行结果pipeline.run().waitUntilFinish();}// 把字符串转成小写的转换方法类// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型static class StrToLowerCaseFn extends DoFn<String, String> {/*** processElement,过程元素处理方法,类似于spark、mr中的map操作* 必须加上@ProcessElement注解,并实现processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 转成大写String outputStr = inputStr.toLowerCase();// 输出结果context.output(outputStr);}}// 打印结果方法类// 因为不需要再往下输出,所以static class PrintStrFn extends DoFn<String, Void> {/*** processElement,过程元素处理方法,类似于spark、mr中的map操作* 必须加上@ProcessElement注解,并实现processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 输出System.out.println(inputStr);}}
}

apache beam入门之初次使用相关推荐

  1. apache beam 入门之beam-sql

    目录:apache beam 个人使用经验总结目录和入门指导(Java) 就像spark-sql 一样,apache beam也有beam-sql, 就是能够输入1张模拟数据表, 然后通过sql语句来 ...

  2. Apache Beam实战指南 | 玩转KafkaIO与Flink

    AI前线导读:本文是 Apache Beam实战指南系列文章 的第二篇内容,将重点介绍 Apache Beam与Flink的关系,对Beam框架中的KafkaIO和Flink源码进行剖析,并结合应用示 ...

  3. Apache Beam的架构概览

    不多说,直接上干货! Apache Beam是一个开源的数据处理编程库,由Google贡献给Apache的项目,前不久刚刚成为Apache TLP项目.它提供了一个高级的.统一的编程模型,允许我们通过 ...

  4. Apache Beam 是什么,它为什么比其他选择更受欢迎?

    1. 概述 在本教程中,我们将介绍 Apache Beam 并探讨其基本概念.我们将首先演示使用 Apache Beam 的用例和好处,然后介绍基本概念和术语.之后,我们将通过一个简单的例子来说明 A ...

  5. Apache Solr入门教程

    转自:http://blog.csdn.net/u011936655/article/details/51960005 Apache Solr入门教程(初学者之旅) 写在前面:本文涉及solr入门的各 ...

  6. Apache Beam欲通过uber api获取大数据

    现在,有用的Apache大数据项目似乎每日更新.相比于每次都重新学习的方式,如果可以通过一个统一的API如何呢? 长期开玩笑说Hadoop生态系统是那种如果你不喜欢一个为特定系统的API,等待五分钟, ...

  7. Apache Beam发布第一个稳定版本

    Apache Beam在官方博客上正式发布了Beam 2.0.0.这是Beam有史以来的第一个稳定版本,根据Beam社区的声明,Beam意欲为未来版本发布保持API的稳定性,并让Beam适用于企业的部 ...

  8. Apache Beam是什么?

    Apache Beam 的前世今生 1月10日,Apache软件基金会宣布,Apache Beam成功孵化,成为该基金会的一个新的顶级项目,基于Apache V2许可证开源. 2003年,谷歌发布了著 ...

  9. Apache Camel入门

    在先前的博文中,我们了解了企业集成模式(EIP). 现在,在这篇文章中,我们将研究实现这些模式的Apache Camel框架. 关于骆驼: Apache Camel是一个开放源代码项目,已有将近5年的 ...

最新文章

  1. PHP的echo和print小谈
  2. 图解全排列问题_一道笔试题(122345求有条件全排列)的两种做法
  3. 《C++ Primer Plus(第六版)》(13)(第九章 内存模型和命名空间 笔记)
  4. 多线程写mysql数据库_多线程读写mysql数据库
  5. (转)51单片机C中关于.c文件和.h文件
  6. 谈判如何在谈判中_如何避免通过工资谈判把钱留在桌上
  7. Python之configparser模块详解和使用
  8. 能直接挂在iis的动静态网站_如何防止网站高并发引起的系统崩溃?
  9. 删好友警告,C语言最强整人小程序!(勿随便使用)
  10. 二维码设备巡检解决方案
  11. ICCV、ECCV、CVPR三大国际会议
  12. 工作中常用的英文单词缩写
  13. leetcode 36. 有效的数独 (将 9 * 9 数独划分成 9 宫格 )
  14. TUP第30期:微软资深专家论如何用Visual Studio开发iOS、Android应用
  15. 云服务器系统安装设置方法,云服务器系统怎么安装
  16. 邮件客户端 web linux,Linux下五个流行的Webmail
  17. 转载 actor-critic的收敛性问题
  18. 毕业设计-基于机器视觉的甘蔗茎秆识别方法-OpenCV
  19. OpenAI刚融资100亿,DeepMind CEO急了?呼吁AI圈减少科研竞赛!
  20. quadl matlab,MATLAB求一元函数的数值积分(quad,quadl,quadv),大师来详解

热门文章

  1. TinyMCE编辑器使用
  2. 刑事案件中,辩护律师具体能做什么?
  3. Transformer秒杀CNN,凭什么?
  4. 车牌识别系统连不上服务器怎么办,高清车牌识别系统常见故障解决方案
  5. Excel工作表事件(4)- 单元格修订记录跟踪
  6. 搞笑趣味短信100条
  7. QT之QTableWidget基本使用
  8. repo manifest.xml详解
  9. r语言将excel文件转化为csv文件导入RStudio
  10. 21天学通Linux嵌入式开发教程