Spark基础脚本入门实践2:基础开发
1、最基本的Map用法
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val result = distData.map(x=>x*x)
println(result.collect().mkString(","))
其中最关键的操作就是:从分布式数据集 --转换--> 并行数据集
from a distributed dataset to Parallelized collections
Spark分布式数据集包含:
- local file system
- HDFS
- Cassandra
- HBase
- Amazon S3
Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
- 比如文件:val distFile = sc.textFile("data.txt")
- 比如hdfs:hdfs://
- 比如s3:s3n://
读取文件时需要注意的是:
- 如果使用的是本地文件路径,那么worker节点一定是有访问权限的.
- 文本文件的访问方式: textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
- hdfs系统会把文件按128MB进行分区
2、从外部文件系统获取数据
val lines = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
3、flatMap用法
flatMap的做法有点象把迭代器拍扁拍碎,比如以下代码
val lines = sc.parallelize(List("hi man","ha girl"))
val wordsFlatmap = lines.flatMap(line=>line.split(" "))
val wordsMap = lines.map(line=>line.split(" "))
看看区别:
scala> wordsFlatmap.first
res9: String = hi
scala> wordsMap.first
res10: Array[String] = Array(hi, man)
实际上wordsFlatmap被拆成了4个string,而wordsMap是对输入的list每个元素进行了split操作,所以说flatMap的做法有点象把迭代器拍扁拍碎。比如说分词就容易用flatMap
4、笛卡尔积
在推荐系统中,要计算各用户对多个产品的兴趣度,就可以制作一个笛卡尔积,用于比较用户的的喜爱产品的相似度。
val man = sc.parallelize(List("Tom","Cat"))
val product = sc.parallelize(List("car","iphone","android","surfacePro"))
val result = man.cartesian(product)
result.collect
运行结果:
res0: Array[(String, String)] = Array((Tom,car), (Tom,iphone), (Tom,android), (Tom,surfacePro), (Cat,car), (Cat,iphone), (Cat,android), (Cat,surfacePro))
笛卡儿计算是很恐怖的,它会迅速消耗大量的内存,所以在使用这个函数的时候请小心
5、cache操作
在spark中使用cache是非常重要的,因为行动操作都是惰性求值,每次都会重新计算所有的依赖,如果有大量迭代,代价巨大。
缓存就可以从内容读取,无需再次计算
scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[44] at parallelize at <console>:12
scala> data.getStorageLevel
res65: org.apache.spark.storage.StorageLevel =
StorageLevel(false, false, false, false, 1)
scala> data.cache
res66: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[44] at parallelize at <console>:12
scala> data.getStorageLevel
res67: org.apache.spark.storage.StorageLevel =
StorageLevel(false, true, false, true, 1)
我们先是定义了一个RDD,然后通过getStorageLevel函数得到该RDD的默认存储级别,这里是NONE。然后我们调用cache函数,将RDD的存储级别改成了MEMORY_ONLY(看StorageLevel的第二个参数)
6、检查点
将生成的RDD保存到外部可靠的存储当中,对于一些数据跨度为多个bactch的有状态tranformation操作来说,checkpoint非常有必要,因为在这些transformation操作生成的RDD对前一RDD有依赖,随着时间的增加,依赖链可能会非常长,checkpoint机制能够切断依赖链,将中间的RDD周期性地checkpoint到可靠存储当中,从而在出错时可以直接从checkpoint点恢复。
val data = sc.parallelize(1 to 100 , 5)
sc.setCheckpointDir("/myCheckPoint")
data.checkpoint
data.count
7、cogroup组合
将多个RDD中同一个Key对应的Value组合到一起。
scala> val data1 = sc.parallelize(List((1, "www"), (2, "bbs")))
scala> val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very")))
scala> val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good")))
scala> val result = data1.cogroup(data2, data3)
scala> result.collect
res30: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =
Array((1,(CompactBuffer(www),CompactBuffer(iteblog),CompactBuffer(com))),
(2,(CompactBuffer(bbs),CompactBuffer(iteblog),CompactBuffer(com))),
(3,(CompactBuffer(),CompactBuffer(very),CompactBuffer(good))))
8、广播变量
广播变量是通过调用sparkcontext从变量v创建。广播变量是V的包装器,它的值可以通过调用值方法来访问。下面的代码显示了这一点:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在创建广播变量之后,应该使用它在集群上运行的任何函数中代替V值,这样v就不会不止一次地发送到节点。此外,对象v在广播之后不应该被修改,以确保所有节点获得相同的广播变量值(例如,如果变量稍后被运送到新节点)。
9、累加器
累加器一般用来累计和计数
val accum = sc.longAccumulator("My Accumulator")
//计数
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(1))
accum.value
res1: Long = 4
//累加
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
res2: Long = 10
Spark基础脚本入门实践2:基础开发相关推荐
- sql数据库教程百度云_【推荐】零基础水彩画入门教程|零基础水彩教程百度云...
零基础水彩画入门教程|零基础水彩教程百度云! 照着教程画却总是画不好,这些水彩技法你真的学会了吗? 盲目地照着葫芦画瓢,不懂控制确实很难学会,可以关注一下公众号:每日学绘画,可以领取水彩电子书和全套视 ...
- 深入了解计算机基础,计算机入门《计算机基础知识》(全16集)
计算机入门<计算机基础知识>是一套比较经典的老教程,里面个别章节有些过时,但是原的东西讲的很好永不过时,对于想了解电脑.计算机的初学者是很好的教程,课程讲的十分详细.深入浅出,相对于< ...
- R语言基础与入门实践
熟练使用R软件 实践1:最初几步 x=1:100#把1,2,...,100个整数向量赋值到x (x=1:100) #同上, 只不过显示出来 sample(x,20) #从1,...,100中随机不放回 ...
- 【Java基础快速入门】概述及开发环境搭建
概述及开发环境搭建 概述 历史 发展历程 运行机制 模块图 环境搭建 程序示例 注释 标识符 概述 历史 SUN:Stanford University Network 1982年成立 发展历程 Oa ...
- c语言基础知识入门(c语言基础知识入门代码)
C语言怎么入门 初学C语言的基本 一开始 由算法开始,毕竟你以前学的算法和c有所不同 然后开始学程序流程控制 接着学内存处理机制 提高深入编程 然后与其他软件 如sql web 联系起来学习 后面的 ...
- Java基础笔记(入门,语法基础,流程控制,数组)
Java语言入门 发展历程 三大技术体系 JavaSE 标准版 JavaEE 企业版 JavaME 嵌入式版 Java语言特点 简单性 健壮性 面向对象 分布式 多线程 动态性 可移植性(跨平台) J ...
- java基础快速入门--面向对象(基础)
类与对象 看一个养猫问题 张老太养了两只猫:一只名字叫小白,今年三岁,白色.还有一只叫小花,今年一百岁,花色.请编写一个程序,当用户输入小猫的名字时,就显示该猫的名字,年龄,颜色.如果用户输入的小猫名 ...
- 零基础python入门课程-零基础 Python 入门
退款政策是如何规定的? 如果订阅,您可以获得 7 天免费试听,在此期间,您可以取消课程,无需支付任何罚金.在此之后,我们不会退款,但您可以随时取消订阅.请阅读我们完整的退款政策. 我可以只注册一门课程 ...
- python基础语法入门大全_python 基础语法——快速入门
今天来讲一些老生常谈,但凡学习一门语言都逃不过基本的语法,我们也来叨逼叨逼.不过不想事无巨细地讲,因为没有意义,估计讲完了大家都忘记了,我们挑钟爱你,其他的可以自学. 变量,学习一门语言第一件事可能是 ...
最新文章
- workerman结合laravel开发在线聊天应用的示例代码
- Oracle Study之--PL/SQL Developer软件错误
- 每日一皮:史上最直观的单向循环链表,还不懂算我输!
- [***]HZOJ 跳房子
- zemax中非序列添加相位面_老王讲放射MRI脉冲序列的基本参数
- DLL入门浅析(2)——如何使用DLL
- vs 服务容器中已存在服务_敏捷基础设施和公共基础服务
- 解决 An invalid domain was specified for this cookie
- python编写数据库连接工具_详解使用Python写一个向数据库填充数据的小工具(推荐)...
- linux下三三维建模软件,SolidWorks是基于()原创的三维实体建模软件。A.UNIXB.WindowsC.LinuxD.Dos...
- JVM面试重点总结(一)——java内存区域与内存溢出异常
- 常用前端Js框架简介
- ASsd固态测试软件数据,128G固态硬盘各项测试数据评测报告
- VM在使用过程中开机频繁黑屏(VM14版本问题导致的,频繁开机黑屏)
- 使用Mediacoder压制带有图片的ass字幕
- 自建局域网 OTA 服务器
- 上传App Store的截图尺寸
- 求两个矢量的夹角(带正负)
- 计算机网络读书笔记(二)
- ffmpeg 添加视频加文字水印--drawtext 滤镜详解
热门文章
- linux 实时 网口 速率_linux 下查看网卡工作速率-阿里云开发者社区
- 计算机接口技术试题及答案,2014.3计算机接口技术总复习题及答案
- java 存储过程 数组参数_执行数组参数的存储过程
- JAVA操作HDFS API(hadoop)
- catkin_make与gtest出现冲突的问题与解决
- 一个web项目在myeclipse中add deployment时无法被识别出来的原因
- 对Bootloader(引导加载程序)的几点理解
- jquery常用选择器
- sqlserver2008登录sa失败
- 致27岁的老光棍天空