A Review of Spark Operators

I. Transformation Operators

1. map

One-to-one: one String goes in, one String comes out.

    JavaRDD<String> map = lines.map(new Function<String, String>() {
        @Override
        public String call(String line) throws Exception {
            return line + "*";
        }
    });
    map.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });

2. flatMap

One-to-many: one line goes in, a batch of words comes out.

    // Split one line of input and return an Iterator over the pieces
    lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            List<String> list = Arrays.asList(s.split(" "));
            return list.iterator();
        }
    }).foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });

3. filter

Returns only the records that satisfy the predicate.

    JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {
        @Override
        public Boolean call(String line) throws Exception {
            return "hello world".equals(line);
        }
    });
    System.out.println(result.count()); // print how many records matched
    result.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });

4. mapToPair

One String goes in, one Tuple2 comes out. PairFunction takes three generic types: the input type of call, then the key and value types of the resulting tuple.

    JavaPairRDD<String, String> result = lines.mapToPair(new PairFunction<String, String, String>() {
        @Override
        public Tuple2<String, String> call(String s) throws Exception {
            return new Tuple2<>(s, s + "@@@");
        }
    });
    result.foreach(new VoidFunction<Tuple2<String, String>>() {
        @Override
        public void call(Tuple2<String, String> tuple) throws Exception {
            // Prints e.g. hello world<------>hello world@@@; printing the tuple itself also works
            System.out.println(tuple._1() + "<------>" + tuple._2());
        }
    });

5. Word count

    lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            List<String> list = Arrays.asList(s.split(" "));
            return list.iterator();
        }
    }).mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<>(word, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tp) throws Exception {
            System.out.println(tp);
        }
    });

6. Sorting by count

Swap each (word, count) pair into (count, word), sort by key in descending order, then swap back.

    JavaPairRDD<String, Integer> reduceRDD = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String s) throws Exception {
            return Arrays.asList(s.split(" ")).iterator();
        }
    }).mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            return integer + integer2;
        }
    });
    reduceRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
        @Override
        public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
            return tuple.swap();
        }
    }).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
            return tuple.swap();
        }
    }).foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tuple) throws Exception {
            System.out.println(tuple); // e.g. (hello,6), now sorted by word count
        }
    });

7. sample

    // Sample with replacement, sampling fraction 0.1, random seed 100
    JavaRDD<String> sample = lines.sample(true, 0.1, 100L);
    sample.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });

8. join

    // rdd1 and rdd2 are JavaPairRDD<String, Integer>; inner join on the key
    JavaPairRDD<String, Tuple2<Integer, Integer>> result = rdd1.join(rdd2);
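To make the inner-join semantics concrete, here is a plain-Java sketch (not Spark code; it assumes unique keys on each side, whereas a Spark join pairs every matching combination of duplicate keys): only keys present in both inputs survive, each carrying both values.

```java
import java.util.*;

public class JoinSketch {
    // Inner join of two key->value maps: keep only keys present on both sides
    static Map<String, int[]> innerJoin(Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, int[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            Integer r = right.get(e.getKey());
            if (r != null) {
                out.put(e.getKey(), new int[]{e.getValue(), r});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> rdd1 = new LinkedHashMap<>();
        rdd1.put("a", 1);
        rdd1.put("b", 2);
        Map<String, Integer> rdd2 = new LinkedHashMap<>();
        rdd2.put("b", 20);
        rdd2.put("c", 30);
        // Only "b" appears on both sides, so it is the only joined key
        System.out.println(Arrays.toString(innerJoin(rdd1, rdd2).get("b"))); // [2, 20]
    }
}
```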

8.1 leftOuterJoin

    JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> result = rdd1.leftOuterJoin(rdd2);
    result.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Optional<Integer>>>>() {
        @Override
        public void call(Tuple2<String, Tuple2<Integer, Optional<Integer>>> tp) throws Exception {
            String key = tp._1;
            Integer v1 = tp._2._1;
            Optional<Integer> optional = tp._2._2;
            // optional.get() would throw when the key has no match in rdd2; orElse supplies a default
            System.out.println("key=" + key + ";v1=" + v1 + ";optional=" + optional.orElse(1000));
        }
    });

8.2 rightOuterJoin

    JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> result = rdd1.rightOuterJoin(rdd2);
    result.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<Integer>, Integer>>>() {
        @Override
        public void call(Tuple2<String, Tuple2<Optional<Integer>, Integer>> tp) throws Exception {
            String key = tp._1;
            Optional<Integer> v1 = tp._2._1;
            Integer v2 = tp._2._2;
            // v1.get() throws when the key is missing from rdd1; prefer orElse for a default
            System.out.println("key=" + key + ";v1=" + v1.get() + ";v2=" + v2);
        }
    });

9. union

Concatenates the two RDDs; duplicates are kept.

    JavaPairRDD<String, Integer> result = rdd1.union(rdd2);
    result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tp) throws Exception {
            System.out.println(tp._1 + ":" + tp._2);
        }
    });

10. intersection

Returns the elements common to both RDDs.

    JavaPairRDD<String, Integer> result = rdd1.intersection(rdd2);
    result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tp) throws Exception {
            System.out.println(tp._1 + ":" + tp._2);
        }
    });

11. subtract

Returns the elements of rdd1 that do not appear in rdd2.

    JavaPairRDD<String, Integer> result = rdd1.subtract(rdd2);
    result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tp) throws Exception {
            System.out.println(tp._1 + ":" + tp._2);
        }
    });

12. distinct

Removes duplicate elements.

    JavaRDD<String> result = rdd1.distinct();
    result.foreach(new VoidFunction<String>() {
        @Override
        public void call(String s) throws Exception {
            System.out.println(s);
        }
    });

13. mapPartitions

Applies the function to each partition's data as a whole, one Iterator per partition.

    JavaRDD<String> stringJavaRDD = rdd1.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
        @Override
        public Iterator<String> call(Iterator<String> iter) throws Exception {
            List<String> list = new ArrayList<>();
            System.out.println("create"); // runs once per partition
            while (iter.hasNext()) {
                String s = iter.next();
                list.add(s);
                System.out.println("insert: " + s);
            }
            System.out.println("close"); // also once per partition
            return list.iterator();
        }
    });
    stringJavaRDD.count();

14. mapPartitionsWithIndex

    JavaRDD<String> rdd1 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
            List<String> list = new ArrayList<>();
            while (iterator.hasNext()) {
                String next = iterator.next();
                list.add("index=" + index + ";value=" + next);
            }
            return list.iterator();
        }
    }, false);
    for (String s : rdd1.collect()) {
        System.out.println(s); // e.g. index=0;value=a
    }

15. repartition

Can either increase or decrease the number of partitions. It is a wide-dependency operator and always produces a shuffle.
This is where it differs from coalesce: coalesce can likewise change the partition count, but it is a narrow-dependency operator with no shuffle by default (a shuffle can be enabled by passing true as its second argument). Without a shuffle, coalesce cannot go from fewer partitions to more; such a request simply has no effect.
A practical rule of thumb: use repartition to increase partitions and coalesce to decrease them.

    JavaRDD<String> rdd2 = rdd1.repartition(4);
    rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
        @Override
        public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
            List<String> list = new ArrayList<>();
            while (iterator.hasNext()) {
                String next = iterator.next();
                list.add("index=" + index + ";value=" + next);
            }
            return list.iterator();
        }
    }, false);
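Why shrinking the partition count can skip the shuffle: each target partition can be formed by concatenating whole source partitions, so no individual record has to be redistributed. A plain-Java sketch of that idea (illustrative only, not Spark's actual placement algorithm):

```java
import java.util.*;

public class CoalesceSketch {
    // Merge whole partitions into n fewer ones, round-robin by partition index.
    // No element moves independently of its partition, which is why no shuffle is needed.
    static List<List<String>> coalesce(List<List<String>> partitions, int n) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) out.add(new ArrayList<>());
        for (int p = 0; p < partitions.size(); p++) {
            out.get(p % n).addAll(partitions.get(p));
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> four = Arrays.asList(
                Arrays.asList("a"), Arrays.asList("b"),
                Arrays.asList("c"), Arrays.asList("d"));
        // Partitions 0 and 2 merge into the first slot, 1 and 3 into the second
        System.out.println(coalesce(four, 2)); // [[a, c], [b, d]]
    }
}
```

Going the other way (splitting one partition across several) has no such decomposition, which is why growing the partition count requires a shuffle.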

16. zip & zipWithIndex

zip: zips two RDDs element by element into a K-V RDD, e.g. (a,1). Both RDDs must have the same number of partitions and the same number of elements per partition.

zipWithIndex: pairs each element with its Long index (0, 1, 2, ...), forming a K-V RDD, e.g. (a,0).

    JavaPairRDD<String, String> zip = rdd.zip(rdd1);
    JavaPairRDD<String, Long> zipWithIndex = rdd.zipWithIndex();
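The pairing behavior can be illustrated with plain Java lists (a hypothetical helper, not part of the Spark API):

```java
import java.util.*;

public class ZipSketch {
    // Pair up elements of two equal-length lists positionally, like rdd.zip(rdd1)
    static <A, B> List<Map.Entry<A, B>> zip(List<A> xs, List<B> ys) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (int i = 0; i < xs.size(); i++) {
            out.add(new AbstractMap.SimpleEntry<>(xs.get(i), ys.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // "a" pairs with 1, "b" with 2 — the same shape as (a,1), (b,2) in the text
        System.out.println(zip(Arrays.asList("a", "b"), Arrays.asList(1, 2))); // [a=1, b=2]
    }
}
```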

II. Action Operators

1. collect

    List<String> list = lines.collect();
    for (String s : list) {
        System.out.println(s);
    }

2. count

Returns the number of records.

    long count = lines.count();
    System.out.println(count);

3. first

Returns the first record.

    String first = lines.first();
    System.out.println(first);

4. take

Returns the first n records.

    List<String> list = lines.take(5);
    for (String s : list) {
        System.out.println(s);
    }

5. foreachPartition

    rdd1.foreachPartition(new VoidFunction<Iterator<String>>() {
        @Override
        public void call(Iterator<String> stringIterator) throws Exception {
            // process one whole partition per call, e.g. batch writes to a database
        }
    });

6. reduce & countByKey & countByValue

reduce folds all elements with the given function; summing 1 through 5 yields 15.

    sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
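The same fold can be sanity-checked with plain Java streams, no Spark required:

```java
import java.util.stream.Stream;

public class ReduceSketch {
    public static void main(String[] args) {
        // Equivalent left fold over 1..5 with addition
        int sum = Stream.of(1, 2, 3, 4, 5).reduce(0, Integer::sum);
        System.out.println(sum); // 15
    }
}
```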

countByKey groups records by key and counts how many records share each key.

    Map<String, Long> map = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 1),
            new Tuple2<String, Integer>("c", 1), new Tuple2<String, Integer>("d", 1))).countByKey();

countByValue treats each whole record as the grouping key and counts its occurrences, e.g. ((a,100),2).

    Map<Tuple2<String, Integer>, Long> map = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 1),
            new Tuple2<String, Integer>("c", 1), new Tuple2<String, Integer>("d", 1))).countByValue();
    Set<Map.Entry<Tuple2<String, Integer>, Long>> set = map.entrySet();
    for (Map.Entry<Tuple2<String, Integer>, Long> entry : set) {
        Tuple2<String, Integer> key = entry.getKey();
        Long value = entry.getValue();
    }
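countByValue's semantics amount to building a map of occurrence counts over whole records, as this plain-Java sketch (an illustrative helper, not Spark code) shows:

```java
import java.util.*;

public class CountByValueSketch {
    // Count how many times each whole element occurs, like rdd.countByValue()
    static <T> Map<T, Long> countByValue(List<T> values) {
        Map<T, Long> counts = new LinkedHashMap<>();
        for (T v : values) {
            counts.merge(v, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "a" occurs twice, "b" once
        System.out.println(countByValue(Arrays.asList("a", "b", "a"))); // {a=2, b=1}
    }
}
```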
