Spark Java Operators
A review of Spark operators!
I. Transformation Operators
1. map
One-to-one: one String goes in, one String comes out.
JavaRDD<String> map = lines.map(new Function<String, String>() {
    @Override
    public String call(String line) throws Exception {
        return line + "*";
    }
});
map.foreach(new VoidFunction<String>() {
    @Override
    public void call(String s) throws Exception {
        System.out.println(s);
    }
});
2. flatMap
One-to-many: one line goes in, a batch of words comes out.
// Split one line of data and return an Iterator over the words
lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        List<String> list = Arrays.asList(s.split(" "));
        return list.iterator();
    }
}).foreach(new VoidFunction<String>() {
    @Override
    public void call(String s) throws Exception {
        System.out.println(s);
    }
});
3. filter: filter the output
Returns an RDD containing only the elements for which the predicate returns true.
JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String line) throws Exception {
        return "hello world".equals(line);
    }
});
System.out.println(result.count()); // print how many lines matched
result.foreach(new VoidFunction<String>() {
    @Override
    public void call(String s) throws Exception {
        System.out.println(s);
    }
});
4. mapToPair
One String line goes in, one Tuple2 comes out. The three generic parameters are the input type of call and the key and value types of the Tuple2.
JavaPairRDD<String, String> result = lines.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String s) throws Exception {
        return new Tuple2<>(s, s + "@@@");
    }
});
result.foreach(new VoidFunction<Tuple2<String, String>>() {
    @Override
    public void call(Tuple2<String, String> tp) throws Exception {
        // prints e.g. hello world<------>hello world@@@; or just print the Tuple2 itself
        System.out.println(tp._1() + "<------>" + tp._2());
    }
});
5. Word count:
lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" ")).iterator();
    }
}).mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String word) throws Exception {
        return new Tuple2<>(word, 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tp) throws Exception {
        System.out.println(tp);
    }
});
6. Sorting
Count, swap key and value, sort by key in descending order, then swap back.
JavaPairRDD<String, Integer> reduceRDD = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" ")).iterator();
    }
}).mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<>(s, 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
reduceRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
    @Override
    public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
        return tp.swap();
    }
}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
        return tp.swap();
    }
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tp) throws Exception {
        System.out.println(tp); // prints e.g. (hello,6), sorted by word count
    }
});
7. sample
Randomly samples the RDD; the arguments are with-replacement (boolean), the sampling fraction, and a random seed.
JavaRDD<String> sample = lines.sample(true, 0.1, 100L);
sample.foreach(new VoidFunction<String>() {
    @Override
    public void call(String s) throws Exception {
        System.out.println(s);
    }
});
8. join
JavaPairRDD<String, Tuple2<Integer, Integer>> result = rdd1.join(rdd2);
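The pair RDDs rdd1 and rdd2 used in the join examples are never constructed in this post. A minimal self-contained sketch, with assumed sample data (keys "a" and "b" on both sides, "c" only in rdd1):

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JoinDemo {
    // Returns the number of joined pairs, for easy checking.
    static long demo() {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("join");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Assumed sample data: "a" and "b" exist in both RDDs, "c" only in rdd1.
        JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("c", 3)));
        JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("a", 100), new Tuple2<>("b", 200)));

        // Inner join keeps only keys present on both sides: (a,(1,100)), (b,(2,200))
        JavaPairRDD<String, Tuple2<Integer, Integer>> result = rdd1.join(rdd2);
        long n = result.count();
        sc.stop();
        return n;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 2
    }
}
```

With this data, "c" is dropped by the inner join; the leftOuterJoin and rightOuterJoin variants below keep unmatched keys by wrapping the missing side in an Optional.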
8.1 leftOuterJoin
JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> result = rdd1.leftOuterJoin(rdd2);
result.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Optional<Integer>>>>() {
    @Override
    public void call(Tuple2<String, Tuple2<Integer, Optional<Integer>>> tp) throws Exception {
        String key = tp._1;
        Integer v1 = tp._2._1;
        Optional<Integer> optional = tp._2._2;
        // optional.get() throws if the key has no match in rdd2; orElse supplies a default
        // System.out.println("key=" + key + ";v1=" + v1 + ";optional=" + optional.get());
        System.out.println("key=" + key + ";v1=" + v1 + ";optional=" + optional.orElse(1000));
    }
});
8.2 rightOuterJoin
JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> result = rdd1.rightOuterJoin(rdd2);
result.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<Integer>, Integer>>>() {
    @Override
    public void call(Tuple2<String, Tuple2<Optional<Integer>, Integer>> tp) throws Exception {
        String key = tp._1;
        Optional<Integer> v1 = tp._2._1;
        Integer v2 = tp._2._2;
        System.out.println("key=" + key + ";v1=" + v1.get() + ";v2=" + v2);
    }
});
9. union: merge two RDDs (duplicates are kept)
JavaPairRDD<String, Integer> result = rdd1.union(rdd2);
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tp) throws Exception {
        System.out.println(tp._1 + ":" + tp._2);
    }
});
10. intersection: return the elements common to both RDDs
JavaPairRDD<String, Integer> result = rdd1.intersection(rdd2);
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tp) throws Exception {
        System.out.println(tp._1 + ":" + tp._2);
    }
});
11. subtract: return the elements of rdd1 that are not in rdd2
JavaPairRDD<String, Integer> result = rdd1.subtract(rdd2);
result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> tp) throws Exception {
        System.out.println(tp._1 + ":" + tp._2);
    }
});
12. distinct: remove duplicate elements
JavaRDD<String> result = rdd1.distinct();
result.foreach(new VoidFunction<String>() {
    @Override
    public void call(String s) throws Exception {
        System.out.println(s);
    }
});
13. mapPartitions
Operates on one whole partition's data per call, rather than element by element.
JavaRDD<String> stringJavaRDD = rdd1.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterator<String> call(Iterator<String> iter) throws Exception {
        List<String> list = new ArrayList<>();
        System.out.println("open");  // runs once per partition
        while (iter.hasNext()) {
            String s = iter.next();
            list.add(s);
            System.out.println("insert: " + s);
        }
        System.out.println("close"); // runs once per partition
        return list.iterator();
    }
});
stringJavaRDD.count();
14. mapPartitionsWithIndex
JavaRDD<String> rdd1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
        List<String> list = new ArrayList<>();
        while (iterator.hasNext()) {
            String next = iterator.next();
            list.add("index=" + index + ";value=" + next);
        }
        return list.iterator();
    }
}, false);
for (String s : rdd1.collect()) {
    System.out.println(s); // index=0;value=a
}
15. repartition
repartition can both increase and decrease the number of partitions. It is a wide-dependency operator and always produces a shuffle.
It differs from coalesce: coalesce can also change the partition count, but it is a narrow-dependency operator with no shuffle by default (a shuffle can be enabled by passing true). If coalesce is asked to go from fewer partitions to more with the shuffle disabled, it has no effect.
Rule of thumb: use repartition to increase partitions and coalesce to decrease them.
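A minimal sketch contrasting the two behaviors (the partition counts and sample data are assumptions):

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceDemo {
    // Returns the partition count after each variant, for easy checking.
    static int[] demo() {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("coalesce");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 4);

        // Decrease partitions: narrow dependency, no shuffle.
        int down = rdd1.coalesce(2).getNumPartitions();               // 2
        // Increase with shuffle disabled: has no effect.
        int upNoShuffle = rdd1.coalesce(8, false).getNumPartitions(); // still 4
        // Increase with shuffle enabled: works, like repartition.
        int upShuffle = rdd1.coalesce(8, true).getNumPartitions();    // 8
        sc.stop();
        return new int[]{down, upNoShuffle, upShuffle};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [2, 4, 8]
    }
}
```

Internally, repartition(n) is equivalent to coalesce(n, true).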
JavaRDD<String> rdd2 = rdd1.repartition(4);
rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
        List<String> list = new ArrayList<>();
        while (iterator.hasNext()) {
            String next = iterator.next();
            list.add("index=" + index + ";value=" + next);
        }
        return list.iterator();
    }
}, false);
16. zip & zipWithIndex
zip: zips two RDDs together element by element into a K-V RDD, e.g. (a,1).
zipWithIndex: zips each element with its index (a Long: 0, 1, 2, …) to form a K-V RDD, e.g. (a,0).
JavaPairRDD<String, String> zip = rdd.zip(rdd1);
JavaPairRDD<String, Long> zipWithIndex = rdd.zipWithIndex();
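A self-contained sketch of both operators (the sample data is an assumption); note that zip requires the two RDDs to have the same number of partitions and the same number of elements per partition:

```java
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ZipDemo {
    // Returns the two collected results as strings, for easy checking.
    static String[] demo() {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("zip");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // A single partition keeps the element order deterministic.
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c"), 1);
        JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3), 1);

        // zip pairs the two RDDs positionally: (a,1) (b,2) (c,3)
        String zipped = rdd.zip(rdd1).collect().toString();
        // zipWithIndex pairs each element with its Long index: (a,0) (b,1) (c,2)
        String indexed = rdd.zipWithIndex().collect().toString();
        sc.stop();
        return new String[]{zipped, indexed};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```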
II. Action Operators
1. collect
List<String> list = lines.collect();
for (String s : list) {
    System.out.println(s);
}
2. count
Returns the number of elements (rows).
long count = lines.count();
System.out.println(count);
3. first
Returns the first element.
String first = lines.first();
System.out.println(first);
4. take
Returns the first n elements.
List<String> list = lines.take(5);
for (String s : list) {
    System.out.println(s);
}
5. foreachPartition
rdd1.foreachPartition(new VoidFunction<Iterator<String>>() {
    @Override
    public void call(Iterator<String> stringIterator) throws Exception {
        // process one whole partition's data here
    }
});
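The body above is left empty; a typical use of foreachPartition is to set up one expensive resource (such as a database connection) per partition instead of per record. A self-contained sketch with the connection replaced by placeholder prints (the data and partition count are assumptions):

```java
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class ForeachPartitionDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("foreachPartition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2)
          .foreachPartition(new VoidFunction<Iterator<String>>() {
              @Override
              public void call(Iterator<String> iter) throws Exception {
                  // Open one (hypothetical) connection per partition, not per element.
                  System.out.println("open connection");
                  while (iter.hasNext()) {
                      System.out.println("write: " + iter.next());
                  }
                  System.out.println("close connection");
              }
          });
        sc.stop();
    }
}
```

With 2 partitions, "open connection" and "close connection" each print twice, while "write:" prints once per element.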
6. reduce & countByKey & countByValue
reduce aggregates all elements with the given function; for 1+2+3+4+5 the result is 15.
sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
countByKey: groups by key and counts how many elements have each key.
Map<String, Long> map = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 1),
        new Tuple2<String, Integer>("c", 1), new Tuple2<String, Integer>("d", 1))).countByKey();
countByValue: groups on the element as a whole (here the entire Tuple2) and counts how many times each occurs; for the data below every tuple appears once, e.g. ((a,1),1).
Map<Tuple2<String, Integer>, Long> map = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 1),
        new Tuple2<String, Integer>("c", 1), new Tuple2<String, Integer>("d", 1))).countByValue();
Set<Map.Entry<Tuple2<String, Integer>, Long>> set = map.entrySet();
for (Map.Entry<Tuple2<String, Integer>, Long> entry : set) {
    Tuple2<String, Integer> key = entry.getKey();
    Long value = entry.getValue();
}