转: https://blog.csdn.net/weixin_38750084/article/details/82769600

这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式, 

【方式1】

下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下

public class sparkTestCon {public static void main(String[] args) {SparkConf conf=new SparkConf();conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);System.out.println(sc);}}

下面是运行在本机,把上面的第6行代码改为如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

RDD的创建有两种方式 
1.引用外部文件系统的数据集(HDFS) 
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

第一种方式创建 
下面通过代码来理解RDD和怎么操作RDD

package com.tg.spark;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/*** 引用外部文件系统的数据集(HDFS)创建RDD*  匿名内部类定义函数传给spark* @author 汤高**/
public class RDDOps {//完成对所有行的长度求和public static void main(String[] args) {SparkConf conf=new SparkConf();conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);System.out.println(sc);//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");//定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {public Integer call(String s) { System.out.println("每行长度"+s.length());return s.length(); }});//运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,//并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {public Integer call(Integer a, Integer b) { return a + b; }});System.out.println(totalLength);//为了以后复用  持久化到内存...lineLengths.persist(StorageLevel.MEMORY_ONLY());}
}

如果觉得刚刚那种写法难以理解,可以看看第二种写法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/*** 引用外部文件系统的数据集(HDFS)创建RDD *  外部类定义函数传给spark* @author 汤高**/
public class RDDOps2 {// 完成对所有行的长度求和public static void main(String[] args) {SparkConf conf = new SparkConf();conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);System.out.println(sc);//通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");//定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengthsJavaRDD<Integer> lineLengths = lines.map(new GetLength());//运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,//并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序int totalLength = lineLengths.reduce(new Sum());System.out.println("总长度"+totalLength);// 为了以后复用 持久化到内存...lineLengths.persist(StorageLevel.MEMORY_ONLY());}//定义map函数//第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型static class GetLength implements Function<String, Integer> {public Integer call(String s) {return s.length();}}//定义reduce函数 //第一个参数为内容,第三个参数为函数操作完后返回的结果类型static class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) {return a + b;}}
}

【方式2】 (java编程推荐)

并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

package com.tg.spark;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/*** 并行化一个已经存在于驱动程序中的集合创建RDD* @author 汤高**/
public class RDDOps3 {// 完成对所有数求和public static void main(String[] args) {SparkConf conf = new SparkConf();conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);System.out.println(sc);List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);//并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDDJavaRDD<Integer> distData = sc.parallelize(data);JavaRDD<Integer> lineLengths = distData.map(new GetLength());// 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,// 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序int totalLength = lineLengths.reduce(new Sum());System.out.println("总和" + totalLength);// 为了以后复用 持久化到内存...lineLengths.persist(StorageLevel.MEMORY_ONLY());}// 定义map函数static class GetLength implements Function<Integer, Integer> {@Overridepublic Integer call(Integer a) throws Exception {return a;}}// 定义reduce函数static class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) {return a + b;}}
}

注意:上面的写法是基于jdk1.7或者更低版本 
基于jdk1.8有更简单的写法 
下面是官方文档的说明

所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式

参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/

扩展:

SparkContext的parallelize的参数

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

var data = [1, 2, 3, 4, 5]
var distData = sc.parallelize(data)  

在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。

例如sc.parallelize(0 until numMappers, numMappers)

创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。

在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。

一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).

参考:https://blog.csdn.net/caoli98033/article/details/41777065

(精华)转:RDD:创建的几种方式(scala和java)相关推荐

  1. Java中线程的创建有两种方式

    Java中继承thread类与实现Runnable接口的区别 Java中线程的创建有两种方式: 1.  通过继承Thread类,重写Thread的run()方法,将线程运行的逻辑放在其中 2.  通过 ...

  2. Spark 基础 —— RDD(创建 RDD)的两种方式

    弹性分布式数据集(Resilient Distributed Dataset),简称 RDD,RDD 是 Spark 所提供的最基本的数据抽象,代表分布在集群中多台机器上的对象集合.Spark 有两种 ...

  3. 一心多用多线程-线程创建的三种方式

    第一次了解java线程机制,记录一下线程启动的三种方式. 1.通过继承Thread类调用一个线程 public class Thread1 extends Thread{@Overridepublic ...

  4. Qt创建线程两种方式的区别

    使用QT创建线程有两种方式,方式A使用moveToThread,方式B是直接继承QThread.差异主要在于方式A的槽函数将会在新线程中运行,而方式B的槽函数在旧线程中运行. 结论如下: PS:旧线程 ...

  5. 【大数据开发】SparkCore——Spark作业执行流程、RDD编程的两种方式、简单算子

    文章目录 一.Spark作业执行流程(重点) 二.RDD编程 2.1创建RDD的⼆种⽅式: 2.2Transformation算⼦ 2.3Action算子 三.简单算子(必须掌握) 3.1 map.m ...

  6. java多线程学习一、线程介绍、线程创建的3种方式、lambda创建方式、线程状态、线程示例:12306买票和银行取钱

    文章目录 前言 一.线程简介 1.概述 2.进程.线程 区别 在这里插入图片描述 3. 核心概念 二. 线程创建 1.概述 2. 第一种方式继承Thread 1) 继承Thread 2) 示例:下载图 ...

  7. 线程生命周期以及线程创建的三种方式

    1. 线程生命周期 线程生命周期图 新建状态(New) 当线程对象创建后,即进入新建状态,如:Thread t = new MyThread(); 就绪状态(Runnable) 当调用线程对象的sta ...

  8. 浅析Revit体量创建的几种方式

    随着项目经验的不断加深,可以发现在通过Revit进行开发过程中,创建体量是一个非常重要的工作,今天就在族文件中通过拉伸创建体量的几种方式进行说明以及简单示例,供大家参考. 体量创建API 通过轮廓和拉 ...

  9. javascript对象创建的五种方式

    <html> <head> <script> /* 5.动态原型法 */ function DCar(name,price){this.name=name;this ...

最新文章

  1. 多态指针访问虚函数不能被继承的类快速排序N皇后问题插入排序堆排序merge归并排序栈上生成对象两个栈实现一个队列...
  2. jsp+ajax+servlet+sqlserver实现分页查询_SXT DAY063 分页
  3. 人工智能tensorflow图的可视化
  4. TCP同步和异步连接_学习笔记
  5. 阿里云双11全球狂欢节 计算资源买买买
  6. 如何导出android studio程序,(技术)聊聊Android Studio 如何生成Jar
  7. TensorFlow 教程 --教程--2.2 数据准备
  8. Linux-HA 高可用开源方案 Keepalived VS Heartbeat 的选择
  9. 个人的Directx9研究总结 (1)
  10. win的反义词_小学英语常见的120对反义词大全,聪明的小升初家长快来收藏学习...
  11. Flixel横板游戏制作教程(九)—SquashingthePlayer(挤压Player)
  12. 程序猿必备福利之二上篇!!!简易使用Nodejs实现从美图网爬取清晰脱俗的美图???
  13. 佳能(Canon)打印机初始化备忘录
  14. 计算机网络安全设计毕业设计,计算机网络安全及防护毕业设计论文01
  15. labview:一个采集数据的小程序
  16. YOLOV3训练自己的数据集(PyTorch版本)
  17. java 集合分组_java List 如何进行分组
  18. 2018年终总结之最有成就感的几件事
  19. 神经网络专业硕士就业,神经科学专业就业方向
  20. 服务器判断是手机访问网址还是电脑访问网址

热门文章

  1. Codeforces Round #696
  2. 牛客题霸 [删除有序链表中重复的元素] C++题解/答案
  3. 「ROI 2017 Day 2」反物质(单调队列优化dp)
  4. P3295 [SCOI2016]萌萌哒(DP+倍增)
  5. CF850F Rainbow Balls(数学、期望)
  6. YbtOJ-森林之和【dp】
  7. 【dfs】【模拟】【树】I Like Matrix Forever!
  8. 操作系统复习笔记 05 Thread 线程
  9. 30、JAVA_WEB开发基础之servlet(1)
  10. Java web文件下载断点续传