能进行join的只能是:

JavaPairRDD

--------------------------------------------------------------------第一种方案------------------------------------------------------------------------

代码如下:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import org.apache.log4j.Logger;
import java.util.Arrays;
import java.util.List;public class java_join {static class Entity {private String name;private Integer age;public Entity(String name, Integer age) //构造函数{this.name = name;this.age = age;}public String getName() {return name;}public Integer getAge() {return age;}}//--------------------------------------------------------------------------------------------------public static void main(String[] args){Logger.getLogger("org.apache.hadoop").setLevel(org.apache.log4j.Level.WARN);Logger.getLogger("org.apache.spark").setLevel(org.apache.log4j.Level.WARN);Logger.getLogger("org.project-spark").setLevel(org.apache.log4j.Level.WARN);String appName = "test";String master = "local[2]";String path = "hdfs://Desktop:9000/rdd3.csv";SparkConf conf = new SparkConf().setAppName(appName).setMaster(master) .set("spark.serializer","org.apache.spark.serializer.KryoSerializer");JavaSparkContext sc = new JavaSparkContext(conf);//        这个keyby会把age放前,name放后JavaPairRDD<Integer, Entity> pairRDD = sc.parallelize(Arrays.asList(new Entity("zhangsan", 11),new Entity("lisi", 11),new Entity("wangwu", 13))).keyBy(Entity::getAge);JavaPairRDD<Integer, Entity> javaPairRDD = sc.textFile(path).map(line -> {String[] strings = line.split(",");String name = strings[0];Integer age = Integer.valueOf(strings[1]);return new Entity(name, age);}).keyBy(Entity::getAge);System.out.println("--------------------------------------------------------");System.out.println(javaPairRDD.collect());JavaPairRDD<Integer, Tuple2<Entity, Entity>> collect = pairRDD.join(javaPairRDD);System.out.println("-------------------------查看join结果-------------------------------");List<Tuple2<Integer, Tuple2<Entity, Entity>>> result = collect.collect();for (int i = 0; i < result.size(); i++){System.out.print("List[");System.out.print(result.get(i)._1);System.out.print(",Tuple2(");System.out.print(result.get(i)._2._1.name);System.out.print(",");System.out.print(result.get(i)._2._2.name);System.out.println(")]");}}
}

实验验结果是:

List[11,Tuple2(zhangsan,zhangsan)]
List[11,Tuple2(zhangsan,lisi)]
List[11,Tuple2(lisi,zhangsan)]
List[11,Tuple2(lisi,lisi)]

rdd3.csv的内容是:

zhangsan,11
lisi,11
wangwu,14

----------------------第二种方案--------------------------

import com.sun.rowset.internal.Row;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaPairRDD$;
import org.apache.spark.api.java.function.*;
import org.slf4j.event.Level;
import scala.Tuple2;
import java.util.*;
import java.util.Random;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkContext;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.lang.*;
//import org.apache.log4j.Level;
import org.apache.log4j.Logger;
//import java.util.logging.Logger;import scala.Tuple2;public class sampling_salting
{public static void main(String[]  args)
{Logger.getLogger("org.apache.hadoop").setLevel(org.apache.log4j.Level.WARN);Logger.getLogger("org.apache.spark").setLevel(org.apache.log4j.Level.WARN);Logger.getLogger("org.project-spark").setLevel(org.apache.log4j.Level.WARN);SparkConf conf  = new SparkConf().setMaster("local").setAppName("join");JavaSparkContext sc = new JavaSparkContext(conf);String path1="hdfs://Desktop:9000/rdd1.csv";String path2="hdfs://Desktop:9000/rdd2.csv";JavaPairRDD<Integer, String> rdd1 = sc.textFile(path1).mapToPair(new PairFunction<String, Integer, String>(){@Overridepublic Tuple2<Integer, String> call(String s) throws Exception{String[] strings=s.split(",");Integer ids = Integer.valueOf(strings[0]);String greet=strings[1];return Tuple2.apply(ids,greet);}});JavaPairRDD<Integer,String>rdd2=sc.textFile(path2)
.mapToPair(line->{String[] strings=line.split(",");Integer ids = Integer.valueOf(strings[0]);String greet=strings[1];return new Tuple2<>(ids,greet);
});System.out.println(rdd1.collect());System.out.println(rdd2.collect());JavaPairRDD<Integer, Tuple2<String, String>> result = rdd1.join(rdd2);System.out.println(result.collect());}
}

上述代码中,转化为最终的JavaPairRDD使用了mapToPair有两种办法:

return Tuple2.apply(ids,greet);

return new Tuple2<>(ids,greet);

rdd1.csv

001,hello
001,hello
001,hello
001,hello

rdd2.csv

002,hello
002,hello
002,hello
002,hello

hdfs dfs -put rdd1.csv /

hdfs dfs -put rdd2.csv /

--------------------------------------------Java常用的pom.xml文件--------------------------------------------------------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>java_join</groupId><artifactId>java_join</artifactId><version>1.0-SNAPSHOT</version><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin></plugins></build><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>3.0.0</version><scope>runtime</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.12</artifactId><version>3.0.0</version></dependency></dependencies></project>

Java常用spark的pom.xml与读取csv为rdd到最终join操作+java常用pom.xml文件相关推荐

  1. scala常用spark的pom.xml与读取csv为rdd到最终join操作

    这个问题其实本来没啥难度,不值得记录, 但是因为join需要的是Array((),(),())这样的格式, 而不是Array(Array(),Array(),Array())这样的格式,让问题瞬间有了 ...

  2. python pandas csv时间聚合_Python通过pandas操作excel常用功能

    1.导入数据源 #导入相关库 import pandas as pd import numpy as np import os from pandas import DataFrame,Series ...

  3. spark 把一列数据合并_Spark Java-合并同一列多行 - java

    我正在使用Java Spark,并且有1个这样的数据框 +---+-----+------+ |id |color|datas | +----------------+ |1 |blue |data1 ...

  4. 在IntelliJ IDEA中创建和运行java/scala/spark程序

    本文将分两部分来介绍如何在IntelliJ IDEA中运行Java/Scala/Spark程序: 基本概念介绍 在IntelliJ IDEA中创建和运行java/scala/spark程序 基本概念介 ...

  5. Java连接Spark Standalone集群

    软件环境: spark-1.6.3-bin-hadoop2.6.hadoop-2.6.4.jdk1.7.0_67.IDEA14.1.5 : Hadoop集群采用伪分布式安装,运行过程中只启动HDFS: ...

  6. openCsv读取csv文件

    https://blog.csdn.net/cat_book_milk/article/details/52953786 Java读取csv文件 https://blog.csdn.net/galen ...

  7. spark如何防止内存溢出_Spark 理论基石 —— RDD

    概述 RDD,学名可伸缩的分布式数据集(Resilient Distributed Dataset).是一种对数据集形态的抽象,基于此抽象,使用者可以在集群中执行一系列计算,而不用将中间结果落盘.而这 ...

  8. 读取csv文件 java_Java:逐步读取/流式传输CSV文件

    读取csv文件 java 我一直在做一些涉及读取CSV文件的工作,而我一直在使用OpenCSV ,而我的最初方法是逐行读取文件,解析内容并将其保存到地图列表中. 当文件的内容适合内存时,此方法有效,但 ...

  9. spark做两张大表的join操作,mapPartition和重分区算子的使用策略

    Spark中做两个大hive表的join操作,先读取过来处理成两个数据量很大的RDD,如果两个RDD直接进行join操作,势必会造成shuffle等导致运行非常缓慢,那么怎么优化呢?方法如下: 首先, ...

最新文章

  1. MVC --.Routing
  2. C++ Primer 5th笔记(chap 19 特殊工具与技术)链接指示: extern “C“
  3. Python 爬虫利器 Beautiful Soup 4 之文档树的搜索
  4. 发动机冷启动和热启动的区别_「牛车实验室」AGM自动启停蓄电池解读 和普通蓄电池到底有什么区别...
  5. thinkserver rd650管理口地址_路由器WAN口和LAN口有什么区别【区别介绍】
  6. 单指令流多数据流( SIMD)
  7. python依赖注入_什么是依赖注入?
  8. Java并发性和多线程介绍
  9. 高精度练习(hdoj1042)
  10. django2.2-form表单详解
  11. 手机怎么用外嵌字幕_怎么用手机给视频添加字幕?原来方法这么简单,3分钟教你学会...
  12. 达梦数据库、表字段创建索引或删除索引,增加表字段、修改字段类型或长度、修改注释sql语句
  13. 什么是properties文件
  14. 游戏开发入门(三)图形渲染
  15. 融合正弦余弦和无限折叠迭代混沌映射的蝴蝶优化算法-附代码
  16. 计算机图形学的数学基础
  17. 通过网页控制嵌入式设备
  18. 盘点 | Github上的18个顶级深度学习项目
  19. Zxing系列之设置二维码图片背景透明教程
  20. C++ getline():从文件中读取一行字符串

热门文章

  1. 【转】Android开发之数据库SQL
  2. (SSO)单点登录原理和总结
  3. python-day1
  4. python 04 基础
  5. HTML DOM 事件
  6. linux —— 学习笔记(用户管理与权限控制)
  7. Oracle SQL语句收集
  8. WP8.1开发中关于如何显示.gif格式动态格式图片方法
  9. LINUX内核之内存屏障
  10. 先锋展示了可爱鸟形车载机器