
Apache Beam

Apache Beam (Batch + Stream) is a unified programming model for defining and executing both batch and streaming data processing jobs. It provides SDKs for building data pipelines and runners for executing them.

Apache Beam is useful in use cases that involve moving data between different storage layers, transforming data, and processing data in real time.

There are three fundamental concepts in Apache Beam, namely:

  • Pipeline: encapsulates the entire data processing task and represents a directed acyclic graph (DAG) of PCollections and PTransforms. It is analogous to a SparkContext.

  • PCollection: represents a data set, which can be a fixed batch or a stream of data. We can think of it as a Spark RDD.

  • PTransform: a data processing operation that takes one or more PCollections and outputs zero or more PCollections. It is comparable to a Spark transformation or action on RDDs.

Apache Beam is designed so that pipelines are portable across different runners. In the example below, the pipeline is executed locally using the DirectRunner, which is well suited to developing, testing, and debugging.

WordCount example (the “Hello World” of big data):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    # Build an in-memory PCollection of text lines.
    lines = p | 'Creating PCollection' >> beam.Create(['Hello', 'Hello Good Morning', 'GoodBye'])
    counts = (
        lines
        # Split each line into words.
        | 'Tokenizing' >> beam.FlatMap(lambda x: x.split(' '))
        # Emit a (word, 1) pair for every word.
        | 'Pairing With One' >> beam.Map(lambda x: (x, 1))
        # Sum the ones for each distinct word.
        | 'GroupbyKey And Sum' >> beam.CombinePerKey(sum)
        # Print each word with its count.
        | 'Printing' >> beam.ParDo(lambda x: print(x[0], x[1])))

Let’s briefly review what the code is doing.

  • beam.Pipeline(options=PipelineOptions()): creates a Beam pipeline from the given configuration options.

  • beam.Create: creates a PCollection from in-memory data.

  • beam.FlatMap: is applied to each element of the PCollection and tokenizes each line into words.

  • beam.Map: a transformation that maps each word to (word, 1).

  • beam.CombinePerKey: groups the pairs by key, much like a GroupByKey operation, and sums the counts for each word.

  • beam.ParDo: another transformation, applied here to each key-value pair to print the final result.


Google Cloud Dataflow

Cloud Dataflow is a fully managed service for running Apache Beam pipelines on Google Cloud Platform. It is designed to process very large datasets: it distributes the processing tasks across several virtual machines in a cluster so that they can work on different chunks of data in parallel. Cloud Dataflow is certainly not the first big data processing engine, and it is not the only one available on Google Cloud Platform. For example, one alternative is to run Apache Spark on the Cloud Dataproc service. So why would you choose Dataflow? There are a few reasons:

Serverless: We don’t have to manage computing resources. Dataflow automatically spins a cluster of virtual machines up and down while running the processing jobs, so we can focus on building the code instead of building the cluster. Apache Spark, on the other hand, requires more configuration even when it runs on Cloud Dataproc.

Processing code is separate from the execution environment: In 2016, Google donated the open-source Dataflow SDK, along with a set of data connectors for accessing Google Cloud Platform, to the Apache Beam project. We can write Beam programs and run them on the local system or on the Cloud Dataflow service. The Dataflow documentation itself points to the Apache Beam website for the latest version of the SDK.

Processing batch and stream mode with the same programming model: Other big data SDKs require different code depending on whether the data arrives in batch or streaming form. Apache Beam addresses this with a unified programming model. Competitors such as Spark are moving in this direction, but they are not quite there yet.

Creating a Custom Template Using Python

The primary goal of templates is to package Dataflow pipelines as reusable components, so that a pipeline can be rerun by changing only the required pipeline parameters. The template files are staged in a GCS bucket and can be launched from the console, from the gcloud command, or from other services such as Cloud Scheduler or Cloud Functions.

Let us explore an example of transferring data from Google Cloud Storage to BigQuery using the Cloud Dataflow Python SDK, and then creating a custom template that accepts a runtime parameter.

Google already provides many examples of running Dataflow jobs with the Python SDK.

I am using PyCharm with Python 3.7, and I have installed all the packages required to run Apache Beam (2.22.0) locally.

A CSV file was uploaded to the GCS bucket. It is a sample of the public dataset “USA Names” hosted on BigQuery, which contains all names from Social Security card applications for births that occurred in the United States after 1879.

The application code needs to be authenticated by setting the environment variable GOOGLE_APPLICATION_CREDENTIALS in PyCharm. Its value should be the path of the downloaded service account key file.
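If you prefer not to rely on the IDE's run configuration, the same variable can be set from Python before any Google Cloud client is created. This is only a sketch: the key file path is a placeholder, and the helper name configure_credentials is invented for this example.

```python
import os

# Placeholder path (an assumption): point this at your downloaded key file.
KEY_FILE = '/path/to/service-account-key.json'


def configure_credentials(key_file=KEY_FILE):
    """Expose the key file to Google client libraries via the environment."""
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_file
    return os.environ['GOOGLE_APPLICATION_CREDENTIALS']


if __name__ == '__main__':
    print(configure_credentials())
```

Setting the variable in code is convenient for local experiments, whereas the PyCharm environment setting keeps the path out of the source file.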

The service account was given the admin IAM role. If you don’t want to give your service account an owner role, please refer to the access control guide for Dataflow.

The code below transfers the data from the GCS bucket to BigQuery. It can be run either as a template or, by removing the template location option, in the local environment. It reads the file from the GCS location, maps each element to a BigQuery row, and then writes the rows to BigQuery.

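p = beam.Pipeline(options=pipeline_options_val)
data_ingestion = dataingestion()

# The names of the read and write transforms are missing from this listing.
# beam.io.ReadFromText (suggested by skip_header_lines) and
# beam.io.WriteToBigQuery are assumed here.
(p
 | 'Read from a File' >> beam.io.ReadFromText(pipeline_options['input'], skip_header_lines=1)
 | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
       pipeline_options['output'],
       schema='state:STRING,gender:STRING,year:STRING,name:STRING,'
              'number:STRING,created_date:STRING',
       # The remaining arguments of this call are missing from the listing.
   ))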
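The listing calls parse_method to turn each CSV line into a BigQuery row, but its body is not shown. A minimal sketch of what such a method might look like is given below. It assumes that every line carries its values in the same order as the schema columns; the class name DataIngestion and the sample line are made up for this illustration.

```python
import csv

# Column names taken from the schema string in the listing above.
FIELDS = ('state', 'gender', 'year', 'name', 'number', 'created_date')


class DataIngestion:
    """Hypothetical helper that converts CSV lines into BigQuery rows."""

    def parse_method(self, line):
        # csv.reader copes with quoted values that contain commas.
        values = next(csv.reader([line]))
        # BigQuery rows are represented as dicts keyed by column name.
        return dict(zip(FIELDS, values))


if __name__ == '__main__':
    print(DataIngestion().parse_method('TX,F,1910,Mary,895,11/28/2016'))
```

All values stay strings here, which matches the all-STRING schema used in the listing.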

We will now create a custom template that reads the BigQuery data and counts the number of females and males in the sample data. The value ‘F’ or ‘M’ is passed in as a Dataflow runtime parameter. A Dataflow template uses runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, we can pass these parameters to functions that run within the pipeline (such as a DoFn).

We select the gender column from BigQuery. beam.ParDo is then used to filter the elements by the value that is passed at runtime through the Dataflow “add parameters” option. The remaining PCollection is combined globally using beam.combiners.Count.Globally() to get the count.

A template file is created in the GCS location that was passed as a command-line parameter.

Now it’s time to run the pipeline.


Click CREATE JOB FROM TEMPLATE in the Dataflow UI, fill in the required details, and pass the runtime parameter as shown in the image below. I am ignoring the warning that asks for a metadata file to be created for validating the parameters. The metadata file can be created in the same folder as the template, with the name <template_name>_metadata.

passing the runtime parameter
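For reference, a metadata file for this template could be generated with a few lines of Python. This is a sketch under assumptions: the field names follow the documented metadata format for classic Dataflow templates, while the display name, description, label, help text and validation regex are illustrative values that are not taken from the original project.

```python
import json


def build_template_metadata():
    """Return metadata describing the template's single runtime parameter."""
    return {
        'name': 'Count names by gender',
        'description': 'Counts the rows whose gender matches filter_val.',
        'parameters': [{
            'name': 'filter_val',
            'label': 'Gender filter',
            'helpText': 'Gender value to count: F or M.',
            'regexes': ['^(F|M)$'],
            'isOptional': False,
        }],
    }


def write_metadata(path):
    """Write the metadata as JSON, e.g. to <template_name>_metadata."""
    with open(path, 'w') as f:
        json.dump(build_template_metadata(), f, indent=2)


if __name__ == '__main__':
    print(json.dumps(build_template_metadata(), indent=2))
```

The resulting file would then be uploaded next to the template in the GCS bucket so that the UI can validate the parameter before launching the job.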

We can also run the dataflow job using the gcloud command.


gcloud dataflow jobs run <job-name> --gcs-location <gcs-template-location> --region <region> --staging-location <temp-gcs-location> --parameters filter_val=<filter-value>

Summary

Apache Beam, in combination with Cloud Dataflow, lets us concentrate on the logical composition of pipelines rather than the physical orchestration of parallel processing. It provides useful abstractions that insulate us from low-level details of distributed processing, thereby providing exceptional opportunities for enterprises to boost their productivity.

That’s all for today. You can see the entire code in my Github:

Thanks for reading. Feel free to reach out to me through the comment section or through LinkedIn. I am open to any discussion, and constructive criticism is welcome. Thank you!




