通过代码实例来说明spark api mapPartitions和mapPartitionsWithIndex的使用
代码片段1:
package com.oreilly.learningsparkexamples.scalaimport org.apache.spark._import org.eclipse.jetty.client.ContentExchange
import org.eclipse.jetty.client.HttpClientobject BasicMapPartitions {def main(args: Array[String]) {val master = args.length match {case x: Int if x > 0 => args(0)case _ => "local"}val sc = new SparkContext(master, "BasicMapPartitions", System.getenv("SPARK_HOME"))val input = sc.parallelize(List("KK6JKQ", "Ve3UoW", "kk6jlk", "W6BB"))val result = input.mapPartitions{signs =>val client = new HttpClient()client.start()signs.map {sign =>val exchange = new ContentExchange(true);exchange.setURL(s"http://qrzcq.com/call/${sign}")client.send(exchange)exchange}.map{ exchange =>exchange.waitForDone();exchange.getResponseContent()}}println(result.collect().mkString(","))}
}
上面的代码中,
mapPartitions的参数signs是input这个rdd的一个分区的所有element组成的Iterator
mapPartitions结果是一个分区的所有element被分区处理函数加工后的element组成的Iterator.
mapPartitions函数会对每个分区调用分区函数处理,然后将处理的结果(若干个Iterator)生成新的RDDs
如下这段代码:
package com.oreilly.learningsparkexamples.scalaimport org.apache.spark._object BasicAvgMapPartitions {case class AvgCount(var total: Int = 0, var num: Int = 0) {def merge(other: AvgCount): AvgCount = {total += other.totalnum += other.numthis}def merge(input: Iterator[Int]): AvgCount = {input.foreach{elem =>total += elemnum += 1}this}def avg(): Float = {total / num.toFloat;}}def main(args: Array[String]) {val master = args.length match {case x: Int if x > 0 => args(0)case _ => "local"}val sc = new SparkContext(master, "BasicAvgMapPartitions", System.getenv("SPARK_HOME"))val input = sc.parallelize(List(1, 2, 3, 4))val result = input.mapPartitions(partition =>// Here we only want to return a single element for each partition, but mapPartitions requires that we wrap our return in an IteratorIterator(AvgCount(0, 0).merge(partition))).reduce((x,y) => x.merge(y))println(result)}
}
上面的测试代码,首先对一个RDDs的分区所有元素组成的Iterator进行了合并操作,生成了一个元素, 然后调用Iterator()生成一个新的Iterator,然后作为结果返回 (虽然返回的Iterator中只有一个元素)
mapPartitionsWithIndex与mapPartition基本相同,只是在处理函数的参数是一个二元元组,元组的第一个元素是当前处理的分区的index,元组的第二个元素是当前处理的分区元素组成的Iterator
本文转自http://blog.csdn.net/u012684933/article/details/46894951,所有权力归原作者所有。
通过代码实例来说明spark api mapPartitions和mapPartitionsWithIndex的使用相关推荐
- spark学习-Spark的mapPartitions与MapPartitionsWithIndex理解
=mapPartitions=== 1.先看一个小程序 package scalaTestimport org.apache.spark.SparkContext import org.apache. ...
- spark应用程序转换_Spark—RDD编程常用转换算子代码实例
Spark-RDD编程常用转换算子代码实例 Spark rdd 常用 Transformation 实例: 1.def map[U: ClassTag](f: T => U): RDD[U] ...
- php 菜谱 源码,基于php的菜谱大全api调用代码实例
代码描述:基于php的菜谱大全api调用代码实例 接口地址:http://www.juhe.cn/docs/api/id/46 PHP代码 // +-------------------------- ...
- 外汇汇率接口 java_基于JAVA的货币汇率api调用代码实例
代码描述:基于JAVA的货币汇率api调用代码实例 关联数据:货币汇率 接口地址:http://www.juhe.cn/docs/api/id/23 1.[代码][Java]代码 import jav ...
- html约束验证的例子,HTML5利用约束验证API来检查表单的输入数据的代码实例
HTML5对于表单有着极大程度的优化,无论是语义,小部件,还是数据格式的验证.我猜你肯定会以浏览器兼容作为借口不愿意使用这些"新功能",但这绝不应该成为使你停滞不前的原因,况且还有 ...
- 基于java的圆通快递单号自动识别api接口代码实例
一.产品介绍 快递单号识别,输入运单号自动识别物流公司,实时返回对应物流公司编码.查询单号时,返回的结果可能存在一个或多个物流公司编码,快递鸟大数据平台通过智能分析,实时更新单号库,保障物流公司编码准 ...
- java查询序列_基于JAVA的苹果序列号查询api调用代码实例
代码描述:基于JAVA的苹果序列号查询api调用代码实例 关联数据:苹果序列号 接口地址:http://www.juhe.cn/docs/api/id/37 1.[代码][Java]代码 import ...
- 直播api接口java_基于JAVA的电视台直播节目时间表api调用代码实例
代码描述:基于JAVA的电视台直播节目时间表api调用代码实例 接口地址:http://www.juhe.cn/docs/api/id/129 1.[代码][Java]代码 import java.i ...
- 基于C#的全国天气查询API调用代码实例
全国天气查询API:https://www.juhe.cn/docs/api/id/39 基于C#的全国天气查询API调用代码实例 using System; using System.Collect ...
最新文章
- 在Repeater控件,Repeater1控件中都有FooterTemplate模板.但你在后台中如何去找FooterTemplate中的控件呢?...
- Go 2. 两数相加
- HDU4514(非连通图的环判断与图中最长链)
- Android---- 获取当前应用的版本号和当前android系统的版本号
- 【2016年第6期】中国科学院科学数据云建设与服务
- Java类加载器( 死磕9)
- Mongo散记--聚合(aggregation)amp; 查询(Query)
- android:Style and Theme
- [裴礼文数学分析中的典型问题与方法习题参考解答]4.3.17
- C++ 循环for 引用 for(string : )
- 微信清理僵尸粉脚本-基于auto.js
- pythonopencv人脸相似度_OpenCV3与深度学习实例:Dlib+VGG Face实现两张脸部图像相似度比较...
- rollup打包工具
- 【爬虫进阶】易班登录加密逆向
- 管理后台 - 轮播图片管理功能
- 阿里云域名解析,将域名绑定到指定服务器
- L1 L2正则化和优化器的weight_decay参数
- 书法拓片matlab,拓墨书法作品(拓片)的具体操作方法和步骤?
- zip压缩文件转换为可传输byte[]流和不解压下读取指定zip包中文件
- 通过外部Python调用FreeCAD