欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-and-spark-integration-3-spark-code-model/


在Spark中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行进行。这样的数据集被称为弹性分布式数据集(Resilient Distributed Dataset),简称RDD。RDD是Spark对分布式数据和计算的基本抽象。在Spark中,对数据的所有操作不外乎创建RDD、转换已有RDD以及调用RDD操作进行求值。在《Spark的安装及简单应用》的单词统计示例中,rdd和wordmap都是MapPartitionsRDD类型的RDD,而wordreduce是ShuffledRDD类型的RDD。

RDD支持2种类型的操作:转换操作(Transformation Operation)和行动操作(Action Operation)。有些资料还会细分为创建操作、转换操作、控制操作以及行动操作等4种类型。转换操作会由一个RDD生成一个新的RDD。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统中。转换操作和行动操作的区别在于Spark计算RDD的方式不同。虽然你可以在任何时候定义新的RDD,但Spark只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。表中给出了转换操作和行动操作之间对比的更多细节。

类别 函数 区别
转换操作 map、filter、groupBy、join、union、reduce、sort、partitionBy等 返回值还是RDD,不会立马提交给Spark集群运行
行动操作 count、collect、take、save、show等 返回值不是RDD,会形成DAG图,提交给Spark集群运行并立即返回结果

通过转换操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph,很多资料也会翻译为“血统”)来记录这些不同RDD之间的依赖关系。Spark需要用这些信息来按需计算每个RDD,也可以依赖谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生产实际的输出,它们会强制执行那些求值必须用到的RDD的转换操作。

Spark中RDD计算是以分区(Partition)为单位的,将RDD划分为很多个分区分布到集群的节点中,分区的多少涉及对这个RDD进行并行计算的粒度。如下图所示,实线方框A、B、C、D、E、F、G都表示的是RDD,阴影背景的矩形则表示分区。A、B、C、D、E、F、G之间的依赖关系构成整个应用的谱系图。


依赖关系还可以分为窄依赖和宽依赖。窄依赖(Narrow Dependencies)是指每个父RDD的分区都至多被一个子RDD的分区使用,而宽依赖(Wide Dependencies)是指多个子RDD的分区依赖一个父RDD的分区。图中,C和D之间是窄依赖,而A和B之间是宽依赖。RDD中行动操作的执行将会以宽依赖为分界来构建各个调度阶段,各个调度阶段内部的窄依赖则前后链接构成流水线。图中的3个虚线方框分别代表了3个不同的调度阶段。

对于执行失败的任务,只要它对应的调度阶段的父类信息仍然可用,该任务就会分散到其它节点重新执行。如果某些调度阶段不可用,则重新提交相应的任务,并以并行方式计算丢失的地方。在整个作业中如果某个任务执行缓慢,系统则会在其他节点上执行该任务的副本,并最终取最先得到的结果作为最终的结果。

下面就以与《Spark的安装及简单应用》中相同的单词统计程序来分析一下Spark的编程模型,与《Spark的安装及简单应用》中所不同的是,这里的是一个完整的Scala程序,程序所对应的Maven依赖如下:

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.1</version>
</dependency>

具体代码示例如下:

package scala.spark.demo
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit ={val conf = new SparkConf().setAppName("WordCount").setMaster("local")①val sc = new SparkContext(conf)②val rdd = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-shell")③val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)④val wordsort = wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))⑤wordsort.saveAsTextFile("/tmp/spark")⑥sc.stop()⑦}
}

main()方法主体中第①和第②行中首先创建一个SparkConf对象来配置应用程序,然后基于这个SparkConf创建了一个SparkContext对象。一旦有了SparkContext,就可以用它来创建RDD,第③行代码中调用了sc.textFile()来创建一个代表文件中各行文本的RDD。第④行中rdd.flatMap(_.split(" ")).map(x=>(x,1))这一段内容的依赖关系是窄依赖,而reduceByKey(_+_)操作对单词进行计数时属于宽依赖。第⑥行中将排序后的结果存储起来。最后第⑦行中使用stop()方法来关闭应用。

在$SPARK_HOME/bin目录中还有一个spark-submit脚本,用于将应用快速部署到Spark集群中。比如这里的WordCount程序,当我们希望通过spark-submit部署时,只需要将应用打包成jar包(即下面示例中的wordcount.jar)并上传到Spark集群中,然后通过spark-submit进行部署,示例如下:

[root@node1  spark]# bin/spark-submit --class scala.spark.demo.WordCount wordcount.jar --executor-memory 1G --master spark://localhost:7077
2018-08-06 15:39:54 WARN  NativeCodeLoader:62 - Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
2018-08-06 15:39:55 INFO  SparkContext:54 - Running Spark version 2.3.1
2018-08-06 15:39:55 INFO  SparkContext:54 - Submitted application: WordCount
2018-08-06 15:39:55 INFO  SecurityManager:54 - Changing view acls to: root
2018-08-06 15:39:55 INFO  SecurityManager:54 - Changing modify acls to: root
(....省略若干)
2018-08-07 12:25:47 INFO  AbstractConnector:318 - Stopped
Spark@6299e2c1{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-08-07 12:25:47 INFO  SparkUI:54 - Stopped Spark web UI at
http://10.199.172.111:4040
2018-08-07 12:25:47 INFO  MapOutputTrackerMasterEndpoint:54 –
MapOutputTrackerMasterEndpoint stopped!
2018-08-07 12:25:47 INFO  MemoryStore:54 - MemoryStore cleared
2018-08-07 12:25:47 INFO  BlockManager:54 - BlockManager stopped
2018-08-07 12:25:47 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-08-07 12:25:47 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 –
OutputCommitCoordinator stopped!
2018-08-06 15:46:57 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-08-06 15:46:57 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-08-06 15:46:57 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-fa955139-270c-4899-82b7-4959983a1cb0
2018-08-06 15:46:57 INFO  ShutdownHookManager:54 - Deleting directory
/tmp/spark-3f359966-2167-4bb9-863a-2d8a8d5e8fbe

示例中的–class用来指定应用程序的主类,这里为scala.spark.demo.WordCount;–executor-memory用来指定执行器节点的内容,这里设置为1G。最后得到的输出结果如下所示:

[root@node1 spark]# ls /tmp/spark
part-00000  _SUCCESS
[root@node1 spark]# cat /tmp/spark/part-00000
(,91)
(#,37)
(the,19)
(in,7)
(to,7)
(for,6)
(if,5)
(then,5)
(under,4)
(stty,4)
(not,4)

欢迎跳转到本文的原文链接:https://honeypps.com/mq/kafka-and-spark-integration-3-spark-code-model/


欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。


[Kafka与Spark集成系列三] Spark编程模型相关推荐

  1. Kafka与Spark集成系列二Spark的安装及简单应用

    原 [Kafka与Spark集成系列二] Spark的安装及简单应用https://blog.csdn.net/u013256816/article/details/82082019版权声明:本文为博 ...

  2. [Kafka与Spark集成系列一] Spark入门

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  3. [Kafka与Spark集成系列二] Spark的安装及简单应用

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  4. [Kafka与Spark集成系列四] Spark运行结构

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  5. 机器学习-白板推导系列(三十)-生成模型(Generative Model)

    机器学习-白板推导系列(三十)-生成模型(Generative Model) 30.1 生成模型的定义 前面所详细描述的模型以浅层的机器学习为主.本章将承上启下引出后面深度机器学习的部分.本小节,主要 ...

  6. ARMv7-M4处理器系列文章-2 编程模型

    概述 本文主要记录M4处理器的编程模型,其中会聊到寄存器组,处理器模式,软件执行特权等. 处理器模式 线程模式:当处理器复位或者异常处理结束后,就会进入线程模式,在线程模式下,主要运行的用户应用软件. ...

  7. Spark Shuffle系列-----1. Spark Shuffle与任务调度之间的关系

    本文转自http://blog.csdn.net/u012684933/article/details/49074185,所有权力归原作者所有,仅供学习. Spark根据RDD间的依赖关系是否是Shu ...

  8. Flink1.6系列之—数据流编程模型

    序言: Flink系列的文章会一直更新,这里只是参考官方文档,给出一个大概的解释,这里面涉及很多的细节需要划分多个模块单独来讲解,有兴趣的同学还是直接去看官网(官方文档和社区)和源码,这样获取知识最真 ...

  9. Apache SparkStreaming 简介和编程模型

    1. 简介 图5.22 SparkStreaming[16] Spark Streaming是Spark API核心扩展,提供对实时数据流进行流式处理,具备可扩展.高吞吐和容错等特性.Spark St ...

最新文章

  1. JSP简单标签带属性开发
  2. 1-spark学习笔记-大数据概述
  3. jsp中jquery传值给Java_jsp中利用jquery+ajax在前后台之间传递json格式参数
  4. Mybatis更新和删除数据
  5. gpg加解密软件学习
  6. php教程知识点归纳,PHP知识点小结
  7. 【编译原理】如何编写BNF?
  8. ipv6寻址_有类和无类寻址:IPV4寻址| 计算机网络
  9. spring boot 学习之五(日志配置)
  10. 不注册使用 .NET Reactor
  11. 3.1.2 Score Inflation 总分
  12. 开源OA项目:办公用品如何管理?
  13. B站视频下载方法之--手机下载后再转移至电脑
  14. 后门攻击阅读笔记,Input-aware dynamic backdoor attack
  15. 安卓开发学习5-6:布局管理器:布局管理器嵌套
  16. 【OpenGrok代码搜索引擎】一、OpenGrok简介
  17. 时尚品牌如何做微信营销推广?具体方法有哪些?
  18. SSM 框架原理简介及解析
  19. 三菱PLC控制步进驱动器脉冲定位相关(附代码接线图)
  20. 一文搞懂 Cocos Creator 3.x 坐标转换!建议收藏

热门文章

  1. JDK1.8并发包中的类
  2. myeclipse 项目右键没有svn_新建SVN仓库并上传项目
  3. mysql 查询的转义字符_mysql – 如何在LIKE查询中转义字符?
  4. Exchange与ADFS单点登录 PART 6:Exchange声明规则配置
  5. 配置Hyper-V Server 资源计量
  6. 搭建scala 开发spark程序环境及实例演示
  7. .net中自定义过滤器对Response内容进行处理
  8. Tmux : GNU Screen 的替代品
  9. 剑指-顺时针打印矩阵
  10. 中石油训练赛 - Historical Maths(二分)