import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {/*** 单词计数程序-Scala版本*/def main(args: Array[String]): Unit = {/*** spark-shell:* spark:SparkSession 主要针对的是SparkSQL* SparkSQL程序入口* sc:SparkCore对象,SparkCore的程序入口* 在spark-shell中已经初始化好了sc,但是我们代码中需要创建对象*///配置文件val conf = new SparkConf()//如果不设置,默认运行的是集群模式,设置成local运行local模式,直接在IDEA中运行即可conf.setMaster("local")//必须要设置,否则会报错,设置任务名字conf.setAppName("WordCount");//创建SparkCore的程序入口val sc = new SparkContext(conf)//以上相当于Spark-shell帮我们干的事//读取文件生成RDDval fileRDD: RDD[String] = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt")//把每一行数据按照逗号,分隔val wordRDD: RDD[String] = fileRDD.flatMap(line => line.split(","))//让每一个单词都出现一次val wordOneRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1))//单词计数val wordCountRDD: RDD[(String, Int)] = wordOneRDD.reduceByKey(_ + _)//按照单词出现次数 降序排序 sortByKey()只能按照key排序,降序是false,true是升序val sortedRDD: RDD[(String, Int)] = wordCountRDD.sortBy(tuple => tuple._2, false)//打印结果sortedRDD.foreach(tuple => {println("单词" + tuple._1 + "出现的次数" + tuple._2)})/*** 单词计数:流式编程,函数式编程*//*sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt").flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2).foreach(tuple => {println(tuple._1 + " " + tuple._2)})*/sc.stop()}
}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;public class WordCount_java {/*** java-7* 没有lambda表达式* 单词计数程序*/public static void main(String[] args) {//配置文件final SparkConf conf = new SparkConf();conf.setMaster("local");conf.setAppName("WordCount_java");//初始化程序入口final JavaSparkContext sc = new JavaSparkContext(conf);final JavaRDD<String> fileRDD = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt");//在java中我们需要传入FlatMapFunction类型,第一个是输入的数据类型,第二个参数是输出的数据类型,里面要实现Iterator抽象方法final JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {//固定操作return Arrays.asList(line.split(",")).iterator();}});//只要是ByKey方法就要求RDD里面必须是Key-value键值对类型//下面注释的代码是错误的,我们正常的逻辑希望最后有一个reduceByKey()但是没有// map()方法,要什么类型就new什么/* final JavaRDD<HashMap<String, Integer>> wordOneRDD = wordRDD.map(new Function<String, HashMap<String, Integer>>() {@Overridepublic HashMap call(String word) throws Exception {final HashMap<String, Integer> map = new HashMap<>();map.put(word, 1);return null;}});*///我们只要是做什么ByKey的操作需要转换成ToPair,sortByKey,groupByKey,reduceByKeyfinal JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<>(word, 1);}});final JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) throws Exception {return i1 + i2;}});final JavaPairRDD<Integer, String> count2wordRDD = wordCountRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {return new Tuple2<Integer, String>(tuple._2, tuple._1);}});final JavaPairRDD<Integer, String> sortedRDD = count2wordRDD.sortByKey(false);sortedRDD.foreach(new VoidFunction<Tuple2<Integer, String>>() {@Overridepublic void call(Tuple2<Integer, String> tuple) throws Exception {System.out.println("单词:" + tuple._2 + "次数:" + tuple._1);}});sc.stop();}
}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;public class WordCount_java8 {/*** JAVA-8开发单词计数程序*/public static void main(String[] args) {//配置文件final SparkConf conf = new SparkConf();conf.setMaster("local");conf.setAppName("WordCount_java8");//初始化程序入口final JavaSparkContext sc = new JavaSparkContext(conf);final JavaRDD<String> fileRDD = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt");final JavaRDD<String> wordRDD = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());final JavaPairRDD<String, Integer> wordOneRDD = wordRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));final JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((m, n) -> m + n);final JavaPairRDD<Integer, String> count2wordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));final JavaPairRDD<Integer, String> sortedRDD = count2wordRDD.sortByKey(false);sortedRDD.foreach(tuple->{System.out.println("单词:"+tuple._2+"次数:"+tuple._1);});sc.stop();//流式写法TopN  take表示取前几个final List<Tuple2<Integer, String>> result = sc.textFile("G:\\workspace\\_2018_4_22\\src\\hello.txt").flatMap(line -> Arrays.asList(line.split(",")).iterator()).mapToPair(word -> new Tuple2<String, Integer>(word, 1)).reduceByKey((m, n) -> m + n).mapToPair(tuple -> new Tuple2<String,Integer>(tuple._2, tuple._1)).sortByKey(false).take(2);result.forEach(t->{System.out.println(t._2()+" "+t._1());});}
}

Spark-core开发笔记相关推荐

  1. 第10课:底实战详解使用Java开发Spark程序学习笔记

    第10课:底实战详解使用Java开发Spark程序学习笔记 本期内容: 1. 为什么要使用Java? 2. 使用Java开发Spark实战 3. 使用Java开发Spark的Local和Cluster ...

  2. 第10课:底实战详解使用Java开发Spark程序学习笔记(二)

    Maven下的Spark配置: http://maven.outofmemory.cn/org.apache.spark,这个网站提供了Spark core.Spark Streaming使用Mave ...

  3. .NET Core开发实战(第5课:依赖注入:良好架构的起点)--学习笔记(上)

    05 | 依赖注入:良好架构的起点 为什么要使用依赖注入框架 借助依赖注入框架,我们可以轻松管理类之间的依赖,帮助我们在构建应用时遵循设计原则,确保代码的可维护性和可扩展性 ASP.NET Core ...

  4. Spark基础学习笔记10:Scala集成开发环境

    文章目录 零.本讲学习目标 一.搭建Scala的Eclipse开发环境 (一)安装Scala插件 (二)创建Scala项目 二.搭建Scala的IntelliJ IDEA开发环境 (一)启动IDEA ...

  5. 大数据开发笔记(八):Spark综合笔记总结

      ✨大数据开发笔记推荐: 大数据开发面试知识点总结_GoAI的博客-CSDN博客_大数据开发面试​本文详细介绍大数据hadoop生态圈各部分知识,包括不限于hdfs.yarn.mapreduce.h ...

  6. Spark Core笔记

    文章目录 Spark环境 wordcount 本地 Standalone 修改配置文件 关联日志 HA 配置历史服务器 yarn mac本地模式 Spark架构 RDD RDD的核心属性 RDD创建 ...

  7. 第9课:IDEA下的spark程序开发

    第9课:IDEA下的spark程序开发 1.下载IntelliJ IDEA: http://www.jetbrains.com/idea/ 选择社区版,要在centos上安装,需要下载.TARGZ,解 ...

  8. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  9. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  10. 张高兴的 Windows 10 IoT 开发笔记:BH1750FVI 光照度传感器

    张高兴的 Windows 10 IoT 开发笔记:BH1750FVI 光照度传感器 原文:张高兴的 Windows 10 IoT 开发笔记:BH1750FVI 光照度传感器 BH1750FVI 是一款 ...

最新文章

  1. 《QTP自动化测试进阶》(1)
  2. Android Interactive Animation
  3. java面试题三 位运算符
  4. 一步一步学Silverlight 2系列(20):如何在Silverlight中与HTML DOM交互(下)
  5. 『设计模式』Web程序开发最基本的编程模式--MVC编程模式
  6. windows 邮槽mailslot 在服务程序内建立后客户端无权限访问(GetLastError() == 5)的问题...
  7. java 动态代理
  8. 家里也是不知不觉就电脑有不能开启了
  9. mac 下更新python
  10. 复函数图像怎么画_如何画出复平面上的网格在复函数映射下的图像?
  11. linux升级gnome,linux – Gnome shell特权升级
  12. matlab自带的信号,实验一 连续时间信号在MATLAB中的表示..ppt
  13. 树莓派安装FFTW,linux安装库不生成.so库,拷贝.so .a,按时间查看文件的命令
  14. spring中cglib动态代理
  15. mmd动作:Bad End Night
  16. 两招快速教会你们PDF怎么转图片jpg格式
  17. 【计算机二级Python】模拟试卷第4套选择题
  18. 美化你的Typora
  19. 苹果发布的是iPphone 4s,而不是iPhone 5
  20. 强制使用ie浏览器使用最高版本

热门文章

  1. ionic3学习历程(1)
  2. 《游戏设计艺术(第2版)》——学习笔记(30)第30章 设计师要向客户推销自己的想法
  3. 一个简单的CPU降温方法.
  4. EEPROM存储和读取函数介绍
  5. Mac vi 下 提示 E32:NO file name
  6. uni-app实现蓝牙打印小票
  7. php yii composer,PHP Yii2 composer环境安装
  8. python画像素分布图
  9. linux img 内核启动,【Linux必知必会】initrd.img、vmlinux和 vmlinuz
  10. 32. 注入篇——基于HTTP头部的注入