apache beam入门之初次使用
beam入门宝典之初次使用
咱们不多废话,先直接来如何简单使用beam框架。
这里我不使用常见的wordCount做例子,而是一个大写转小写的例子,语言选用java语言
这个例子里我们会初步学到:
- 如何建立管道
- 如何手动生成数据
- 如何转换
- 如何查看输出
首先我们要新建1个maven工程,然后在pom.xml中加入如下依赖:
<dependency><groupId>org.apache.beam</groupId><artifactId>beam-runners-direct-java</artifactId><version>${beam.version}</version>
</dependency>
beam.version版本选择beam官网上最新,笔者编写此文时使用的版本是2.13.0
接着我们新建1个HowToCreateAndShowData类,然后开始例子
建立管道
任何beam程序,都需要先建立1个管道选项option,再建立1个初始管道
// 建立选项
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
// 建立管道
Pipeline pipeline = Pipeline.create(pipelineOptions);
关于选项option和pipeline的更多用法,后面的章节会继续介绍
手动生成数据
我们有了pipeline之后,就要往里面塞入数据
beam里提供了手动输入数据的方式,如下:
// 生成初始的输入数据
// 相当于往管道里塞入了3个自己写的字符串元素
PCollection<String> pcStart = pipeline.apply(
Create.of(
"HELLO!",
"THIS IS BEAM DEMO!",
"HAPPY STUDY!"));
我们调用pipeline的apply方法来输入1个Create对象,里面的元素就是我们的输入元素
并且返回1个PCollection的对象,我们称之为数据集。
<String>指的是数据集中数据的类型
如何转换
要实现转换,需要先编写1个DoFn的子类,并实现processElement方法,代码和讲解如下:
// 把字符串转成小写的转换方法类
// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型
static class StrToLowerCaseFn extends DoFn<String, String> {/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 转成大写String outputStr = inputStr.toLowerCase();// 输出结果context.output(outputStr);}
}
接着将这个计算方法,用数据集.apply(ParDo.of(计算类))的方式组装到刚才的pcStart中
// 组装小写转换
PCollection<String> pcMid =
pcStart.apply(ParDo.of(new StrToLowerCaseFn()));
如何输出
输出的话,我们可以按照上面的方法再编写1个DoFn子类,用于将数据集中输入的元素打印到控制台
// 打印结果方法类
// 因为不需要再往下输出,所以
static class PrintStrFn extends DoFn<String, Void> {/**
* processElement,过程元素处理方法,类似于spark、mr中的map操作
* 必须加上@ProcessElement注解,并实现processElement方法
* @param context
*/
@ProcessElement
public void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 输出System.out.println(inputStr);}
}
然后组装
// 组装输出操作
pcMid.apply(ParDo.of(new PrintStrFn()));
运行
刚才的3次apply结束后,其实转换都还没有开始,仅仅只是组装计算拓扑的1个流程。
真正开始计算需要调用下面的代码:
// 运行结果
pipeline.run().waitUntilFinish();
执行main方法,输出如下结果:
image.png
完整代码
/*** The howToCreateAndShowData** */
public class HowToCreateAndShowData {public static void main(String[] args) {PipelineOptions pipelineOptions = PipelineOptionsFactory.create();Pipeline pipeline = Pipeline.create(pipelineOptions);// 生成初始的输入数据// 相当于往管道里塞入了3个自己写的字符串元素PCollection<String> pcStart = pipeline.apply(Create.of("HELLO!","THIS IS BEAM DEMO!","HAPPY STUDY!"));// 组装小写转换PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));// 组装输出操作pcMid.apply(ParDo.of(new PrintStrFn()));// 运行结果pipeline.run().waitUntilFinish();}// 把字符串转成小写的转换方法类// DoFn<String,String>中的第一个String是输入的类型,第二个String是输出的类型static class StrToLowerCaseFn extends DoFn<String, String> {/*** processElement,过程元素处理方法,类似于spark、mr中的map操作* 必须加上@ProcessElement注解,并实现processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 转成大写String outputStr = inputStr.toLowerCase();// 输出结果context.output(outputStr);}}// 打印结果方法类// 因为不需要再往下输出,所以static class PrintStrFn extends DoFn<String, Void> {/*** processElement,过程元素处理方法,类似于spark、mr中的map操作* 必须加上@ProcessElement注解,并实现processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 从管道中取出的1个元素String inputStr = context.element();// 输出System.out.println(inputStr);}}
}
apache beam入门之初次使用相关推荐
- apache beam 入门之beam-sql
目录:apache beam 个人使用经验总结目录和入门指导(Java) 就像spark-sql 一样,apache beam也有beam-sql, 就是能够输入1张模拟数据表, 然后通过sql语句来 ...
- Apache Beam实战指南 | 玩转KafkaIO与Flink
AI前线导读:本文是 Apache Beam实战指南系列文章 的第二篇内容,将重点介绍 Apache Beam与Flink的关系,对Beam框架中的KafkaIO和Flink源码进行剖析,并结合应用示 ...
- Apache Beam的架构概览
不多说,直接上干货! Apache Beam是一个开源的数据处理编程库,由Google贡献给Apache的项目,前不久刚刚成为Apache TLP项目.它提供了一个高级的.统一的编程模型,允许我们通过 ...
- Apache Beam 是什么,它为什么比其他选择更受欢迎?
1. 概述 在本教程中,我们将介绍 Apache Beam 并探讨其基本概念.我们将首先演示使用 Apache Beam 的用例和好处,然后介绍基本概念和术语.之后,我们将通过一个简单的例子来说明 A ...
- Apache Solr入门教程
转自:http://blog.csdn.net/u011936655/article/details/51960005 Apache Solr入门教程(初学者之旅) 写在前面:本文涉及solr入门的各 ...
- Apache Beam欲通过uber api获取大数据
现在,有用的Apache大数据项目似乎每日更新.相比于每次都重新学习的方式,如果可以通过一个统一的API如何呢? 长期开玩笑说Hadoop生态系统是那种如果你不喜欢一个为特定系统的API,等待五分钟, ...
- Apache Beam发布第一个稳定版本
Apache Beam在官方博客上正式发布了Beam 2.0.0.这是Beam有史以来的第一个稳定版本,根据Beam社区的声明,Beam意欲为未来版本发布保持API的稳定性,并让Beam适用于企业的部 ...
- Apache Beam是什么?
Apache Beam 的前世今生 1月10日,Apache软件基金会宣布,Apache Beam成功孵化,成为该基金会的一个新的顶级项目,基于Apache V2许可证开源. 2003年,谷歌发布了著 ...
- Apache Camel入门
在先前的博文中,我们了解了企业集成模式(EIP). 现在,在这篇文章中,我们将研究实现这些模式的Apache Camel框架. 关于骆驼: Apache Camel是一个开放源代码项目,已有将近5年的 ...
最新文章
- PHP的echo和print小谈
- 图解全排列问题_一道笔试题(122345求有条件全排列)的两种做法
- 《C++ Primer Plus(第六版)》(13)(第九章 内存模型和命名空间 笔记)
- 多线程写mysql数据库_多线程读写mysql数据库
- (转)51单片机C中关于.c文件和.h文件
- 谈判如何在谈判中_如何避免通过工资谈判把钱留在桌上
- Python之configparser模块详解和使用
- 能直接挂在iis的动静态网站_如何防止网站高并发引起的系统崩溃?
- 删好友警告,C语言最强整人小程序!(勿随便使用)
- 二维码设备巡检解决方案
- ICCV、ECCV、CVPR三大国际会议
- 工作中常用的英文单词缩写
- leetcode 36. 有效的数独 (将 9 * 9 数独划分成 9 宫格 )
- TUP第30期:微软资深专家论如何用Visual Studio开发iOS、Android应用
- 云服务器系统安装设置方法,云服务器系统怎么安装
- 邮件客户端 web linux,Linux下五个流行的Webmail
- 转载 actor-critic的收敛性问题
- 毕业设计-基于机器视觉的甘蔗茎秆识别方法-OpenCV
- OpenAI刚融资100亿,DeepMind CEO急了?呼吁AI圈减少科研竞赛!
- quadl matlab,MATLAB求一元函数的数值积分(quad,quadl,quadv),大师来详解