(一)概述

算子从功能上可以分为Transformations转换算子和Action行动算子。转换算子用来做数据的转换操作,比如map、flatMap、reduceByKey等都是转换算子,这类算子通过懒加载执行。行动算子的作用是触发执行,比如foreach、collect、count等都是行动算子,只有程序运行到行动算子时,转换算子才会去执行。

本文将介绍开发过程中常用的转换算子和行动算子,Spark代码基于Java编写,前置代码如下:

public class SparkTransformationTest {public static void main(String[] args) {// 前置准备SparkConf conf = new SparkConf();conf.setMaster("local[1]");conf.setAppName("SPARK ES");JavaSparkContext sc = new JavaSparkContext(conf);JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));}
}

(二)转换算子

map

map(func):通过函数func传递的每个元素,返回一个新的RDD。

JavaRDD<Object> map = javaRdd.map((Function<String, Object>) item -> "new" + item);
map.foreach(x -> System.out.println(x));

返回一个新的RDD,数据是newa、newb、newc、newd、newe

filter

filter(func):筛选通过func处理后返回 true 的元素,返回一个新的RDD。

JavaRDD<String> filter = javaRdd.filter(item -> item.equals("a") || item.equals("b"));
filter.foreach(x -> System.out.println(x));

返回的新RDD数据是a和b。

flatMap

flatMap(func):类似于 map,但每个输入项可以映射到 0 个或更多输出项。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a,b", "c,d,e", "f,g"));
JavaRDD<String> flatMap = javaRdd.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(",")).iterator());
flatMap.foreach(x -> System.out.println(x));

入参只有3个,经过flatMap映射后返回了长度为7的RDD。

mapPartitions

mapPartitions(func):类似于map,但该函数是在RDD每个partition上单独运行,因此入参会是Iterator<Object>

JavaRDD<String> mapPartitions = javaRdd.mapPartitions((FlatMapFunction<Iterator<String>, String>) stringIterator -> {ArrayList<String> list = new ArrayList<>();while (stringIterator.hasNext()) {list.add(stringIterator.next());}return list.iterator();
});
mapPartitions.foreach(x -> System.out.println(x));

除了是对Iterator进行处理之外,其他的都和map一样。

union

union(otherDataset):返回一个新数据集,包含两个数据集合的并集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
JavaRDD<String> unionRdd = javaRdd.union(newJavaRdd);
unionRdd.foreach(x-> System.out.println(x));

intersection

intersection(otherDataset):返回一个新数据集,包含两个数据集合的交集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("a", "b", "3", "4", "5"));
JavaRDD<String> intersectionRdd = javaRdd.intersection(newJavaRdd);
intersectionRdd.foreach(x-> System.out.println(x));

groupByKey

groupByKey ([ numPartitions ]):在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集,可以传递一个可选numPartitions参数来设置不同数量的任务。

这里需要了解Java中的另外一种RDD,JavaPairRDD。JavaPairRDD是一种key-value类型的RDD,groupByKey就是针对JavaPairRDD的API。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {String[] split = s.split(":");return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupByKey = javaPairRDD.groupByKey();
groupByKey.foreach(x -> System.out.println(x._1()+x._2()));

最终输出结果:

a[1, 2]
b[1]
c[3]

reduceByKey

reduceByKey(func, [numPartitions]):在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合。和groupByKey不同的地方在于reduceByKey对value进行了聚合处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {String[] split = s.split(":");return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> reduceRdd = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
reduceRdd.foreach(x -> System.out.println(x._1()+x._2()));

最终输出结果:

a3
b1
c3

aggregateByKey

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):aggregateByKey这个算子相比上面这些会复杂很多,主要参数有zeroValue、seqOp、combOp,numPartitions可选。

zeroValue是该算子设置的初始值,seqOp函数是将rdd中的value值和zeroValue进行处理,combOp是将相同key的数据进行处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {String[] split = s.split(":");return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> aggregateRdd = javaPairRDD.aggregateByKey(1,// seqOp函数中的第一个入参是 zeroValue,第二个入参是rdd的value,这里对所有的value+1(Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2,// combOp函数是对同一个key的value进行处理,这里对相同key的value进行相加(Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
aggregateRdd.foreach(x -> System.out.println(x._1()+":"+x._2()));

最终输出结果如下:

a:4
b:2
c:4

(三)行动算子

reduce

reduce(func):使用函数func聚合数据集的元素(它接受两个参数并返回一个)。
下面这段代码对所有rdd进行相加:

String reduce = javaRdd.reduce((Function2<String, String, String>) (v1, v2) -> {System.out.println(v1 + ":" + v2);return v1+v2;
});
System.out.println("result:"+reduce);

最终结果如下,从结果可以看出,每次对v1都是上一次reduce运行之后的结果:

a:b
ab:c
abc:d
abcd:e
result:abcde

collect()

collect():将driver中的所有元素数据通过集合的方式返回,适合小数据量的场景,大数据量会导致内存溢出。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> collect = javaRdd.collect();

count()

count():返回一个RDD中元素的数量。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
long count = javaRdd.count();

first()

first():返回第一个元素。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
String first = javaRdd.first();

take

take(n):返回前N个元素,take(1)=first()。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.take(3);

takeOrdered

takeOrdered(n, [ordering]):返回自然排序的前N个元素,或者指定排序方法后的前N个元素。首先写一个排序类。

public class MyComparator implements Comparator<String>, Serializable {@Overridepublic int compare(String o1, String o2) {return o2.compareTo(o1);}
}

接着是调用方式:

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.takeOrdered(3, new MyComparator());

foreach

foreach(func):该函数对数据集的每个RDD运行func函数,foreach算子在上面的代码中已经使用到,这里不再做代码案例展示。

(四)总结

Spark的开发可以用Java或者Scala,Spark本身使用Scala编写,具体使用哪种语言进行开发需要根据项目情况考虑时间和学习成本。具体的API都可以在Spark官网查询:https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

Spark算子实战Java版,学到了相关推荐

  1. 第六篇 :微信公众平台开发实战Java版之如何自定义微信公众号菜单

    我们来了解一下 自定义菜单创建接口: http请求方式:POST(请使用https协议) https://api.weixin.qq.com/cgi-bin/menu/create?access_to ...

  2. Spark算子---实战应用

    Spark算子实战应用 数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase 相关数据文件 : users.dat --- ...

  3. 第三篇 :微信公众平台开发实战Java版之请求消息,响应消息以及事件消息类的封装...

    微信服务器和第三方服务器之间究竟是通过什么方式进行对话的? 下面,我们先看下图: 其实我们可以简单的理解: (1)首先,用户向微信服务器发送消息: (2)微信服务器接收到用户的消息处理之后,通过开发者 ...

  4. 第八篇 :微信公众平台开发实战Java版之如何网页授权获取用户基本信息

    第一部分:微信授权获取基本信息的介绍 我们首先来看看官方的文档怎么说: 如果用户在微信客户端中访问第三方网页,公众号可以通过微信网页授权机制,来获取用户基本信息,进而实现业务逻辑. 关于网页授权回调域 ...

  5. 第一篇:微信公众平台开发实战Java版之了解微信公众平台基础知识以及资料准备...

    相信很多人或多或少听说了微信公众平台的火热.但是开发还是有一点门槛,鉴于挺多朋友问我怎么开发,问多了,自己平时也进行以下总结. 所以下面给大家分享一下我的经验: 第一部分   介绍微信公众号的一些简单 ...

  6. 微信开放平台分账功能实战(Java版)

    ####近期为了接入微信支付以及微信分账等功能,开发了微信类的一系列接口,下面就本着开发的目标,再次记录回顾一下微信开放的步骤.. ####目标:通过微信支付,实现分账到运营商的功能. ####根据实 ...

  7. spark学习:java版JavaRDD与JavaPairRDD的互相转换

    1.引发:做一个java读取hbase的注册成表的程序.但是读出来的是javaPairRDD,而网上都是javaRDD转成dataFrame,我只能自己摸索怎么转成javaRDD  2.方法  Jav ...

  8. Spark案例:Java版统计单词个数

    1.Maven项目JavaSparkWordCount 2.在pom.xml里,添加对spark的依赖 <?xml version="1.0" encoding=" ...

  9. Spark项目实战:购物网站评价标签生成(非常详细的Spark算子操作)

    实战概览 一.项目简介 1. 需求 2. 内容 二.项目的开发环境 三.项目代码编写 1. 项目搭建 2. 分析原始的数据 3. 编写JSON解析类 4. 编写数据处理类 5. 将项目打包成jar提交 ...

最新文章

  1. 安装报错_RG Magic Bullet安装报错修复方法
  2. AngularJS之Filter(二)
  3. linux useradd(adduser)命令参数及用法详解(linux创建新用户命令)
  4. 如何封装Spring bean
  5. 看过这五条,再离职!
  6. 计算机组成原理补充实验,计算机组成原理实验补充实验指导-实.doc
  7. android xml java混合编程_Android | 自动调整文本大小的 TextViews
  8. Winform导入文件
  9. cts测试之FileAccessPermissionTest
  10. 谷粒学院项目总结(持续更新)
  11. 注册双击Ctrl键 (DLL版)
  12. Android识别图片中的WIFI二维码,并自动连接
  13. 打印纸张尺寸换算_常用纸张的尺寸大小对照表
  14. HTML5期末大作业:美食坊网站设计——美食坊美食购物主题(15页) HTML+CSS+JavaScript
  15. ffmpeg java 合并_[置顶] ffmpg简介以及用它实现音频视频合并(java)
  16. 将word 转换为图片(word to pdf ->pdf to image)
  17. SQuirrel SQL Client数据库连接工具的配置与使用
  18. thumbnailator图片压缩和碰见的问题修复
  19. C语言最难学的四大内容是什么?
  20. ibm tivoli_Tivoli Identity Manager中的角色重新认证

热门文章

  1. ROS意外崩掉解决方案
  2. Diva无法运行LVS问题(virtuoso,layout)
  3. laravel 页面静态化
  4. Django 页面静态化
  5. 最全的Windows Azure学习教程汇总
  6. 【git】出现Merge Conflict,解冲突
  7. matlab中text竖着写,科学网—matlab中text函数的用法 - 张瑞龙的博文
  8. 科研必备的14个学术搜索引擎
  9. 从苏宁电器到卡巴斯基第35篇:我与卡巴斯基的邂逅(中)
  10. 网易视频云郭再荣:打造一体化多场景的视频云平台