Spark认知&Spark环境搭建

  • 1 Spark认知篇
    • 1.1 什么是Spark?
    • 1.2 Spark 特点
      • 1.2.1 快
      • 1.2.2 易用
      • 1.2.3 通用
      • 1.2.4 兼容性
    • 1.3 Spark 的用户和用途
  • 2 Spark 角色介绍及运行模式
    • 2.1 集群角色
    • 2.2 运行模式
      • 2.2.1 Local 模式
      • 2.2.2 Standalone 模式
      • 2.2.3 Yarn 模式
      • 2.2.4 Mesos 模式
  • 3 Spark集群安装
    • 3.1 Spark 安装地址
      • 3.1.1 [官网地址](http://spark.apache.org/)
      • 3.1.2 [文档查看地址](https://spark.apache.org/docs/2.1.3/)
      • 3.1.3 [下载地址](https://spark.apache.org/downloads.html)
    • 3.2 Standalone 模式安装
    • 3.3 JobHistoryServer 配置
    • 3.4 HA配置
    • 3.5 Yarn 模式安装
  • 4 IDEA 环境应用
    • 4.1 在 IDEA 中编写 WordCount 程序
    • 4.2 本地调试
    • 4.3 远程调试
  • 5 WordCount 程序的简单分析

1 Spark认知篇

  • Google论文:

    1. GFS、MapReduce => Hadoop
    2. BigTable => HBase
  • MapReduce不是很好的数据计算工具,使用MapReudce不是很好的方案。成本很高,资源耗费量也很高。
  • Spark最核心的是计算方面的性能。其针对海量数据的存储还是延用HDFS。
  • Spark是解决了很多其他模式的优化而存在。
  • Spark是一个基于内存运算的分布式的计算引擎,围绕海量数据的计算问题提出的性价比高、性能很高的解决方案。
  • Spark是以磁盘作为载体,从存储载体方面入手,把数据加载到内存。(性能:内存>硬盘)
  • Spark学习成本高,但是会越来越流行,应用会越来越广泛,但是也还没有脱离 Hadoop的体系。
  • Spark用于大规模数据处理的统一分析引擎
  • Spark便于使用,scala、Python、Java、SQL
  • Spark可以随处运行,有很多系统支持Spark的运转

1.1 什么是Spark?

  • Spark 是一种快速、通用、可扩展的大数据分析引擎, 2009 年诞生于加州大学伯克利分校 AMPLab, 2010 年开源, 2013 年 6 月成为 Apache 孵化项目, 2014 年 2 月成为 Apache 顶级项目。项目是用 Scala 进行编写。
  • 目前, Spark 生态系统已经发展成为一个包含多个子项目的集合,其中包含 SparkSQL、Spark Streaming、 GraphX、 MLib、 SparkR 等子项目, Spark 是基于内存计算的大数据并行计算框架。除了扩展了广泛使用的 MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。 Spark 适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。通过在一个统一的框架下支持这些不同的计算,Spark 使我们可以简单而低耗地把各种处理流程整合在一起。而这样的组合,在实际的数据分析 过程中是很有意义的。不仅如此, Spark 的这种特性还大大减轻了原先需要对各种平台分别管理的负担。
  • 大一统的软件栈,各个组件关系密切并且可以相互调用, 这种设计有几个好处:
    1. 软件栈中所有的程序库和高级组件 都可以从下层的改进中获益。
    2. 运行整个软件栈的代价变小了。不需要运 行 5 到 10 套独立的软件系统了,一个机构只需要运行一套软件系统即可。系统的部署、维护、测试、支持等大大缩减。
    3. 能够构建出无缝整合不同处理模型的应用。
  • Spark 的内置项目如下:

    1. Spark Core
      1. 实现了 Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统 交互等模块。 Spark Core 中还包含了对弹性分布式数据集(resilient distributed dataset,简称 RDD)的 API 定义。
    2. Spark SQL
      是 Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。 Spark SQL 支持多种数据源,比 如 Hive 表、 Parquet 以及 JSON 等
    3. Spark Streaming
      是 Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API,并且与 Spark Core 中的 RDD API 高度对应。
    4. Spark MLlib
      提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据 导入等额外的支持功能
    5. 集群管理器
      Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性, Spark 支持在各种集群管理器(clustermanager)上运行,包括 Hadoop YARN、 Apache Mesos,以及 Spark 自带的一个简易调度 器,叫作独立调度器
  • Spark 得到了众多大数据公司的支持,这些公司包括 Hortonworks、IBM、Intel、Cloudera、MapR、 Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的 Spark 已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用 GraphX 构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯 Spark 集群达到 8000 台的规模,是当前已知的世界上最大的 Spark 集群。
  • SparkSQL SparkStreaming是数据分析方向;MLib和GraphX是数据科学方向。

1.2 Spark 特点

1.2.1 快

  • 与 Hadoop 的 MapReduce 相比, Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。 Spark 实现了高效的 DAG 执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中的。

1.2.2 易用

  • Spark 支持 Java、 Python 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 shell,可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法。

1.2.3 通用

  • Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。 Spark 统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本

1.2.4 兼容性

  • Spark 可以非常方便地与其他的开源产品进行融合。比如, Spark 可以使用Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,器,并且可以处理所有Hadoop 支持的数据,包括 HDFS、 HBase 和 Cassandra 等。这对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。 Spark 也可以不依赖于第三方的资源管理和调度器,它实现了 Standalone 作为其内置的资源管理和调度框架,这样进一步降低了 Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外, Spark 还提供了在 EC2 上部署 Standalone 的 Spark 集群的工具

1.3 Spark 的用户和用途

  • 我们大致把 Spark 的用例分为两类: 数据科学应用和数据处理应用。 也就对应的有两种人群:数据科学家和工程师。

    1. 数据科学任务: 主要是数据分析领域,数据科学家要负责分析数据并建模,具备 SQL、统计、预测建模(机器学习)等方面的经验,以及一定的使用 Python、 Matlab 或 R 语言进行编程的能力。
    2. 数据处理应用:工程师定义为使用 Spark 开发 生产环境中的数据处理应用的软件开发者, 通过对接 Spark 的 API 实现对处理的处理和转换等任务

2 Spark 角色介绍及运行模式

2.1 集群角色

  • 从物理部署层面上来看, Spark 主要分为两种类型的节点, Master 节点和 Worker 节点:Master 节点主要运行集群管理器的中心化部分,所承载的作用是分配 Application 到 Worker节点,维护 Worker 节点, Driver, Application 的状态。 Worker 节点负责具体的业务运行。
  • Driver:驱动集群进行运转,对集群初始化
  • Work Node:对单个节点进行管理
  • Executor:一个进程,通过线程池在集群进行多线程的存在。真正运行的。

2.2 运行模式

2.2.1 Local 模式

  • Local 模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下集中方式设置 master。
  • local: 所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式;
  • local[K]: 指定使用几个线程来运行计算,比如 local[4]就是运行 4 个 worker 线程。通常我们的 cpu 有几个 core,就指定几个线程,最大化利用 cpu 的计算能力;
  • ocal[*]: 这种模式直接帮你按照 cpu 最多 cores 来设置线程数了。

2.2.2 Standalone 模式

  • 本质是资源调度系统
  • 构建一个由 Master+Slave 构成的 Spark 集群, Spark 运行在集群中。

2.2.3 Yarn 模式

  • Spark 客户端直接连接 Yarn; 不需要额外构建 Spark 集群。 有 yarn-client 和yarn-cluster 两种模式, 主要区别在于: Driver 程序的运行节点。
  • yarn-client: Driver 程序运行在客户端, 适用于交互、调试,希望立即看到 app 的输出。那个节点提交的任务,那个节点进行执行
  • yarn-cluster: Driver 程序运行在由 RM(ResourceManager)启动的 AP(APPMaster) 适用于生产环境。任务提交之后,由RM决定在那个节点执行。

2.2.4 Mesos 模式

  • Spark 客户端直接连接 Mesos; 不需要额外构建 Spark 集群。 国内应用比较少,更多的是运用 yarn 调度。

3 Spark集群安装

3.1 Spark 安装地址

3.1.1 官网地址

3.1.2 文档查看地址

3.1.3 下载地址

3.2 Standalone 模式安装

  • 上传并解压 spark 安装包

    和Hadoop的安装包版本号一致。
[bduser@node102 softwares]$ tar -zxvf spark-2.1.3-bin-hadoop2.7.tgz -C /opt/modules/
  • 改名
[bduser@node102 modules]$ mv spark-2.1.3-bin-hadoop2.7/ spark-2.1.3
[bduser@node102 modules]$ ll
  • 进入 spark 安装目录下的 conf 文件夹
[bduser@node102 spark-2.1.3]$ cd conf/
  • 修改配置文件名称
[bduser@node102 conf]$ mv slaves.template slaves
[bduser@node102 conf]$ mv spark-env.sh.template spark-env
[bduser@node102 conf]$ mv log4j.properties.template log4j.properties
[bduser@node102 conf]$ mv spark-defaults.conf.template spark-defaults
  • 删除bin目录所有的cmd文件(Windows环境中执行)
[bduser@node102 bin]$ rm -rf *.cmd
  • 修改 slave 文件,添加 work 节点:
[bduser@node102 bin]$ vim ../conf/slaves

  • 修改 spark-env.sh 文件,添加如下配置:
[bduser@node102 spark-2.1.3]$ vim /opt/modules/spark-2.1.3/conf/spark-env

  • 分发 spark 包
[bduser@node102 modules]$ xsync spark-2.1.3/
  • 在 sbin 目录下的spark-config.sh 文件中加入如下配置:
export JAVA_HOME=/opt/modules/jdk1.8.0_172
  • 不要忘记分发
  • 由于Spark的启动终止程序和Hadoop名字相同,都是start-all.sh 和 stop-all.sh 所以需要改名,我们改spark的,sbin/start-all.sh
[bduser@node102 sbin]$ mv start-all.sh start-sp.sh
[bduser@node102 sbin]$ mv stop-all.sh stop-sp.sh
  • 不要忘记分发。把原先的start-all.sh和stop-all.sh都删除掉。
  • 配置Spark环境变量
[root@node102 spark-2.1.3]# vim /etc/profile

  • 分发后重启(环境变量改变后要使之生效)
  • 在家目录下建立一个spark的软链接方便使用
[bduser@node102 ~]$ xcall ln -s /opt/modules/spark-2.1.3 /home/bduser/spark
----------------node102-------------------
----------------node103-------------------
----------------node104-------------------
  • 启动
start-sp.sh

所有进程启动


[bduser@node102 logs]$ xcall jps
----------------node102-------------------
1313 Worker
1233 Master
1526 Jps
----------------node103-------------------
1424 Jps
1254 Worker
----------------node104-------------------
1173 Worker
1319 Jps
  • 提交任务&执行程序
    \ 一行没输入完 换行
    计算 π 值

[bduser@node102 logs]$ spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master spark://node102:7077 \
> --executor-memory 512M \
> --total-executor-cores 2 \
> ~/spark/examples/jars/spark-examples_2.11-2.1.3.jar \
> 100

结果:

Pi is roughly 3.1417827141782713

  • 参数说明:
    –master spark://node102:7077 指定 Master 的地址
    –class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
    –deploy-mode: 是否发布你的驱动到 worker 节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
    –conf: 任意的 Spark 配置属性, 格式 key=value. 如果值包含空格,可以加引号“key=value”
    application-jar: 打包好的应用 jar,包含依赖. 这个 URL 在集群中全局可见。比如 hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的 path都包含同样的 jar
    application-arguments: 传给 main()方法的参数
    -executor-memory 512M 指定每个 executor 可用内存为 512M
    –total-executor-cores 2 指定每个 executor 使用的 cup 核数为 2 个
    该算法是利用蒙特·卡罗算法求 PI
  • 访问Spark的UI界面
node102:8080

  • 启动 spark shell
[bduser@node102 logs]$ spark-shell --master spark://node102:7077 --executor-memory 1g --total-executor-cores 2
  1. 注意: 如果启动 spark shell 时没有指定 master 地址,但是也可以正常启动spark shell 和执行 spark shell 中的程序,其实是启动了 spark 的 local 模式,该模式仅在本机启动一个进程,没有与集群建立联系。
  2. Spark Shell 中已经默认将 SparkContext 类初始化为对象 sc。用户代码如果需要用到,则直接应用 sc 即可 sparksession 是 sparksql
  3. 交互模式REPL:
  • 再运行一个Spark程序WordCount

    1. 使用下面文件作为读取对象
[bduser@node102 ~]$ cat ~/spark/RELEASE
Spark 2.1.3 built for Hadoop 2.7.3
Build flags: -Dhttps.protocols=TLSv1.1,TLSv1.2 -B -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn -Psparkr -DzincPort=3036
  1. 编写wordcount

scala> sc.textFile("/home/bduser/spark/RELEASE").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((-Psparkr,1), (2.1.3,1), (Build,1), (built,1), (-Phive-thriftserver,1), (-Pmesos,1), (2.7.3,1), (-Phadoop-2.7,1), (-B,1), (Spark,1), (-Pyarn,1), (-Dhttps.protocols=TLSv1.1,TLSv1.2,1), (-DzincPort=3036,1), (flags:,1), (for,1), (-Phive,1), (Hadoop,1))

3.3 JobHistoryServer 配置

  • 在ip://4040可以查看spark-shell的界面,其中可以查看各个任务的运行状态,但是退出再重开之后就不再显示历史任务了,因此搭建历史服务器 JobHistoryServer。
  • 下面是搭建过程:
  • 修改 spark-default.conf.template 名称
mv spark-defaults.conf.template spark-defaults.conf
  • 修改 spark-default.conf 文件,开启 Log:


其中日志文件所在的hdfs路径要提前创建。:
1. 开启Hadoop hdfs start-all.sh
2. 创建目录

[bduser@node102 spark]$ hdfs dfs -mkdir -p  /user/spark/history
  • 修改 spark-env.sh 文件,添加如下配置:
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=4000 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://node102:8020/user/spark/history"
  • spark.eventLog.dir: Application 在运行过程中所有的信息均记录在该属性指定的路径下;
    spark.history.ui.port=4000 调整 WEBUI 访问的端口号为 4000
  • spark.history.fs.logDirectory=hdfs://node102:8020/directory 配置了该属性后,在 start-history-server.sh 时就无需再显式的指定路径, Spark HistoryServer 页面只展示该指定路径下的信息
  • spark.history.retainedApplications=3 指定保存 Application 历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数。
  • 分发配置文件
[bduser@node102 conf]$ xsync spark-defaults spark-env
  • 启动历史服务
[bduser@node102 conf]$ start-history-server.sh
  • 查看历史服务器的UI界面
http://node102:4000/

3.4 HA配置

  • 高可用:持续不断的提供复位机制
  • zookeeper 正常安装并启动
[bduser@node102 conf]$ start-zk.sh
----------------node102-------------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
----------------node103-------------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
----------------node104-------------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[bduser@node102 conf]$ xcall jps
----------------node102-------------------
3729 QuorumPeerMain
2052 DataNode
2966 Master
2454 NodeManager
3754 Jps
3450 HistoryServer
2347 ResourceManager
1948 NameNode
3037 Worker
3598 CoarseGrainedExecutorBackend
3502 SparkSubmit
----------------node103-------------------
1475 DataNode
2068 CoarseGrainedExecutorBackend
1540 SecondaryNameNode
2136 QuorumPeerMain
1642 NodeManager
2154 Jps
1869 Worker
----------------node104-------------------
1888 QuorumPeerMain
1360 DataNode
1826 CoarseGrainedExecutorBackend
1667 Worker
1918 Jps
1471 NodeManager
[bduser@node102 conf]$ see-sk-status.sh
----------------node102-------------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
----------------node103-------------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
----------------node104-------------------
ZooKeeper JMX enabled by default
Using config: /opt/modules/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: leader
  • 修改 spark-env.sh 文件添加如下配置:
    注销master节点的声明
# SPARK_MASTER_HOST=node102
#to bind the master to a different IP address or hostnam
# SPARK_MASTER_PORT=7077
# StandAlone模式的HA配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node102,node103,node104 -Dspark.deploy.zookeeper.dir=/spark-standalone-ha"
  • 分发配置文件
[bduser@node102 conf]$ xsync spark-env.sh
------------------node103-----------------------
sending incremental file list
spark-env.shsent 1680 bytes  received 67 bytes  1164.67 bytes/sec
total size is 4382  speedup is 2.51
------------------node104-----------------------
sending incremental file list
spark-env.shsent 1680 bytes  received 67 bytes  1164.67 bytes/sec
total size is 4382  speedup is 2.51
  • 在 node102 上启动全部节点
[bduser@node102 spark]$ start-sp.sh
  • 在 node103 上单独启动 master 节点
[bduser@node102 spark]$ ssh node103
Last login: Sat Jun 22 10:57:50 2019 from node102
[bduser@node103 ~]$ start-master.sh
[bduser@node102 spark]$ xcall jps
----------------node102-------------------
4416 Worker
3729 QuorumPeerMain
2052 DataNode
4533 Jps
2454 NodeManager
3450 HistoryServer
2347 ResourceManager
1948 NameNode
4334 Master
----------------node103-------------------
2352 Worker
1475 DataNode
1540 SecondaryNameNode
2136 QuorumPeerMain
2441 Master
1642 NodeManager
2509 Jps
----------------node104-------------------
1888 QuorumPeerMain
1360 DataNode
2116 Jps
2062 Worker
1471 NodeManager
  • spark HA 集群访问
[bduser@node102 spark]$ spark-shell --master spark://node102:7077,node103:7077 --executor-memory 512M --total-executor-cores 2
  • 查看UI界面

3.5 Yarn 模式安装

  • Yarn内存评估机制并不准确
  • 修改 hadoop 配置文件 yarn-site.xml,添加如下内容:
        <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配>值,则直接将其杀掉,默认是 true --><property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直>接将其杀掉,默认是 true --><property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
  • 分发yarn-site.xml
[bduser@node102 spark]$ xsync /opt/modules/hadoop-2.7.6/etc/hadoop/yarn-site.xml
  • 修改 spark-env.sh,添加如下配置:
YARN_CONF_DIR=/opt/modules/hadoop-2.7.6/etc/hadoop
HADOOP_CONF_DIR=/opt/modules/hadoop-2.7.6/etc/hadoop
  • 分发spark-env.sh
[bduser@node102 spark]$ xsync conf/spark-env.sh
  • 执行一个程序
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--executor-memory 512M \
--total-executor-cores 2 \
~/spark/examples/jars/spark-examples_2.11-2.1.3.jar \
100

4 IDEA 环境应用

  • spark shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDE 中编制程序,然后打成 jar 包,然后提交到集群,最常用的是创建一个 Maven 项目,利用 Maven 来管理 jar 包的依赖。

4.1 在 IDEA 中编写 WordCount 程序

  • 创建一个 Maven 项目 WordCount 并导入依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.3</version>
</dependency>
</dependencies>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>WordCount(修改)</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
  • 编写代码
object HelloSpark {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("hello Spark").setMaster("local[*]")val sc = new SparkContext(sparkConf)val lines = sc.textFile(args(0))val map1 = lines.flatMap(_.split(" "));val map2 = map1.map((_,1))val reduce1 = map2.reduceByKey(_+_)reduce1.saveAsTextFile(args(1))}
}
  • 输入路径参数 在本地执行

4.2 本地调试

  • 本地 Spark 程序调试需要使用 local 提交模式,即将本机当做运行环境,Master 和 Worker 都为本机。运行时直接加断点调试即可。如下:创建 SparkConf 的时候设置额外属性,表明本地执行:
    val sparkConf = new SparkConf().setAppName("hello Spark").setMaster("local[*]")
  • 打成jar包发送到集群 将.setMaster("local[*]")注释掉
  • 在Yarn上运行:
spark-submit \
--class com.nefu.spark.test.HelloSpark \
--master yarn \
--deploy-mode client \
~/WordCount.jar \
hdfs:///flume/20190525/14 hdfs:///user/bduser/test/sparkwcoutput
  • 运行结果:
(finished.,1)
(exec.ListSinkOperator,2)
(</PERFLOG,3)
(21:05:46,532,1)
(is,1)
(Instead,,1)
((Operator.java:close(613)),1)
(2019-03-16,10)
(0,2)
(INFO,10)
(done,1)
(Configuration.deprecation,1)
((PerfLogger.java:PerfLogEnd(148)),3)
(,10)
(start=1552741543709,1)
((FileInputFormat.java:listStatus(249)),1)
(mapred.input.dir,1)
(CliDriver,1)
(mapred.FileInputFormat,1)
(21:05:46,534,1)
(Close,1)
(21:05:46,631,1)
(paths,1)
(end=1552741546534,1)
(row(s),1)
(Total,1)
(input,1)
(:,1)
(mapreduce.input.fileinputformat.inputdir,1)
(duration=2825,1)
(duration=1,2)
((Operator.java:close(635)),1)
(21:05:46,707,1)
(closing...,1)
([main]:,10)
((PerfLogger.java:PerfLogBegin(121)),2)
(21:05:46,708,1)
(end=1552741546533,1)
(<PERFLOG,2)
(process,1)
(seconds,,1)
(start=1552741546707,1)
(start=1552741546532,1)
(to,1)
(2.834,1)
(Time,1)
(method=Driver.run,1)
(log.PerfLogger,5)
(method=releaseLocks,4)
(-,10)
((SessionState.java:printInfo(951)),1)
(Fetched:,1)
(from=org.apache.hadoop.hive.ql.Driver>,5)
(taken:,1)
(21:05:46,571,1)
(deprecated.,1)
(end=1552741546708,1)
(1,2)
(21:05:46,690,2)
(21:05:46,706,1)
(21:05:46,533,1)
(use,1)
((Configuration.java:warnOnceIfDeprecated(1243)),1)
  • 在节点上运行(高可用的话开启zk)
bin/spark-submit \
--class com.nefu.spark.test.HelloSpark \
--master spark://node102:7077,node103:7077 \
~/WordCount.jar \
/user/test/wordcount/input/word.txt /user/test/wordcount/output

4.3 远程调试

  • 通过 IDEA 进行远程调试,主要是将 IDEA 作为 Driver 来提交应用程序,配置过程如下:
  • 修改 sparkConf, 添加最终需要运行的 Jar 包、 Driver 程序的地址,并设置 Master 的提交地址:
val conf = new SparkConf().setAppName("MyWordCount")
.setMaster("spark://node102:7077")
.setJars(List("E:\\SparkIDEA\\spark_test\\target\\WordCount.jar")
)

5 WordCount 程序的简单分析

sc.textFile("hdfs://node102:8020/user/test/wordcount/input/word.t
xt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).saveAsTex
tFile("hdfs://node102:8020/user/test/wordcount/output")
  • sc 是 SparkContext 对象,该对象时提交 spark 程序的入口
  • textFile(hdfs://node102:8020/RELEASE)是 hdfs 中读取数据 解析文本文档的方式来处理一个路径指向的所有文件,将此文件内容初始化为RDD
  • flatMap(.split(" "))先 map 再压平
    map((
    ,1))将单词和 1 构成元组 RDD中的方法,称为算子
  • reduceByKey(+)按照 key 进行 reduce,并将 value 累加
  • saveAsTextFile(“hdfs:// node102:8020/out”)将结果写入到 hdfs 中
  • 在 spark 集群中运行 wordcount 程序其主要业务逻辑比较简单,涵盖一下 3个过程:
  1. 读取存储介质上的文本文件(一般存储在 hdfs 上);
  2. 对文本文件内容进行解析,按照单词进行分组统计汇总;
  3. 将过程 2 的分组结果保存到存储介质上。(一般存储在 hdfs 或者 RMDB上)
  • 虽然 wordcount 的业务逻辑非常简单,但其应用程序在 spark 中的运行过程却巧妙得体现了 spark 的核心精髓——分布式弹性数据集、内存迭代以及函数式编程等特点。下图对 spark 集群中 wordcount 的运行过程进行剖析,加深对 spark 技术原理窥探。
  • Scala对wordcount程序的核心实现:
    该程序在spark 集群的运行过程涉及几个核心的 RDD,主要有 textFileRDD、flatMapRDD、 mapToPairRDD、 shuffleRDD(reduceByKey)等
  • 应用程序通过 textFile 方法读取 hdfs 上的文本文件,数据分片的形式以 RDD 为统一模式将数据加载到不同的物理节点上,通过一系列的数据转换,如利用 flatMap 将文本文件中对应每行数据进行拆分(文本文件中单词以空格为分割符号),形成一个以每个单词为核心新的数据集合 RDD;之后通过 MapRDD 继续转换形成形成(K,V)数据形式,以便进一步使用reduceByKey 方法,该方法会触发 shuffle 行为,促使不同的单词到对应的节点上进行汇聚统计(实际上在夸节点进行数据 shuffle 之前会在本地先对相同单词进行合并累加),形成wordcount 的统计结果;最终通过 saveAsTextFile 方法将数据保存到 hdfs上。
    val sparkConf = new SparkConf().setAppName("hello Spark").setMaster("spark://node102:7077").setJars(List("D:\\AboutMyWork\\IDEA-WorkSpace\\SparkDemos\\hellospark\\target\\WordCount.jar"))val sc = new SparkContext(sparkConf)//解析文本文档的方式来处理一个路径指向的所有文件// ,将此文件内容初始化为RDDval srcRDD = sc.textFile(args(0))//RDD中的方法,称为算子  //以下算子都称为转换算子//扁平化处理:把业务拆分为最小的业务单元 Mapperval flatMapRDD = srcRDD.flatMap(_.split(" "));val mapRDD = flatMapRDD.map((_,1))//根据Key进行合并化简操作val reduceRDD = mapRDD.reduceByKey(_+_)reduceRDD.saveAsTextFile(args(1))

Spark认知Spark环境搭建相关推荐

  1. Spark学习之路一——Spark基础及环境搭建

    Spark学习之路一--Spark基础及环境搭建 文章目录 一. Spark 概述 1.1 概述 1.2 优势特性 1.2.1 运行速度快 1.2.2 容易使用 1.2.3 通用性 1.2.4 运行模 ...

  2. Spark集群环境搭建(standalone模式)

    Spark集群环境搭建(standalone模式) 1. 实验室名称: 2. 实验项目名称: 3. 实验学时: 4. 实验原理: 5. 实验目的: 6. 实验内容: 7. 实验器材(设备.虚拟机名称) ...

  3. Spark详解(二):Spark完全分布式环境搭建

    1. 前言 本文搭建了一个由三节点(master.slave1.slave2)构成的Spark完全分布式集群,并通过Spark分布式计算的一个示例测试集群的正确性.本文将搭建一个支持Yarn的完全分布 ...

  4. spark入门及环境搭建

    简介 快速,通用,大数据处理分析框架 scala编写 采用DAG引擎,支持内存计算,速度快 可以运行scala,java,python,r等开发的程序 集成多种数据源 spark组件 spark co ...

  5. windows10下python开发spark应用的环境搭建

    环境搭建主要涉及到3方面,第1是安装,第2是环境变量的配置,第3是验证安装和配置是否成功,总的涉及到以下5个部分. 环境变量配置是在 此电脑 -> 属性 -> 高级系统设置 -> 高 ...

  6. linux spark单节点环境搭建,Linux下基于Hadoop的Spark1.2单机安装

    一,安装环境 硬件:虚拟机 操作系统:Centos 6.4 64位 IP:10.51.121.10 主机名:datanode-4 安装用户:root Hadoop:Hadoop2.6,Hadoop2. ...

  7. Spark On YARN 环境搭建

    1.确保前边的环境都是否配置成功 搭建环境之前先确定自己的环境是否做好 1.jdk 1.8版本 2.HDFS MapReduce Hadoop 3.2.1 + 3.zookeeper 4.python ...

  8. Windows下Scala+Spark+IDEA+Hadoop环境搭建

    下载安装包,添加环境变量不再赘述. 注意spark和scala要对应版本.例如Spark使用官网下载的spark-2.4.4-bin-hadoop2.7,打开%SPARK_HOME%\jars文件夹, ...

  9. Spark on Yarn环境搭建

    1,解压缩spark安装文件 tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module 2,修改spark目录名称 mv spark-3.0.0-b ...

最新文章

  1. 完成CitrixVDI架构了解及部署测试
  2. 滤镜应用——制作彩虹效果
  3. 全球首款AI的操作系统来了!100%国产,像用Windows一样简单
  4. python第三方库numpy-谁能介绍下Python生态中的第三方库NumPy
  5. IOS 开发一些常用的地址
  6. matlab神经网络43个案例分析_10个经典案例,带你一起分析:高层结构设计难点...
  7. java gradle构建_在Gradle中为JPMS构建Java 6-8库
  8. leetcode 1. 两数之和(map)
  9. 哪个不是python合法的标识符_哪个不是python合法标识符
  10. 标题、段落标签(HTML)
  11. java实现录屏_java录屏截屏: 用java代码实现的录屏和截图截屏,并在springboot上实现...
  12. 轻松三步教你配置oracle,Oracle Net Configuration Assistant 配置步骤简明教程
  13. Tomcat - SSL操作大全
  14. Invest模型问题答疑--产水模块、土壤保持模块、供需平衡分析、生态系统服务、生物多样性生境质量
  15. 如何使用电脑注册微博登陆模拟器
  16. Qt4 Qt5 通用设置应用软件图标 快捷方式图标方法
  17. 微信企业号开发(1)--基础入门
  18. 开博第一文:成为软件奇才的五要素
  19. 计算机组成原理课后答案(白中英主编第五版).pdf
  20. 面试题:说下局部最优和全局最优的区别

热门文章

  1. SEO静态页面生成系统
  2. 不能随便输入的“netsh winsock reset”
  3. 计算机网络实验(三个部分--验证性、Wireshark、CPT)
  4. 计算机基础的课程标准,《计算机基础》课程标准
  5. Simulink开环控制都不稳----记录一次还没有解决的问题
  6. 华硕笔记本快捷键失效(例如fn+f5失效)
  7. 奇葩需求系列-距离明天十二点倒计时
  8. Rust学习教程32 - 动态数组Vec
  9. python陆股通_【科普】沪股通、深股通、港股通、陆股通都是什么意思?
  10. 华为rh2288服务器芯片组,华为RH2288H V2服务器内部介绍