How to use Spark clusters for parallel processing Big Data

Use Apache Spark’s Resilient Distributed Dataset (RDD) with Databricks

by Hari Santanam

Due to physical limitations, the individual computer processor has largely reached the upper ceiling for speed with current designs. So, hardware makers added more processors to the motherboard (parallel CPU cores, running at the same speed).

But… most software applications written over the last few decades were not written for parallel processing.

Additionally, data collection has gotten exponentially bigger, due to cheap devices that can collect specific data (such as temperature, sound, speed…).

To process this data in a more efficient way, newer programming methods were needed.

A cluster of computing processes is similar to a group of workers. A team can work better and more efficiently than a single worker. They pool resources. This means they share information, break down the tasks and collect updates and outputs to come up with a single set of results.

Just as farmers went from working a single field to using combines and tractors to produce food efficiently from larger and more numerous farms, with agricultural cooperatives making processing easier, a cluster works as a team to tackle larger and more complex data collection and processing.

Cluster computing and parallel processing were the answers, and today we have the Apache Spark framework. Databricks is a unified analytics platform used to launch Spark cluster computing in a simple and easy way.

What is Spark?

Apache Spark is a lightning-fast unified analytics engine for big data and machine learning. It was originally developed at UC Berkeley.

Spark is fast. It takes advantage of in-memory computing and other optimizations. It currently holds the record for large-scale on-disk sorting.

Spark uses Resilient Distributed Datasets (RDD) to perform parallel processing across a cluster of computers or processors.

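To make the idea concrete, here is a minimal sketch (my own illustration, not from the original article) of RDD-based parallelism in PySpark. It assumes the SparkContext sc that a Databricks notebook provides automatically: the collection is split into partitions, and each partition is processed by a different executor.

# Minimal sketch: assumes `sc` (SparkContext), provided by the Databricks notebook
numbers = sc.parallelize(range(1, 1001), 8)   # split the data into 8 partitions

# map() runs on the partitions in parallel across the executors;
# reduce() combines the partial results back on the driver.
sum_of_squares = numbers.map(lambda x: x * x).reduce(lambda a, b: a + b)
print(sum_of_squares)

Each executor squares its own slice of the numbers; only the small partial sums travel back to the driver.
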
It has easy-to-use APIs for operating on large datasets, in various programming languages. It also has APIs for transforming data, and familiar data frame APIs for manipulating semi-structured data.

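As a quick illustration of that DataFrame API (the path and field names below are hypothetical, purely for the example), reading semi-structured JSON and querying it looks roughly like this:

# Hypothetical example: read semi-structured JSON into a DataFrame
events = spark.read.json("/FileStore/tables/events.json")
events.printSchema()          # nested fields are inferred automatically

# Query it with familiar, SQL-like DataFrame operations
events.select("user.id", "action").where(events.action == "click").show(5)
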
Basically, Spark uses a cluster manager to coordinate work across a cluster of computers. A cluster is a group of computers that are connected and coordinate with each other to process data and compute.

Spark applications consist of a driver process and executor processes.

Briefly put, the driver process runs the main function, and analyzes and distributes work across the executors. The executors actually do the tasks assigned — executing code and reporting to the driver node.

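Databricks sizes the driver and executors for you when you create a cluster, but to show where these knobs live, here is a hedged sketch of how a self-managed Spark application might configure its executors (the property names are standard Spark settings; the values are arbitrary):

from pyspark.sql import SparkSession

# Sketch only - on Databricks these settings are handled by the cluster configuration UI
spark = (SparkSession.builder
         .appName("parallel-processing-demo")
         .config("spark.executor.instances", "4")   # number of executor processes
         .config("spark.executor.cores", "2")       # parallel tasks per executor
         .config("spark.executor.memory", "4g")     # memory per executor
         .getOrCreate())
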
In real-world applications in business and emerging AI programming, parallel processing is becoming a necessity for efficiency, speed and complexity.

Great — so what is Databricks?

Databricks is a unified analytics platform, from the creators of Apache Spark. It makes it easy to launch cloud-optimized Spark clusters in minutes.

Think of it as an all-in-one package to write your code. You can use Spark (without worrying about the underlying details) and produce results.

It also includes Jupyter notebooks that can be shared, as well as providing GitHub integration, connections to many widely used tools and automation monitoring, scheduling and debugging. See here for more information.

You can sign up for free with the community edition. This will allow you to play around with Spark clusters. Other benefits, depending on plan, include:

  • Get clusters up and running in seconds on both AWS and Azure CPU and GPU instances for maximum flexibility.
  • Get started quickly with out-of-the-box integration of TensorFlow, Keras, and their dependencies on Databricks clusters.

Let’s get started. If you have already used Databricks before, skip down to the next part. Otherwise, you can sign up here and select ‘community edition’ to try it out for free.

Follow the directions there. They are clear, concise and easy:

  • Create a cluster
  • Attach a notebook to the cluster and run commands in the notebook on the cluster
  • Manipulate the data and create a graph
  • Operations on Python DataFrame API; create a DataFrame from a Databricks dataset
  • Manipulate the data and display results

Now that you have created a data program on a cluster, let’s move on to another dataset, with more operations so you can have more data.

The dataset is the 2017 World Happiness Report by country, based on different factors such as GDP, generosity, trust, family, and others. The fields and their descriptions are listed further down in the article.

I previously downloaded the dataset, then moved it into Databricks’ DBFS (DataBricks Files System) by simply dragging and dropping into the window in Databricks.

Or, you can click on Data in the left navigation pane, click on Add Data, then either drag and drop or browse and add.

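Either way, it is worth confirming that the upload landed in DBFS before reading it. A quick check in a Databricks notebook (assuming the default upload path used in this walkthrough):

# List the uploaded files in DBFS to confirm 2017.csv is there
display(dbutils.fs.ls("/FileStore/tables/"))
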
# File location and type
# This file was dragged and dropped into Databricks from stored
# location: https://www.kaggle.com/unsdsn/world-happiness#2017.csv
file_location = "/FileStore/tables/2017.csv"
file_type = "csv"

# CSV options
# The applied options are for CSV files. For other file types, these
# will be ignored: Schema is inferred; first row is header - I
# deleted the header row in an editor and intentionally left it 'false' to
# contrast with the later RDD parsing. If you don't delete the header row,
# instead of reading C0, C1, it would read "country", "dystopia" etc.
infer_schema = "true"
first_row_is_header = "false"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)
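
Two optional checks, not in the original walkthrough, can be useful at this point: what schema inferSchema actually produced, and how many partitions Spark split the file into (these partitions are what the executors process in parallel):

# With header="false" the column names are auto-generated (e.g. _c0, _c1, ...)
df.printSchema()

# Number of partitions the data was split into for parallel processing
print(df.rdd.getNumPartitions())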

Now, let’s load the file into Spark’s Resilient Distributed Dataset (RDD) mentioned earlier. RDD performs parallel processing across a cluster of computers or processors and makes data operations faster and more efficient.

#load the file into Spark's Resilient Distributed Dataset (RDD)
data_file = "/FileStore/tables/2017.csv"
raw_rdd = sc.textFile(data_file).cache()

#show the top 5 lines of the file
raw_rdd.take(5)

Note the “Spark Jobs” below, just above the output. Click on View to see details, as shown in the inset window on the right.

Databricks and Spark have excellent visualizations of the processes.

In Spark, a job is associated with a chain of RDD dependencies organized in a directed acyclic graph (DAG). In a DAG, branches are directed from one node to another, with no loop backs. Tasks are submitted to the scheduler, which executes them using pipelining to optimize the work and collapse it into a minimal number of stages.

Don’t worry if the above items seem complicated. There are visual snapshots of the processes occurring during the specific stage for which you pressed the Spark Job view button. You may or may not need this information — it is there if you do.

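If you do want to peek at the dependency chain behind a job yourself, an RDD can print its own lineage. This optional check is my addition, not part of the original article:

# Show the RDD's lineage - the chain of dependencies Spark organizes into a DAG
lineage = raw_rdd.toDebugString()
print(lineage.decode("utf-8") if isinstance(lineage, bytes) else lineage)
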
RDD entries are separated by commas, which we need to split before parsing and building a dataframe. We will then take specific columns from the dataset to use.

#split RDD before parsing and building dataframe
csv_rdd = raw_rdd.map(lambda row: row.split(","))
#print 2 rows
print(csv_rdd.take(2))
#print types
print(type(csv_rdd))
print('potential # of columns: ', len(csv_rdd.take(1)[0]))

#use specific columns from dataset
from pyspark.sql import Row

parsed_rdd = csv_rdd.map(lambda r: Row(
    country = r[0],   #country, position 1, type=string
    happiness_rank = r[1],
    happiness_score = r[2],
    gdp_per_capita = r[5],
    family = r[6],
    health = r[7],
    freedom = r[8],
    generosity = r[9],
    trust = r[10],
    dystopia = r[11],
    label = r[-1]
    ))
parsed_rdd.take(5)

Here are the columns and definitions for the Happiness dataset:

Country — Name of the country.

Region — Region the country belongs to.

Happiness Rank — Rank of the country based on the Happiness Score.

Happiness Score — A metric measured in 2015 by asking the sampled people the question: “How would you rate your happiness on a scale of 0 to 10 where 10 is the happiest.”

Economy (GDP per Capita) — The extent to which GDP (Gross Domestic Product) contributes to the calculation of the Happiness Score

Family — The extent to which Family contributes to the calculation of the Happiness Score

Health — (Life Expectancy) The extent to which Life expectancy contributed to the calculation of the Happiness Score.

Freedom — The extent to which Freedom contributed to the calculation of the Happiness Score.

Trust — (Government Corruption) The extent to which Perception of Corruption contributes to the Happiness Score.

Generosity — The extent to which Generosity contributed to the calculation of the Happiness Score.

Dystopia Residual — The extent to which the Dystopia Residual contributed to the calculation of the Happiness Score (Dystopia = an imagined place or state in which everything is unpleasant or bad, typically a totalitarian or environmentally degraded one; Residual = what’s left or remaining after everything else is accounted for or taken away).

# Create a view or table
temp_table_name = "2017_csv"
df.createOrReplaceTempView(temp_table_name)

#build dataframe from RDD created earlier
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10))

#view the dataframe's schema
df.printSchema()

#build temporary table to run SQL commands
#table only alive for the session
#table scoped to the cluster; highly optimized
df.registerTempTable("happiness")

#display happiness_score counts using dataframe syntax
display(df.groupBy('happiness_score')
          .count()
          .orderBy('count', ascending=False)
       )
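
One caveat worth flagging (my note, not in the original article): because the rows were built by splitting text, every column in this DataFrame is a string. If you want genuinely numeric behavior (averages, or sorting by value rather than lexically), a sketch of casting with the DataFrame API might look like this:

from pyspark.sql.functions import col

# Sketch: cast the text columns to numeric types before doing arithmetic on them
df_numeric = (df
    .withColumn("happiness_score", col("happiness_score").cast("float"))
    .withColumn("gdp_per_capita", col("gdp_per_capita").cast("float")))

# Numeric aggregations now behave as expected, e.g. average GDP per capita
display(df_numeric.agg({"gdp_per_capita": "avg"}))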

Now, let’s use SQL to run a query that does the same thing (count by happiness_score) as we just did with the DataFrame syntax. The purpose is to show you different ways to process data and to compare the methods.

#use SQL to run query to do same thing as previously done with dataframe (count by happiness_score)
happ_query = sqlContext.sql("""
                        SELECT happiness_score, count(*) as freq
                        FROM happiness
                        GROUP BY happiness_score
                        ORDER BY 2 DESC
                        """)
display(happ_query)

Another SQL query to practice our data processing:

#another sql query
happ_stats = sqlContext.sql("""
                            SELECT
                              country,
                              happiness_rank,
                              dystopia
                            FROM happiness
                            WHERE happiness_rank > 20
                            """)
display(happ_stats)

There! You have done it — created a Spark-powered cluster and completed a dataset query process using that cluster. You can use this with your own datasets to process and output your Big Data projects.

You can also play around with the charts: click on the chart/graph icon at the bottom of any output, specify the values and type of graph, and see what happens. It is fun.

The code is posted in a notebook here on the Databricks public forum and will be available for about 6 months, as per Databricks.

  • For more information on using Spark with Deep Learning, read this excellent article by Favio Vázquez

Thanks for reading! I hope you build interesting programs with Databricks and enjoy it as much as I have. Please clap if you found it interesting or useful.

For a complete list of my articles, see here.

Translated from: https://www.freecodecamp.org/news/how-to-use-spark-clusters-for-parallel-processing-big-data-86a22e7f8b50/
