接Spark _06_补充部分算子【一】

https://blog.csdn.net/qq_41946557/article/details/102673673

scala API

package ddd.henu.transformationsimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject ElseFun2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ddd").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("error")/*** cogroup*/val nameRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](("george", 18), ("george", 180), ("george", 2000),("MM",18)),4)val scoreRDD: RDD[(String, Int)] = sc.parallelize(List[(String, Int)](("george", 100), ("george", 200), ("kk", 300),("ll",400)),4)val result: RDD[(String, (Iterable[Int], Iterable[Int]))] = nameRDD.cogroup(scoreRDD)result.foreach(println)/*** (ll,(CompactBuffer(),CompactBuffer(400)))* (kk,(CompactBuffer(),CompactBuffer(300)))* (MM,(CompactBuffer(18),CompactBuffer()))* (george,(CompactBuffer(18, 180, 2000),CompactBuffer(100, 200)))*//*** distinct*/
//    val rdd = sc.parallelize(List[String]("a","b","a","c","d","d"))//去重
//    rdd.map(one =>{(one,1)}).reduceByKey((v1,v2)=>{v1+v2}).map(one=>{one._1}).foreach(println)//去重2
//    val result: RDD[String] = rdd.distinct()
//    result.foreach(println)/*** mapPartitions   && foreachPartition*/
//    val rdd = sc.parallelize(List[String]("george","love","kk","ll","like","dd"),2)/*val l = rdd.map(one => {println("建立数据库连接")println(s"插入数据:$one")println("关闭数据库连接")one + "!"}).count()println(l)//6 频繁创建连接,浪费资源  *///一个分区一个分区的处理/*val l = rdd.mapPartitions(iter => {val list = new ListBuffer[String]println("建立数据库连接")while (iter.hasNext) {val str = iter.next()list.+=(str)println(s"插入数据:$str")}println("关闭数据库连接")list.iterator}).count()println(l)*///2    建立了两次数据库连接/*rdd.foreachPartition(iter =>{println("建立数据库连接!")while(iter.hasNext){val str:String = iter.next();println(s"插入数据库,$str");}println("关闭数据库连接!!!")})*//*val rdd1 = sc.parallelize(List[Int](1,2,3,4))val rdd2 = sc.parallelize(List[Int](3,4,5,6))*///subtract  差集    rdd2.subtract(rdd1)/*val result = rdd1.subtract(rdd2)result.foreach(println)*//*** 1* 2*///intersection  交集/*val result: RDD[Int] = rdd1.intersection(rdd2)*//*** 4* 3*/}
}

JAVA API

package eee;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;import java.util.Arrays;/*** @author George* @description**/
public class ElseFun {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("test");JavaSparkContext sc = new JavaSparkContext(conf);JavaPairRDD<String,Integer> nameRdd = sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("george", 22), new Tuple2<String,Integer>("lucy", 22), new Tuple2<String,Integer>("dk", 22), new Tuple2<String,Integer>("ll", 22)));JavaPairRDD<String,Integer> scoreRdd = sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("george", 22), new Tuple2<String,Integer>("lucy", 22), new Tuple2<String,Integer>("dk", 22), new Tuple2<String,Integer>("ll", 22)));JavaPairRDD<String, Tuple2<Integer, Integer>> result = nameRdd.join(scoreRdd);result.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Integer>> s) throws Exception {System.out.println(s);}});/*** (ll,(22,22))* (dk,(22,22))* (george,(22,22))* (lucy,(22,22))*/}
}

Spark _07_补充部分算子【二】相关推荐

  1. Spark _10_补充部分算子【三】

    补充算子 transformations mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值. repartition 增加或减少分区.会产生s ...

  2. Spark _06_补充部分算子【一】

    transformation join,leftOuterJoin,rightOuterJoin,fullOuterJoin 作用在K,V格式的RDD上.根据K进行连接,对(K,V)join(K,W) ...

  3. Spark 之 故障排除(二)

    Spark 之 故障排除(二) 这是我参与更文挑战的第12天,活动详情查看:更文挑战 故障排除四:解决算子函数返回NULL导致的问题 在一些算子函数里,需要我们有一个返回值,但是在一些情况下我们不希望 ...

  4. Spark Streaming笔记整理(二):案例、SSC、数据源与自定义Receiver

    [TOC] 实时WordCount案例 主要是监听网络端口中的数据,并实时进行wc的计算. Java版 测试代码如下: package cn.xpleaf.bigdata.spark.java.str ...

  5. Spark的RDD转换算子

    目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...

  6. Spark学习之路 (二十二)SparkStreaming的官方文档

    讨论QQ:1586558083 目录 一.简介 1.1 概述 1.2 一个小栗子 2.2 初始化StreamingContext 2.3 离散数据流 (DStreams) 2.4 输入DStream和 ...

  7. spark中的转换算子和行动算子区别(transformations and actions)

    算子(RDD Operations): 对于初学者来说,算子的概念比较抽象,算子可以直译为 "RDD的操作", 我们把它理解为RDD的方法即可 . 转换算子(transformat ...

  8. 【Spark】(五)Spark Transformation(转换算子) 和 Action(执行算子)

    文章目录 一.Transformation 和 Action 1.转换操作 2.行动操作 二.map.flatMap.mapParations.mapPartitionsWithIndex 2.1 m ...

  9. 2021年大数据Spark(十六):Spark Core的RDD算子练习

    目录 RDD算子练习 map 算子 filter 算子 flatMap 算子 交集.并集.差集.笛卡尔积 distinct 算子 ​​​​​​​​​​​​​​first.take.top 算子 ​​​ ...

最新文章

  1. 怎么做 慢充 话费_警惕!冒充亲友骗充话费卷土重来
  2. 精进:如何成为一个很厉害的人---书摘(转)
  3. leader选举的源码分析-QuorumPeer.start
  4. (二)流--递归算法
  5. mysql主从配置 简书_Mysql主从配置,实现读写分离-Go语言中文社区
  6. Flutter搜索框SearchBar
  7. 平衡二叉树删除_自平衡二叉树实现及时间复杂度分析
  8. 计算机组成原理-白中英版
  9. 删除远程桌面记录的两种方式介绍
  10. Oracle imp/impdp 导入dmp文件到数据库
  11. 3097-小鑫爱数学
  12. 什么是DOM0,DOM2,DOM3?
  13. stm32h7内存分配_【STM32H7教程】第9章 STM32H7重要知识点数据类型,变量和堆栈...
  14. 广东省第三届职业技能大赛网络安全项目模块B
  15. layui table数据表格中数据返回成功,但页面不显示数据内容问题
  16. 图像和base64的转换 uniapp开发
  17. excel根据条件列转行_excel怎么批量把行变成列
  18. 模仿其他游戏进行内购
  19. 【学习机器学习】实验——聚类算法性能度量
  20. 美国华人一个半世纪的沧桑

热门文章

  1. 牛客 - 王国(虚树+树的直径)
  2. 洛谷 - P3357 最长k可重线段集问题(最大费用最大流+思维建边+拆点)
  3. EOJ_1102_任务调度问题
  4. cmi编码用c语言实现,CMI编码与解码
  5. 通讯故障_伦茨lenze全数字直流调速器通讯故障维修经验很丰富
  6. unix下批量进程的创建和强杀命令
  7. 第47讲:scrapy-redis分布式爬虫介绍
  8. 在linux中解压.tgz
  9. 字符串匹配--Sunday算法
  10. 干货 | 阿里巴巴HBase高可用8年抗战回忆录