1、创建一个1-10数组的RDD,将所有元素*2形成新的RDD

scala> val rdd1 = sc.parallelize(1 to 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24scala> val rdd2 = rdd1.map(_ * 2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:26scala> rdd2.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)val rdd1 = sc.parallelize(1 to 10)
val  rdd2 = rdd1.map(_*2)
rdd2 .collect  

2、创建一个10-20数组的RDD,使用mapPartitions将所有元素*2形成新的RDD


val rdd1 =sc.makeRDD  (10 to 20 )
cal rdd2 = rdd1.mapPartitions(_.map(_*2)

3、创建一个元素为 1-5 的RDD,运用 flatMap创建一个新的 RDD,新的 RDD 为原 RDD 每个元素的 平方和三次方 来组成 1,1,4,8,9,27..


scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala>  rdd1.flatMap(x => Array(x * x, x * x * x))
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at flatMap at <console>:27scala> res7.collect
res8: Array[Int] = Array(1, 1, 4, 8, 9, 27, 16, 64, 25, 125)

4、创建一个 4 个分区的 RDD数据为Array(10,20,30,40,50,60),使用glom将每个分区的数据放到一个数组


scala> var rdd1 = sc.parallelize(Array(10,20,30,40,50,60), 4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24scala> rdd1.glom.collect
res9: Array[Array[Int]] = Array(Array(10), Array(20, 30), Array(40), Array(50, 60))

5、创建一个 RDD数据为Array(1, 3, 4, 20, 4, 5, 8),按照元素的奇偶性进行分组


scala> val rdd1 = sc.makeRDD(Array(1, 3, 4, 20, 4, 5, 8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:24scala> rdd1.groupBy(x =>if(x % 2 == 1) "odd"else"even")
res10: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[14] at groupBy at <console>:27scala> res10.collect
res11: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(4, 20, 4, 8)), (odd,CompactBuffer(1, 3, 5)))

6、创建一个 RDD(由字符串组成)Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"),过滤出一个新 RDD(包含“xiao”子串)


scala> val names = sc.parallelize(Array("xiaoli", "laoli", "laowang", "xiaocang", "xiaojing", "xiaokong"))
names: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:24scala> names.filter(_.contains("xiao"))
res12: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at filter at <console>:27scala> res12.collect
res13: Array[String] = Array(xiaoli, xiaocang, xiaojing, xiaokong)

7、创建一个 RDD数据为1 to 10,请使用sample不放回抽样


scala> val rdd1 = sc.parallelize(1 to 10)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:24scala> rdd1.sample(false, 0.5).collect
res14: Array[Int] = Array(1, 2, 5, 6)

8、创建一个 RDD数据为1 to 10,请使用sample放回抽样


scala> rdd1.sample(true, 2).collect
res15: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10)

9、创建一个 RDD数据为Array(10,10,2,5,3,5,3,6,9,1),对 RDD 中元素执行去重操作


scala> val rdd1 = sc.parallelize(Array(10,10,2,5,3,5,3,6,9,1))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24scala> rdd1.distinct().collect
res16: Array[Int] = Array(1, 9, 5, 6, 10, 2, 3)

10、创建一个分区数为5的 RDD,数据为0 to 100,之后使用coalesce再重新减少分区的数量至 2

scala> val rdd1 = sc.parallelize(0 to 100, 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd1.partitions.length
res0: Int = 5scala> rdd1.coalesce(2) // 减少分区的数量为2
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at <console>:27scala> res1.partitions.length
res2: Int = 2

11、创建一个分区数为5的 RDD,数据为0 to 100,之后使用repartition再重新减少分区的数量至 3

repartition(numPartitions)作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络.新的分区数相比以前可以多, 也可以少scala> val rdd1 = sc.parallelize(0 to 100, 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24scala> rdd1.repartition(3)
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at repartition at <console>:27scala> res3.partitions.length
res5: Int = 3

12、创建一个 RDD数据为1,3,4,10,4,6,9,20,30,16,请给RDD进行分别进行升序和降序排列


sortBy(func,[ascending], [numTasks])
作用: 使用func先对数据进行处理,按照处理后结果排序,默认为正序。scala> val rdd1 = sc.parallelize(Array(1,3,4,10,4,6,9,20,30,16))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24scala> rdd1.sortBy(x => x).collect
res8: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)scala> rdd1.sortBy(x => x, true).collect // 升序输出
res9: Array[Int] = Array(1, 3, 4, 4, 6, 9, 10, 16, 20, 30)scala> rdd1.sortBy(x => x, false).collect // 降序输出
res10: Array[Int] = Array(30, 20, 16, 10, 9, 6, 4, 4, 3, 1)

13、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,求并集


union(otherDataset)作用:求并集. 对源 RDD 和参数 RDD 求并集后返回一个新的 RDDscala> val rdd1 = sc.parallelize(1 to 6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(4 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> rdd1.union(rdd2)
res4: org.apache.spark.rdd.RDD[Int] = UnionRDD[9] at union at <console>:29scala> res4.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 5, 6, 7, 8, 9, 10)

14、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算差集,两个都算


subtract (otherDataset) 作用: 计算差集. 从原 RDD 中减去 原 RDD 和 otherDataset 中的共同的部分scala> val rdd1 = sc.parallelize(1 to 6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(4 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> rdd1.subtract(rdd2).collect
res0: Array[Int] = Array(2, 1, 3)scala> rdd2.subtract(rdd1).collect
res7: Array[Int] = Array(8, 9, 10, 7)

15、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算交集


intersection(otherDataset)作用: 计算交集. 对源 RDD 和参数 RDD 求交集后返回一个新的 RDDscala> val rdd1 = sc.parallelize(1 to 6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(4 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> rdd1.intersection(rdd2).collect
res2: Array[Int] = Array(4, 5, 6) 

16、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 6和4 to 10,计算 2 个 RDD 的笛卡尔积


cartesian(otherDataset)作用: 计算 2 个 RDD 的笛卡尔积. 尽量避免使用scala> val rdd1 = sc.parallelize(1 to 6)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(4 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> val rdd3 = rdd1.cartesian(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[10] at cartesian at <console>:28scala> rdd3.collect
res2: Array[(Int, Int)] = Array((1,4), (1,5), (1,6), (2,4), (2,5), (2,6), (3,4), (3,5), (3,6), (1,7), (1,8), (1,9), (1,10), (2,7), (2,8), (2,9), (2,10), (3,7), (3,8), (3,9), (3,10), (4,4), (4,5), (4,6), (5,4), (5,5), (5,6), (6,4), (6,5), (6,6), (4,7), (4,8), (4,9), (4,10), (5,7), (5,8), (5,9), (5,10), (6,7), (6,8), (6,9), (6,10))

17、创建两个RDD,分别为rdd1和rdd2数据分别为1 to 5和11 to 15,对两个RDD拉链操作

zip(otherDataset)作用: 拉链操作. 需要注意的是, 在 Spark 中, 两个 RDD 的元素的数量和分区数都必须相同, 否则会抛出异常.(在 scala
中, 两个集合的长度可以不同)。本质就是要求的每个分区的元素的数量相同。scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24scala> val rdd2 = sc.parallelize(11 to 15)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24scala> rdd1.zip(rdd2).collect
res3: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15))

18、创建一个RDD数据为List(("female",1),("male",5),("female",5),("male",2)),请计算出female和male的总数分别为多少


reduceByKey(func, [numTasks])作用: 在一个(K,V)的 RDD 上调用,返回一个(K,V)的
RDD,使用指定的reduce函数,将相同key的value聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。scala> val rdd1 = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24scala>  rdd1.reduceByKey(_ + _)
res6: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:27scala> res6.collect
res8: Array[(String, Int)] = Array((female,6), (male,7))

19、创建一个有两个分区的 RDD数据为List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),取出每个分区相同key对应值的最大值,然后相加


aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])使用给定的 combine 函数和一个初始化的zero value, 对每个key的value进行聚合. 这个函数返回的类型U不同于源
RDD 中的V类型. U的类型是由初始化的zero value来定的. 所以, 我们需要两个操作: - 一个操作(seqOp)去把 1
个v变成 1 个U - 另外一个操作(combOp)来合并 2 个U 第一个操作用于在一个分区进行合并, 第二个操作用在两个分区间进行合并.
为了避免内存分配, 这两个操作函数都允许返回第一个参数, 而不用创建一个新的U 参数描述:zeroValue:给每一个分区中的每一个key一个初始值;
seqOp:函数用于在每一个分区中用初始值逐步迭代value;
combOp:函数用于合并每个分区中的结果。scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24scala> rdd.aggregateByKey(Int.MinValue)(math.max(_, _), _ +_)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:27scala> res0.collect
res1: Array[(String, Int)] = Array((b,3), (a,3), (c,12))

20、 创建一个有两个分区的 pairRDD数据为Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),根据 key 计算每种 key 的value的平均值


combineByKey解 :
遍历过程中,统计a和b的个数,同时计算总和。scala> val score2=rdd.combineByKey(x =>(1,x) ,| (c1:(Int,Int),newScore)=>(c1._1+1,c1._2+newScore),| (c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))]score2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[31] at combineByKey at <console>:26得到编号ab:个数+总和
scala> score2.foreach(println)
(b,(3,286))
(a,(3,274))// 求平均值
scala>                                    val average=score2.map{case(name, (num,score) )=>(name,score/num) }average: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[32] at map at <console>:28//  结果
scala> average.foreach(println)
(b,95)
(a,91)

21、统计出每一个省份广告被点击次数的 TOP3,数据在access.log文件中

数据结构:时间戳,省份,城市,用户,广告 字段使用空格分割。

样本如下:

1516609143867 6 7 64 16

1516609143869 9 4 75 18

1516609143869 1 7 87 12

RDD编程package Provinceimport org.apache.spark.{SparkConf, SparkContext}/** 统计出每一个省份广告被点击次数的TOP3* 数据结构:时间戳,省份,城市,用户,广告,中间字段使用空格分割。* 日志样例:* 1516609143867 6 7 64 16* 1516609143869 9 4 75 18* 1516609143869 1 7 87 12**/object Province {def main(args: Array[String]): Unit = {//1.初始化spark配置信息并建立与spark的连接val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ProvinceOfAdvertising")val sc = new SparkContext(sparkConf)//2.读取数据生成RDD:TS,Province,City,User,ADval line = sc.textFile("Spark\\src\\main\\scala\\access.log")//3.按照最小粒度聚合:((Province,AD),1)val provinceAdToOne = line.map { x =>val fields: Array[String] = x.split(" ")((fields(1), fields(4)), 1)}//4.计算每个省中每个广告被点击的总数:((Province,AD),sum)val provinceAdToSum = provinceAdToOne.reduceByKey(_ + _)//5.将省份作为key,广告加点击数为value:(Province,(AD,sum))val provinceToAdSum = provinceAdToSum.map(x => (x._1._1, (x._1._2, x._2)))//6.将同一个省份的所有广告进行聚合(Province,List((AD1,sum1),(AD2,sum2)...))val provinceGroup = provinceToAdSum.groupByKey()//7.对同一个省份所有广告的集合进行排序并取前3条,排序规则为广告点击总数.val provinceAdTop3 = provinceGroup.mapValues { x =>x.toList.sortWith((x, y) => x._2 > y._2).take(3)}//8.将数据拉取到Driver端并打印provinceAdTop3.collect().foreach(println)//9.关闭与spark的连接sc.stop()}}

22、读取本地文件words.txt,统计出每个单词的个数,保存数据到 hdfs 上


import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val master: SparkConf = new SparkConf().setAppName("WrodCount").setMaster("local[2]")val sc: SparkContext = new SparkContext(master)//读取文件val file: RDD[String] = sc.textFile("E:\\2020-传智资料1\\第二学期Spark\\day01_Spark\\练习题\\words.txt")//对文件中每一行单词进行压平切分val words: RDD[String] = file.flatMap(_.split(" "))//对每一个单词计数为1 转化为(单词,1)val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1))//相同的单词进行汇总 前一个下划线表示累加数据,后一个下划线表示新数据val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)//val finalResult: Array[(String, Int)] = result.collect()// finalResult.foreach(println)//保存数据到HDFSresult.saveAsTextFile(args(1))sc.stop()}}

23、读取 people.json 数据的文件, 每行是一个 json 对象,进行解析输出

import org.apache.spark._
import scala.util.parsing.json.JSON
object JSONApp {def main(args:Array[String]): Unit ={//初始化配置:设置主机名和程序主类的名字val conf = new SparkConf().setMaster("local").setAppName("JSONApp");//通过conf来创建sparkcontextval sc = new SparkContext(conf);val inputFile = "file:///usr/local/spark/people.json"//读取json文件val jsonStr = sc.textFile(inputFile);val result = jsonStr.map(s => JSON.parseFull(s));//逐个JSON字符串解析result.foreach({r => r match {case Some(map:Map[String,Any]) => println(map)case None => println("parsing failed!")case other => println("unknown data structure" + other)}});}
}

24、保存一个 SequenceFile 文件,使用spark创建一个RDD数据为Array(("a", 1),("b", 2),("c", 3)),保存为SequenceFile格式的文件到hdfs上


val ArrayList = Array(("a", 1),("b", 2),("c", 3))
val r = sc.makeRDD(ArrayList, 1)
r.saveAsObjectFile("hdfs:/your/path/ArrayList")val file = sc.sequenceFile[Null,org.apache.hadoop.io.BytesWritable]("hdfs:/path/ArrayList/part-00000")
val bw = file.take(1).apply(0)._2
val bs = bw.getBytes

25、读取24题的SequenceFile 文件并输出


import java.io._
val bis = new ByteArrayInputStream(bs)
val ois = new ObjectInputStream(bis)
ois.readObject

26、读写 objectFile 文件把 RDD 保存为objectFile,RDD数据为Array(("a", 1),("b", 2),("c", 3)),并进行读取出来

val data26_1 = sc.makeRDD(Array(("a", 1), ("b", 2), ("c", 3)))data26_1.saveAsObjectFile("src\\input\\objectFile")val data26_2 = sc.textFile(src\\input\\objectFile")

27、使用内置累加器计算Accumulator.txt文件中空行的数量

val data27 = sc.textFile("input/Accumulator.txt")var count = sc.longAccumulator("count")data27.foreach { x => if (x == "") count.add(1) }println(count.value)

28、使用Spark广播变量

用户表:

id name age gender(0|1)

001,刘向前,18,0

002,冯  剑,28,1

003,李志杰,38,0

004,郭  鹏,48,2

要求,输出用户信息,gender必须为男或者女,不能为0,1

使用广播变量把Map("0" -> "女", "1" -> "男")设置为广播变量,最终输出格式为

001,刘向前,18,女

003,李志杰,38,女

002,冯  剑,28,男

004,郭  鹏,48,男

val data = sc.textFile("input/user.txt")val sex = sc.broadcast(Map("0" -> "女", "1" -> "男"))data.foreach { x => var datas = x.split(","); println(datas(0) + "," + datas(1) + "," + datas(2) + "," + sex.value(datas(3))) }

29、mysql创建一个数据库bigdata0407,在此数据库中创建一张表

CREATE TABLE `user` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`username` varchar(32) NOT NULL COMMENT '用户名称',

`birthday` date DEFAULT NULL COMMENT '生日',

`sex` char(1) DEFAULT NULL COMMENT '性别',

`address` varchar(256) DEFAULT NULL COMMENT '地址',

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

数据如下:

依次是:姓名 生日 性别 省份

安荷 1998/2/7 女 江苏省

白秋 2000/3/7 女 天津市

雪莲 1998/6/7 女 湖北省

宾白 1999/7/3 男 河北省

宾实 2000/8/7 男 河北省

斌斌 1998/3/7 男 江苏省

请使用spark将以上数据写入mysql中,并读取出来。

val data29 = sc.textFile("input20200407/users.txt")val driver = "com.mysql.jdbc.Driver"val url = "jdbc:mysql://localhost:3306/bigdata01"val username = "root"val password = "root"/*CREATE TABLE `t_student` (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`age` int(11) DEFAULT NULL,PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;*//*** MySQL插入数据*/data29.foreachPartition {data =>Class.forName(driver)val connection = java.sql.DriverManager.getConnection(url, username, password)val sql = "INSERT INTO `user` values (NULL,?,?,?,?)"data.foreach {tuples => {val datas = tuples.split(" ")val statement = connection.prepareStatement(sql)statement.setString(1, datas(0))statement.setString(2, datas(1))statement.setString(3, datas(2))statement.setString(4, datas(3))statement.executeUpdate()statement.close()}}connection.close()}
/*** MySQL查询数据*/var sql = "select * from `user` where id between ? and ?"val jdbcRDD = new JdbcRDD(sc,() => {Class.forName(driver)java.sql.DriverManager.getConnection(url, username, password)},sql,0,44,3,result => {println(s"id=${result.getInt(1)},username=${result.getString(2)}" +s",birthday=${result.getDate(3)},sex=${result.getString(4)},address=${result.getString(5)}")})jdbcRDD.collect() 

30、在hbase中创建一个表student,有一个 message列族

create 'student', 'message'

scan 'student', {COLUMNS => 'message'}

给出以下数据,请使用spark将数据写入到hbase中的student表中,并进行查询出来

数据如下:

依次是:姓名 班级 性别 省份,对应表中的字段依次是:name,class,sex,province

飞松 3 女 山东省

刚洁 1 男 深圳市

格格 4 女 四川省

谷菱 5 女 河北省

国立 2 男 四川省

海涛 3 男 广东省

含芙 3 女 四川省

华敏 4 女 上海市

乐和 2 男 上海市

乐家 3 男 黑龙江

乐康 4 男 湖北省

乐人 5 男 四川省

乐水 3 男 北京市

乐天 4 男 河北省

乐童 5 男 江苏省

乐贤 1 男 陕西省

乐音 2 男 广东省

李仁 3 男 湖北省

立涛 3 女 陕西省

凌青 4 女 湖北省

陆涛 4 男 山东省

媚媚 5 女 河南省

梦亿 4 男 江苏省

铭忠 5 男 四川省

慕梅 3 女 北京市

鹏吉 1 男 上海市

娉婷 4 女 河南省

淇峰 2 男 广东省

庆元 3 男 上海市

庆滋 4 男 北京市

丘东 5 男 江苏省

荣郑 1 男 黑龙江

蕊蕊 5 女 四川省

尚凯 2 男 北京市

诗涵 1 女 河南省

淑凤 2 女 天津市

淑娇 3 女 上海市

淑燕 4 女 河北省

淑怡 4 女 广东省

思璇 2 女 湖北省

苏华 3 女 山东省

苏梅 4 女 四川省

听荷 5 女 深圳市

文怡 1 女 天津市

文怡 2 女 河北省

香凝 3 女 山东省

翔云 4 女 河南省

小芸 5 女 深圳市

/*** * create 'student', 'message'* scan 'student', {COLUMNS => 'message'}* 给出以下数据,请使用spark将数据写入到hbase中的student表中,并进行查询出来* 数据如下:* 依次是:姓名 班级 性别 省份,对应表中的字段依次是:name,class,sex,province*///org.apache.hadoop.hbase.mapreduce.TableInputFormatval conf = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")    conf.set(TableInputFormat.INPUT_TABLE, "student")/*** HBase插入数据*/val dataRDD: RDD[String] = sc.textFile("input20200407/student.txt")val putRDD: RDD[(ImmutableBytesWritable, Put)] = dataRDD.map {// 刚洁   1   男   深圳市case line => {val datas = line.split("\t")val rowkey = Bytes.toBytes(datas(0))val put = new Put(rowkey)put.addColumn(Bytes.toBytes("message"), Bytes.toBytes("name"), Bytes.toBytes(datas(0)))put.addColumn(Bytes.toBytes("message"), Bytes.toBytes("class"), Bytes.toBytes(datas(1)))put.addColumn(Bytes.toBytes("message"), Bytes.toBytes("sex"), Bytes.toBytes(datas(2)))put.addColumn(Bytes.toBytes("message"), Bytes.toBytes("province"), Bytes.toBytes(datas(3)))(new ImmutableBytesWritable(rowkey), put)}}val jobConf = new JobConf(conf)//org.apache.hadoop.hbase.mapred.TableOutputFormatjobConf.setOutputFormat(classOf[TableOutputFormat])jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")putRDD.saveAsHadoopDataset(jobConf)/*** HBase查询数据*/val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])hbaseRDD.foreach {case (rowKey, result) => {val cells: Array[Cell] = result.rawCells()for (cell <- cells) {println(Bytes.toString(CellUtil.cloneRow(cell)) + "\t" +Bytes.toString(CellUtil.cloneFamily(cell)) + "\t" +Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t" +Bytes.toString(CellUtil.cloneValue(cell)))}}}

TobeContinue

Spark RDD 练习相关推荐

  1. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

  2. spark学习13(spark RDD)

    RDD及其特点 1)RDD(Resillient Distributed Dataset)弹性分布式数据集,是spark提供的核心抽象.它代表一个不可变.可分区.里面的元素可并行计算的集合 2)RDD ...

  3. Spark RDD API:Map和Reduce

    参考文章: http://blog.csdn.net/jewes/article/details/39896301 http://homepage.cs.latrobe.edu.au/zhe/Zhen ...

  4. 第14课:Spark RDD解密

    以下为Spark RDD解密课程学习心得: 在介绍Spark RDD之前,先简单的说下Hadoop MapReduce,它是基于数据流的方式进行计算,从物理存储上加载数据,然后操作数据, 最后写入到物 ...

  5. Spark学习之Spark RDD算子

    个人主页zicesun.com 这里,从源码的角度总结一下Spark RDD算子的用法. 单值型Transformation算子 map /*** Return a new RDD by applyi ...

  6. Spark RDD概念学习系列之rdd持久化、广播、累加器(十八)

    1.rdd持久化 2.广播 3.累加器 1.rdd持久化 通过spark-shell,可以快速的验证我们的想法和操作! 启动hdfs集群 spark@SparkSingleNode:/usr/loca ...

  7. spark RDD官网RDD编程指南

    http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在较高的层次上, ...

  8. 学习笔记Spark(三)—— Spark架构及原理(spark架构、spark RDD)

    一.Spark架构 1.1.基本组件 Cluster Manager 在standalone模式中即为Master主节点,控制整个集群,监控worker.在YARN模式中为资源管理器. Worker ...

  9. Spark RDD解密

    1.  基于数据集的处理: 从物理存储上加载数据,然后操作数据,然后写入数据到物理设备; 基于数据集的操作不适应的场景: 不适合于大量的迭代: 不适合交互式查询:每次查询都需要对磁盘进行交互. 基于数 ...

  10. Spark RDD并行度与分区设置

    默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度.这个数量可以在构建 RDD 时指定.记住,这里 的并行执行的任 ...

最新文章

  1. CPU 是如何理解 01 二进制的?
  2. 在不root手机的情况上读取Data目录上的文件
  3. linux下php远程连接mysql_Linux下PHP远程连接Oracle数据库 | 系统运维
  4. 文件夹的位置_感觉电脑变卡了?删除这5个文件夹 C盘立马清爽
  5. 微信读书vscode插件_vscode 常用的插件
  6. 日期的前端验证 jquery
  7. 阿里云ECS主机 Ubuntu下设置Mysql 5.7.17为远程访问
  8. c语言代码格式的简单介绍
  9. 整整240套Axure原型设计元件库 组件库 控件库分享
  10. keil5安装教程及下载
  11. 谷歌(google)、百度、必应d等高级搜索指令使用
  12. 关于MYM码支付系统
  13. 福布斯发布2013中国潜力上市公司100强(表)
  14. mac lion 系统安装
  15. 68.qq号索引结构体写入内存,并实现快速排序
  16. 谁给国潮榜样的他“抄下去”的理由
  17. 简单句 - 主谓/主谓宾/主系表的分析
  18. Axure-9 日历选择制作
  19. ADSL宽带共享上网设置图解
  20. PS插件cutterman从安装到使用

热门文章

  1. 使用STN的行人属性识别
  2. 计算机 64虚拟内存设置方法,win7 64位系统虚拟内存设置及虚拟内存太小的影响...
  3. linux安装Ice3.7 c++
  4. 使用mybatisplus进行分页查询total总为0的原因
  5. Windows系统下GIT生成密钥和添加密钥git
  6. Toast的几种用法
  7. Java 数据填充到word模板中
  8. Centos7下安装Relion
  9. 如何将自己写的项目发布到外网上
  10. ElasticSearch创建索引映射文档+IK分词器