Spark 编程模型

在Spark 中, 我们通过对分布式数据集的操作来表达计算意图 ,这些计算会自动在集群上

井行执行 这样的数据集被称为弹性分布式数据集 Resilient Distributed Dataset ),简称 RDD

RDD 是Spark 分布式数据和计算的基本抽象。在 Spark 中,对数据的所有操作不外乎创建

RDD 、转换己有 RDD 以及调用 RDD 操作进行求值 rdd 和wordrnap 都是 MapPartition RDD 类型 RD ,而 wordreduce ShuffiedRDD 类型的 RDD

RDD 支持2种 类型的操作 转换操作( Transformation Operation )和行动操作 Action

Operation )。有些资料还会细分为创建操作、转换操作、控制操作和行动操作4 类型。转换

操作会由一个 RDD 生成一个新的 RDD 行动操作会对 RDD 算出 个结果,并把结果返回驱

动器程序,或者把结果存储到外部存储系统中 转换操作和行动操作的区别在于 Spark 计算 RDD

的方式不同 虽然可以在任何时候定义新的 RDD ,但 Spark 只会惰性计算这些 RDD 。它们只有

第一次在 个行动操作中用到时才会 正计算。

转换操作和行动操作的对比

通过转换操作,从己有的 RDD 中派生出新的 RDD Spark 会使用谱系图( Lineage Graph,

很多资料也会翻译为“血统”)来记录这些不同 RDD 间的依赖关系 Spark 要用这些信息

来按需计算每个 RDD ,也可以依赖谱系图在持久化的 RD 丢失部分数据时恢复丢失的数据。

行动操作会把最终求得的结果返回驱动器程序,或者写入外部存储系统。由于行动操作需要生

产实际的输出,所以它们会强制执行那些求值必须用到的 RDD 转换操作。

Spark 中RDD 计算是以分区( Part ion )为单位的,将 RDD 分为很 个分区分布到集群

的节点中,分区的多少涉及对这个 RD 进行并行计算的粒度。如图 12-2 所示 实线方框 A、B、C、D、E、F、G阴影背景的矩形 表示分区。 A、B、C、D、E、F、G之间的依赖关系构成整个应用的谱系图。

依赖关系还可以分为窄依赖和宽依赖。窄依赖 Narrow ependen cie )是指每个父 RDD 的

分区都至多被一个RDD 的分区使用, 而宽依赖( Wide Dependencies )是指多个子 RDD 的分区依赖一个父 RDD 分区。图 12-2 中,C和D 之间是窄依赖,而 A和B之间是宽依赖。 RDD中行动操作的执行会以宽依赖为分界来构建各个调度阶段,各个调度阶段 内部的窄依赖、前后链接构成流水线。图中的 个虚线方框分别代表了 个不同的调度阶段。 对于执行失败的任 ,只 要它对应的调度阶段的父类信息仍然可用,那么该 务就会分散

到其他节点重新执行。如果某些调 阶段不可用,则重新提交相应的任务,并以并行方式计算

丢失的地方。在整个作业中,如果某个任务执行缓慢, 则系统会在其他节点上执行该任务的副 本,并取最先得到的结果作为最终的结果。

下面就以 12 节中相同的单词统计程序为例来分析 Spark 的编程模型,与 12.1 节中所不

同的是, 这里是 个完整的 Scala 程序,程序对应的 Maven 依赖如下

单词统计程序如代码清单 12-1 示。

代码清单 12-1 单词统计程序

main() 方法主体的第①和第②行中首先创建一个 SparkConf 对象来配置应用程序,然后基于这个 SparkConf 建了一个 SparkContext 象。一旦有了 SparkContext ,就可 以用它创建RDD 第③行代码中调用 sc textFile ()来创 建一个代表文 件中各行文本的 RDD 第④行中

rdd flatMap(_.split(””)) .map(x=>帜, ))这一段内容的依赖关系是窄依赖,而reduceByKey(_+ _)操 作对单词进行计数时属于宽依赖。第⑤行中将排序后的结果存储起来。最后第⑦行中使用 top()

方法来关闭应用。

在SPARK_HOME/bin 目录中还有一个 spark-submit 脚本,用于将应用 快速部署到Spark 集

群。 比如这里的 WordCount 程序 当我 希望通过 park-submit 进行部署 ,只需要将应用打

包成 jar 包(即下面示例中的 wordcount. )井上传到 Spark 集群 然后通 spark-submit 进行

部署即 ,示例如下

[root@no del spark)# bin/spark- submit --class scala.spark.demo . WordCount wordcount . jar --executor- memory lG --master spark : //localhost : 7077 2018 - 08 - 06 15:39 : 54 WARN NativeCodeLoader:62 - Unable to load native hadoop library for your platform . .. using builtin- ] ava classes where applicable 2018-08 - 06 15 : 39 : 55 INFO SparkContext : 54 - Running Spark vers on 2 . 3 . 1 2018 - 08 - 06 15 : 39 : 55 INFO SparkContext : 54 - Submitted applicat on WordCount 2018 - 08 - 06 15:39 : 55 INFO SecurityManager : 54 - Chang ng view acls to : root 2018 - 08 - 06 1 5 : 39 : 55 I NFO SecurityManager 54- Chang ng modify acls to : root ( .... ;占略若干)2018 - 08 - 07 12 : 25 : 47 INFO AbstractConnector : 318 - Stopped Spark@62 99e2cl {HTTP /1 . 1 , [http/ 1 . l) } { 0. 0 . 0. 0: 4 04 0} 2018 - 08 - 07 12 : 25 : 47 INFO SparkUI : 54 - Stopped Spark web UI at http: // 10 . 199 . 172 . 111 : 4040 2018 - 08-07 12 : 25 : 47 INFO MapOutp tTrackerMasterEndpo nt 54 - MapOutputTrackerMasterEndpoint stop ped' 2018 - 08-07 12 : 25:47 INFO MemoryStore : 54 - MemoryStore cleared 2018 - 08-07 12 : 25 : 47 INFO BlockManager:54 - BlockManager stopped 2018 - 08 - 07 12 : 25 : 47 INFO BlockManagerMaster : 54 - BlockManagerMaster stopped 2018 - 08 - 07 12 : 25 : 47 INFO OutputCommitCoordinator OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped ! 2018 08-06 15 : 46 : 57 INFO SparkCo text 54 - Successfully stopped SparkContext 20 1 8 - 08-06 1 5 : 46:57 INFO ShutdownHookManager : 54 Shutdown hook called 2018 - 08 - 06 15 : 46 : 57 INFO ShutdownHoo kManager : 54 - Delet ng directory /tmp/spark- fa955139-270c-4899 - 82b7 - 4959983alcb0 2018 - 08 - 06 15 : 46 : 57 INFO ShutdownHookManager : 54 - Deleting directory /tmp/spark-3f359966- 2167 - 4bb9 - 863a - 2d8a8d5e8fbe 

实例中的--class 用来指定应用程序 主类,这里为 eal ark.demo.WordCount;

execu or memory 用来指定执行器节点的内容,这里设置为 lG 。最后得到的输出结果如

下所示。

[root@node l spark) # ls /tmp/spar k part 00000 SUCCESS [root@nodel spark)# cat /tmp/spark/part-00000 (, 91) (# ' 37) (the , 19) (in, 7) (to , 7) (for, 6) (if, 5) (then, 5) (under, 4) (stty, 4) (not, 4) 

Spark 的运行结构

在分布式环境下 Spark 集群采用的是主从架构。如图 12-3 示,在一个Spark 集群中,

有一个节点负责中央协调,调度各个分布式工作节点,这个中央协调节点被称为驱动器( Driver

节点 与之对应的工作节点被称为执行器( Executor )节点。驱动器节点可以和大量的执行器节

点进行通信 它们都作为独立的进程运行。驱动器节点和所有的执行器节点一起被称为 Spark

应用( Application)。

Spark 应用通过一个叫作集群管理器( luster Manager )的外部服务在集群中的机器上启动。 Spark 自带的集群管理器被称为独立集群管理器 Spark 也能运行 YARN Mesos Kubemetes 这类开源集群管理器上Spark 驱动器节点是执行程序中的 main()方法的进程。它执行用户编写的用来创建 SparkContext RDD ,以及进行 RDD 转换操作和行动操作的代码。其实,当启动 park-shell 时,就启动了一个 park 驱动程序。驱动程序一旦停止 Spark 应用也就结束了

Kafka与Spark trea ing 的整合

采用 Spark Stre ming 流式处理 fka 中的数据,首先需要把数据从 Kafka 中接收过 ,然

后转换为 Spark Streaming 中的 DStrea 。接收数据的方式一共有两种:利用接收器Receiver 的方式接收数据和直接Kafka中读取数据 。

Receiver 方式通过KafkaUtils. creates trea ()方法来创建一个DS tream 对象 ,它不关注消费的位移的处理,Receive方式的结构如图 12-9所示 但这种方式在 Spark 任务执行异常 导致 数据丢失,如果要保证数据的可靠性,则需要开启预写式日志,简称 AL (Write Ahead Logs) , 只有收到的数据被持久化到 WAL 之后才会更新 Kafka 中的消费位移。收 的数据 WAL储存

位置信息被可靠地存储,如果期间出现故障,那么这些信息被用来从错误中恢复,并继续处理

数据。

WAL 的方式可以保证从 Kafka 中接收的数据不被丢失 但是在某些异常情况下,一些数据

被可靠地保存到了 WAL 中,但是还没有来得及更新消费位移,这样会造成 Kafka 中的数据被

Spark 拉取 了不止一次。同时在 Receiver 方式中 Spark的RDD 分区 Kafka 的分区并不是相

关的,因此增加 Kafk 中主题的分区数并不能增加 Spark处理的并行度,仅仅增加了接收器接

收数据的并行度

Direct 方式是从 Spark 1.3 开始引入的,它通过 KafkaUtil s.createDire ctStream() 方法创建一个

DStream 象, Direct 方式的结构如图 12-10 所示。该方式中 Kafka 的一个分区与 SparkRDD对应,通过定期扫描所订阅的 afka 每个主题的每个分区的最新偏移量以确定当前批处理数据偏 移范围。与 Rec iver 方式相比, irect 方式不需要维护一份WAL 数据,由 park Streaming 程序自控制位移的处理,通常通过检查点机制处理消费位移,这样可以保证 Kafka 中的数据只 会被 Spark 拉取一次。

下面使用一个简单的例子来演示 Spark Streaming和Kafka 的集成。在该示例中,每秒往

Kafka写入一个0到9之间的随机数,通过 Spark Streaming从Kafka 中获取数据并实 计算批次间隔内的数据的数值之和

往Kafk 中写入随 数的主要代码如下:

Random random = new Random( ); wh le (true) { String msg = String.val ueOf( r andom.nextint(lO) ); ProducerRecord, String> message = new ProducerRecord<>(topic , msg ); producer.send(message) . get() ; TimeUnit.SECONDS . sleep(l) ; 

Kafka与Spark Streaming的集成示例如代码清单 12-3所示,代码中的批次间隔设置为 2s

示例中的主题 topic spark 包含4个分区。

代码清单12-3 Kafka与Spa Streaming的集成示例

其实,kafka的设计实现,涉及到太多的底层技术,为了能够把它吃透,需要花大量的时间和精力。

在这里,送大家一张 Kafka 学习框架,分为 Kafka 入门、Kafka 的基本使用、客户端详解、Kafka 原理介绍、Kafka 运维与监控以及高级 Kafka 应用。

需要这份的kafka朋友们转发收藏+关注私信“资料”立即获取

spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成相关推荐

  1. hadoop集群-单词统计

    1.在用Hadoop进行单词统计前,要做好Hadoop的集群部署 输入上述命令,就能在浏览器中分别访问namenode:50070(namenode指的是你主节点的名字,这里我的主节点名字是namen ...

  2. 用spark自带的示例SparkPi测试scala和spark集群

    在按照王家林的文档安装完scala,spark集群和idea-IC开发工具后,用spark自带的示例SparkPi测试scala和spark集群 1.按照王家林文档中的方法把spark自带的Spark ...

  3. Windows家庭版下基于Docker的hadoop、Spark集群搭建

    Windows家庭版下基于Docker的hadoop.Spark集群搭建 目录 Windows家庭版下基于Docker的hadoop.Spark集群搭建 1.实验目的 2.实验平台 3.实验内容和要求 ...

  4. Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用

    前言 在Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境,我们已经部署好了一个Spark的开发环境. 本文的目标是写一个Spark应用,并可以在集群中测试. ...

  5. 使用Python+jieba和java+庖丁分词在Spark集群上进行中文分词统计

    写在前边的话: 本篇博客也是在做豆瓣电影数据的分析过程中,需要对影评信息和剧情摘要信息进行分析而写的一篇博客 以前学习hadoop时,感觉做中文分词也没那么麻烦,但是到了Spark,却碰到了诸多困难, ...

  6. spark 序列化错误 集群提交时_【问题解决】本地提交任务到Spark集群报错:Initial job has not accepted any resources...

    本地提交任务到Spark集群报错:Initial job has not accepted any resources 错误信息如下: 18/04/17 18:18:14 INFO TaskSched ...

  7. spark 广播变量大数据_大数据处理 | Spark集群搭建及基本使用

    点击蓝字关注我 前面用了一篇文章详细的介绍了集群HDFS文件系统的搭建,HDFS文件系统只是一个用于存储数据的系统,它主要是用来服务于大数据计算框架,例如MapReduce.Spark,本文就接着上一 ...

  8. spark 并行处理_如何使用Spark集群并行处理大数据

    spark 并行处理 by Hari Santanam 通过Hari Santanam 如何使用Spark集群并行处理大数据 (How to use Spark clusters for parall ...

  9. docker下,极速搭建spark集群(含hdfs集群)

    搭建spark和hdfs的集群环境会消耗一些时间和精力,处于学习和开发阶段的同学关注的是spark应用的开发,他们希望整个环境能快速搭建好,从而尽快投入编码和调试,今天咱们就借助docker,极速搭建 ...

最新文章

  1. VLC 关键模块结构分析
  2. git cherry-pick命令
  3. java8的rmi_Java中的RMI
  4. 如何诊断ORA-125XX连接问题
  5. 【渝粤教育】国家开放大学2018年秋季 0161-22T教师职业道德 参考试题
  6. 成功跳槽百度工资从15K涨到28K,已整理成文档
  7. 集群为什么最少6个_结构化面试答题技巧:多年的经验告诉你,最少要注意这6个方面...
  8. 数字信号处理的fpga实现_FPGA提高雷达性能,实现脉冲压缩
  9. 计算机网络工程师模拟题库,计算机网络工程师模拟题56.doc
  10. linux该如何备份多个分区文件格式,使用partimage 备份Linux 多格式分区
  11. 手写签名 PNG 制作
  12. 使用Nginx中遇到的一个小问题思考
  13. python3.x和python2.x唯一区别_Python3.x和Python2.x的区别
  14. CUDA C++ Programming Guide——编程模型
  15. 基础通用版IPv6转换服务使用说明及设置示例
  16. 计算任意年份之间的天数
  17. js除法四舍五入保留小数点后两位写法
  18. Qt入门视频教程地址分享
  19. c++网络开发必备dll---ssleay32.dll和libeay32.dll
  20. 解决第一个Android工程sync失败失败90%问题的方法(sync失败、unresolved ...、not found、。。。)

热门文章

  1. tf.arg_max
  2. numpy 矩阵的用法
  3. 机器学习从入门到精通系列之BP神经网络理论知识详解
  4. 阿里妈妈:基于动态背包的多场景广告序列投放算法
  5. Python 知识点全解析系列之列表推导式(list comprehension)
  6. LeetCode-链表-19. 删除链表的倒数第 N 个结点
  7. 【机器学习算法-python实现】最大似然估计(Maximum Likelihood)
  8. 第九章 组合模型在信贷风控中的应用
  9. MongoDB如何一次插入多条json数据--转
  10. 手把手教你搭建一个基于Java的分布式爬虫系统