
Apache Beam

Apache Beam (Batch + Stream) is a unified programming model that defines and executes both batch and streaming data processing jobs. It provides SDKs for building data pipelines and runners to execute them.

Apache Beam can provide value in use cases that involve moving data between different storage layers, transforming data, and running real-time data processing jobs.

There are three fundamental concepts in Apache Beam, namely:

  • Pipeline — encapsulates the entire data processing task and represents a directed acyclic graph (DAG) of PCollections and PTransforms. It is analogous to the Spark Context.

  • PCollection — represents a data set, which can be a fixed batch or a stream of data. We can think of it as a Spark RDD.

  • PTransform — a data processing operation that takes one or more PCollections and outputs zero or more PCollections. It can be thought of as a Spark transformation/action on RDDs that produces the result.

Apache Beam is designed so that pipelines are portable across different runners. In the example below, the pipeline is executed locally using the DirectRunner, which is great for developing, testing, and debugging.
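
As a minimal sketch of how the runner is selected, the options below pin the DirectRunner explicitly; swapping in DataflowRunner (plus the usual GCP project and staging options) would send the same pipeline code to Cloud Dataflow.

from apache_beam.options.pipeline_options import PipelineOptions

# DirectRunner executes the pipeline locally; it is also the default
# when no runner is specified in the options.
local_options = PipelineOptions(runner='DirectRunner')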

WordCount Example ("Bigdata Hello World"):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'Creating PCollection' >> beam.Create(['Hello', 'Hello Good Morning', 'GoodBye'])
    counts = (
        lines
        | 'Tokenizing' >> beam.FlatMap(lambda x: x.split(' '))
        | 'Pairing With One' >> beam.Map(lambda x: (x, 1))
        | 'GroupbyKey And Sum' >> beam.CombinePerKey(sum)
        | 'Printing' >> beam.ParDo(lambda x: print(x[0], x[1])))

Let's have a brief review of what the code is doing.

  • beam.Pipeline(options=PipelineOptions()) — creates a Beam pipeline, taking in the configuration options.

  • beam.Create — creates a PCollection from the in-memory data.

  • beam.FlatMap — applied to each element of the PCollection; here it tokenizes each line into words.

  • beam.Map — a transformation that maps each word to (word, 1).

  • beam.CombinePerKey — similar to a groupByKey operation; it sums up the count for each word.

  • beam.ParDo — another transformation, applied to each key-value pair, which prints the final result. For the input above, this prints something like Hello 2, Good 1, Morning 1, GoodBye 1 (the ordering is not guaranteed).

Google Cloud Dataflow

Cloud Dataflow is a fully managed service for running Apache Beam pipelines on Google Cloud Platform. It executes data processing jobs and is designed to run on very large datasets: it distributes processing tasks across several virtual machines in a cluster so that they can process different chunks of the data in parallel. Cloud Dataflow is certainly not the first big data processing engine, nor is it the only one available on Google Cloud Platform; one alternative, for example, is to run Apache Spark on the Cloud Dataproc service. So why would you choose Dataflow? There are a few reasons:

Serverless: We don't have to manage computing resources. Dataflow automatically spins a cluster of virtual machines up and down while running the processing jobs, so we can focus on writing the code instead of building the cluster. Apache Spark, on the other hand, requires more configuration even when it runs on Cloud Dataproc.

Processing code is separate from the execution environment: In 2016, Google donated the open-source Dataflow SDK and a set of data connectors for accessing Google Cloud Platform, which added additional features to the Apache Beam project. We can write Beam programs and run them on the local system or on the Cloud Dataflow service. The Dataflow documentation itself points to the Apache Beam website for the latest version of the Software Development Kit.

Processing batch and stream modes with the same programming model: Other big data SDKs require different code depending on whether data arrives in batch or streaming form. Apache Beam, on the other hand, addresses both with a unified programming model. Competitors like Spark are working toward this, but they are not quite there yet.

Creating a Custom Template Using Python

The primary goal of templates is to package Dataflow pipelines as reusable components in which only the required pipeline parameters change. The template files are staged in a GCS bucket and can be launched from the console, from the gcloud command, or from other services such as Cloud Scheduler or Cloud Functions.

Let us explore an example that transfers data from Google Cloud Storage to BigQuery using the Cloud Dataflow Python SDK and then creates a custom template that accepts a runtime parameter.

Google already provides many examples of running Dataflow jobs with the Python SDK; a large collection can be found here: https://github.com/GoogleCloudPlatform/DataflowTemplates

I am using PyCharm with Python 3.7 and have installed all the packages required to run Apache Beam (2.22.0) locally.

A CSV file was uploaded to the GCS bucket. It is a sample of the public dataset "USA Names" hosted on BigQuery, which contains all names from Social Security card applications for births that occurred in the United States after 1879.

The application code needs to be authenticated by setting the environment variable GOOGLE_APPLICATION_CREDENTIALS in PyCharm, which can be set up as shown below.

The downloaded key file path should be set up as an environment variable in PyCharm.
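
Outside of PyCharm, the same credential can also be set programmatically before the pipeline is constructed; a minimal sketch, with a placeholder key path:

import os

# Point the Google client libraries at the downloaded service account key.
# The path below is a placeholder, not the actual key location.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account-key.json'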

The service account was given an admin IAM role. If you do not want to grant an owner role to your service account, please refer to the access control guide for Dataflow: https://cloud.google.com/dataflow/docs/concepts/access-control

The code below transfers the data from the GCS bucket to BigQuery and can be run either as a template or in the local environment (by removing the template location option). It reads the file from the GCS location using beam.io.ReadFromText, maps each element to convert it into a BigQuery row, and then writes it to BigQuery using beam.io.BigQuerySink.

p = beam.Pipeline(options=pipeline_options_val)
data_ingestion = dataingestion()

(p
 | 'Read from a File' >> beam.io.ReadFromText(pipeline_options['input'], skip_header_lines=1)
 | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
 | 'Write to BigQuery' >> beam.io.Write(
        beam.io.BigQuerySink(
            pipeline_options['output'],
            schema='state:STRING,gender:STRING,year:STRING,name:STRING,'
                   'number:STRING,created_date:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

p.run().wait_until_finish()
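
The dataingestion helper is not shown in the snippet above. A minimal sketch of what its parse_method could look like, assuming the CSV columns arrive in the same order as the schema passed to the sink (the field names mirror that schema; the implementation itself is illustrative):

import csv

class dataingestion(object):
    def parse_method(self, string_input):
        # Parse a single CSV line and pair the values with the column names
        # expected by the BigQuery schema used in the sink.
        values = next(csv.reader([string_input]))
        fields = ['state', 'gender', 'year', 'name', 'number', 'created_date']
        return dict(zip(fields, values))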

We will now create a custom template from the BigQuery data to count the number of females and males present in the sample data, passing 'F' or 'M' as a Dataflow runtime parameter. Dataflow templates use runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, we can pass these parameters to functions that run within the pipeline (such as a DoFn).

We select the gender column from BigQuery using beam.io.Read(beam.io.BigQuerySource()). beam.ParDo is used to filter the elements on the value that will be passed at runtime through Dataflow's add-parameters option. The remaining PCollection is then combined globally using beam.combiners.Count.Globally() to get the count.
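
The full pipeline lives in the repository linked at the end of this post; the sketch below only illustrates the runtime-parameter mechanics, assuming the option is named filter_val (as in the gcloud command later) and using a placeholder BigQuery query:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class GenderCountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # A runtime parameter: its value is resolved only while the job runs,
        # which is what allows it to be supplied when launching a template.
        parser.add_value_provider_argument('--filter_val', type=str, help='Gender value to filter on, e.g. F or M')

class FilterByGender(beam.DoFn):
    def __init__(self, filter_val):
        self.filter_val = filter_val  # a ValueProvider, not a plain string

    def process(self, element):
        # .get() may only be called at execution time.
        if element['gender'] == self.filter_val.get():
            yield element

options = PipelineOptions()
gender_options = options.view_as(GenderCountOptions)

with beam.Pipeline(options=options) as p:
    (p
     | 'Read From BigQuery' >> beam.io.Read(beam.io.BigQuerySource(
            query='SELECT gender FROM `my-project.my_dataset.usa_names`',
            use_standard_sql=True))
     | 'Filter By Gender' >> beam.ParDo(FilterByGender(gender_options.filter_val))
     | 'Count' >> beam.combiners.Count.Globally()
     | 'Print' >> beam.Map(print))

When the template is launched, filter_val can then be supplied as a runtime parameter exactly as shown in the next steps.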

A template file is created in the GCS location that was passed as a command-line parameter.
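
Concretely, the staging happens when the pipeline is launched with a template_location option; a minimal sketch of those options, with placeholder project, region, and bucket names:

from apache_beam.options.pipeline_options import PipelineOptions

template_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-gcp-project',    # placeholder
    region='us-central1',        # placeholder
    staging_location='gs://my-bucket/staging',
    temp_location='gs://my-bucket/temp',
    # With template_location set, the runner stages the template file here
    # instead of launching the job immediately.
    template_location='gs://my-bucket/templates/gender_count_template',
)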

Now it's time to run the pipeline.

Click CREATE JOB FROM TEMPLATE in the Dataflow UI, type in the required details, and pass the runtime parameter as shown in the image below. I am ignoring the warning that asks for a metadata file to be created for validating the parameters. The metadata file can be created in the same folder as the template, with the name <template_name>_metadata.
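
For reference, a minimal sketch of what such a metadata file could contain for the single filter_val parameter used here, written as a Python dict and dumped to JSON (the name, label, and help text are illustrative):

import json

metadata = {
    "name": "Gender count template",
    "description": "Counts BigQuery rows that match a gender value.",
    "parameters": [
        {
            "name": "filter_val",
            "label": "Gender filter value",
            "helpText": "Pass F or M to filter on the gender column.",
            "isOptional": False,
        }
    ],
}

# Saved alongside the template as <template_name>_metadata (a JSON file).
print(json.dumps(metadata, indent=2))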

Passing the runtime parameter

We can also run the Dataflow job using the gcloud command.

gcloud dataflow jobs run <job-name> --gcs-location <gcs-template-location> --region <region> --staging-location <temp-gcs-location> --parameters filter_val=<filter-value>

Summary

Apache Beam, in combination with Cloud Dataflow, lets us concentrate on the logical composition of our pipelines rather than the physical orchestration of parallel processing. It provides useful abstractions that insulate us from the low-level details of distributed processing, giving enterprises an exceptional opportunity to boost their productivity.

That's all for today. You can see the entire code on my GitHub: https://github.com/ankitakundra/GoogleCloudDataflow

Thanks for reading. Feel free to reach out to me through the comment section or on LinkedIn: https://www.linkedin.com/in/ankita-kundra-77024899/. I am open to any discussion, and constructive criticism is welcome. Thank you!

Translated from: https://medium.com/swlh/apache-beam-google-cloud-dataflow-and-creating-custom-templates-using-python-c666c151b4bc
