一  cache 和 persist 和 unpersist

1  cache 和 persist

1.1  cache 和 persist 的使用场景 (为什么使用 ?)

一个 application 多次触发 Action ,为了复用前面 RDD 计算好的数据 ,避免反复读取 HDFS (数据源) 中的数据和重复计算 .

1.2  persist ,可以将数据缓存到内存或持久化到磁盘[ executor 所在的内存或磁盘], 第一次触发Action 才放入到内存或磁盘 (第一次读取数据时速度会比较慢 ,边读取数据边将数据放到内存中--executor进程中), 以后再出发 Action 会读取缓存 , RDD 的数据再进行操作并且复用 persist 的数据(所以第二次读取时速度会比第一次快几倍 /十几倍或者是几十倍 ,因为是从内存中读取的)

只要( spark-shell 客户端) application 不停止 (sc.stop 或者 ctrl+C) ,cache 缓存到内存中的数据就一直在

1.3  cache 或 persist 的 RDD 或其子 RDD 多次触发 Action ,那么 cache 或 persist 才有意义

1.4  如果将数据缓存到内存 ,内存不够 ,以分区为单位 ,只缓存部分分区的数据

即当内存很小时 ,不能将 cache 的数据全部存在内存里面 ,只能存先前的一部分 ,后一部分就存不下了 ,当下次 Action 再读取RDD 的数据的时候 ,速度仍会比之前块 , 虽然有一部分数据要从头(HDFS)读取 ,都是另外一部分缓存到内存中了就可以从内存中读取 .

1.5  支持多种 StorageLevel ,可以将数据序列化 ,默认放入内存使用的是 java 对象的方式(Java Serialization)存储到内存中 ,但是很占用空间,存储的内容比原数据大3倍左右), 优点是速度快(因为不需要反序列化--Java Serialization) ,也可以使用其他的序列化方法 ,通常是指 Kryo Serialization---需要反序列化 (占用空间小 ,速度快---较优的数据压缩/序列化方法)

Kryo Serialization 比 Java Serialization 稍慢 ,因为 Kryo Serialization 需要反序列化

1.6  cache 底层调用的是 persist 方法 ,可以指定其他存储级别

存储级别有以下几种 : 

1)  MEMORY_ONLY : 默认只放在内存 ,不序列化 (当内存足够大 ,数据很小的情况下可以使用)

2)  MEMORY_ONLY_SER : 默认存放在内存 ,按指定方式序列化

3)  DISK_ONLY_2 : 存磁盘 ,且存 2 份(在不同机器的磁盘中--属于该 application 的Task 或 executor 所在机器的磁盘中 )

4)  MEMORY_AND_DISK_SER : 优先存放在内存 ,内存存不下再存放到磁盘 ,按指定方式序列化

1.7  cache 和 persist 方法 ,严格来说 ,不是 Transformation ,因为没有生成新的 RDD ,只是标记当前 RDD 要 cache 或 persist

1.8  最佳实践(最佳的方法)有以下两种方法 :

1)  原始的数据 ,结果整理过滤后或轻度聚合后再进行 cache 或 persist ,这样的效果最佳(大大减小了数据量 ,减少内存空间的使用) ;

2)  推荐使用的存储级别为 : MEMORY_AND_DISK_SER (优先将数据缓存到内存 ,如果内存放不下再序列化放到磁盘).

2 unpersist : 释放内存里面缓存的RDD数据

1)  unpersist(false) : 异步 ,内存中缓存的RDD数据一边释放 ,程序可以一边往下走进行其他的操作

2)  unpersist(true) : 同步 ,内存中缓存的RDD数据释放完才能进行下一步的操作

3  程序代码实操

3.1  spark-shell 客户端 (linux虚拟机上)

spark-shell 客户端 :
---第一步 : 指定序列化器(指定序列化方式)
sc.getCof.set("spark.serializer" ,"org apache.spark.serializer.KryoSerializer")---第二步 : 导包 (存储方式/级别的包)
import org.apache.spark.storage.StorageLevel---第三步 : 创建原始的RDD
val rdd1 = sc.texeFile("hdfs://linux04:8020/wc")---第四步 : 指定的存储方式将RDD数据缓存到内存和磁盘中的方法
rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER)---第五步 : 可以对RDD使用算子进行操作了 ,原始数据和操作后的数据会被缓存到指定的存储位置
rdd1.count   --触发action后 ,可以到spark web 界面查看缓存的效果(第二次操作的速度会明显比第一次快很多)

3.2  java 客户端上(二)---颜值降序 ,年龄升序的规则排序

1) 获取 conf 对象 ,指定 spark 的序列化器 ,注册自定义的 bean 使用 Kryo 进行序列化

2) 创建 sc ,根据 sc 创建原始的 RDD ,获取到数据

3) 自定义一个 bean (case class)(如果自定义的bean是 case class 类型 ,也是自动进行序列化接口的 ,但是普通的class bean 是要自己实现序列化接口的 ,这样才能够传入数据)

4) 对数据进行切割处理 ,将需要的字段封装到自定义的 bean 对象 ,字段与bean里面的属性值要一一对应,得到一个指定Boy类型的新的RDD

5) 新的RDD调用sortBy方法 , 根据需求,应用 tuple3 的排序规则 ,ASCLL码表 ,自己可以指定bean对象中的字段(颜值,年龄)进行升序降序排序 ,一个字段或者多个字段都可以,得到最终结果 .

object CunstomSort02 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")--指定spark的序列化器 Kryoconf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")--注册自定义的bean使用kryo进行序列化conf.registerKryoClasses(Array(classOf[Boy1]))--创建 sparkcontextval sc = new SparkContext(conf)--使用 sc 创建原始的RDDval list = List("nin,16,99.99", "lin,18,99.99", "auu,15,99.88")val listRDD: RDD[String] = sc.parallelize(list)--对数据进行切割处理 ,然后将数据封装到自定义的 boybean 对象里面val mapRDD: RDD[Boy1] = listRDD.map(it => {val fields = it.split(",")val name = fields(0)val age = fields(1).toIntval faceValue = fields(2).toDoublenew Boy1(name, age, faceValue)})println(mapRDD.collect().toBuffer)  //ArrayBuffer(Boy1(nin,16,99.99), Boy1(lin,18,99.99), Boy1(auu,15,99.88))--sortBy : 只能按照指定元素进行排序,可以是一个元素,也可以是多个元素--颜值一样,按年龄降序排序 ,颜值不一样 ,按颜值升序排序val sortedRDD: RDD[Boy1] = mapRDD.sortBy(it =>(it.faceValue ,-it.age))println(sortedRDD.collect().toBuffer)   //ArrayBuffer(Boy1(auu,15,99.88), Boy1(lin,18,99.99), Boy1(nin,16,99.99))}
}
case class Boy1(name: String, age: Int, faceValue: Double)  --自带序列化方法和toString,get,set,equal等方法

二  Checkpoint

1   Checkpoint 

1.1  Checkpoint 的使用场景 : 适合复杂的计算 [ 机器学习 / 迭代计算 ] , 为了避免丢失数据重复计算 ,可以将宝贵的中间结果保存到 hdfs 中 , 中间结果安全

1)  一般都不是对源数据进行 checkpoint ,比如如果源数据是 1T ,对源数据进行 checkpoint 之后还是 1T ,所以没有意义

2)  一般都是对源数据进行一些过滤 ,进行一些运算或聚合之后得到宝贵的中间结果 ,

1.2  在调用 RDD 的 checkpoint 方法之前 ,一定要指定 Checkpoint 的目录即 sc.setCheckPointDir

1.3  为保证中间结果安全 ,将数据保存到 HDFS 中, 分布式文件系统 ,可以保证数据不丢失

1.4  第一次触发 Action ,才做 Checkpoint ,会额外触发一个 job ,这个 job 的目的就是将结果保存到 HDFS 中

1.5  如果RDD 做了 Checkpoint , 这个 RDD 以前的依赖关系就不再使用了 ,父RDD依赖关系就被剪断了 ,因为使用了 Checkpoint 之后 , spark 系统默认该数据永远不会丢失 ; 所以如果将 hdfs 上缓存的中间结果被删除了 ,再次调用该 RDD 使用中间结果时就会抛出异常 ,就只能从头 / 从源数据重新开始计算 .

分布式系统存储有副本 ,默认数据是不会丢失的 ,除非整个机房爆炸了 ,但是分布式的文件一般都不是存储在一个地方的 ,比如一些重要的数据 ,可能是将文件分布式存储在不同城市的服务器上 ,所以存储到hdfs上的文件默认是永远不会丢失的

1.6  触发多次 Action ,Checkpoint 才有意义 ,多用于迭代计算

1.7  checkpoint 严格来说 ,不是 Transformation ,只是标记当前 RDD 要做 checkpoint

1.8  如果 checkpoint 前 ,对 RDD 进行了 cache ,可以避免数据重复计算 ,如果有 cache 的数据优先使用 cache ,cache 的数据没有再使用 checkpoint

2   程序代码实操 :

spark-shell 客户端 :
---第一步 : 指定序列化器(指定序列化方式)
sc.getCof.set("spark.serializer" ,"org apache.spark.serializer.KryoSerializer")---第二步 : 导包 (存储方式/级别的包)
import org.apache.spark.storage.StorageLevel---第三步 : 创建原始的RDD ,然后对源数据进行过滤 ,再讲过滤的结果进行 checkpoint
val rdd1 = sc.texeFile("hdfs://linux04:8020/wc")
val rdd2 = rdd1.filter(_.contains("spark"))      ---先过滤数据,得到中间结果---第四步 : 先在hdfs上创建一个 checkpoint 目录
sc.SetCheckpointDir("hdfs://linux04:8020/ck")---第五步 : 再将RDD数据持久化到 hdfs 上指定的checkpoint的目录下
rdd2.checkpoint     --第一次checkpoint时 ,目录下是没有数据的 ,这个方法是lazy的---第五步 : 触发action之后 ,过滤和count的这些中间数据会被持久化到hdfs上指定的目录下存储
rdd1.count
--触发了 2 次 action ,第一次action计算结果
--第二次action是为了把中间数据 checkpoint 到 hdfs 上(将中间结果存到靠谱的系统中--分布式)
--分布式系统存储有副本 ,默认数据是不会丢失的 ,除非整个机房爆炸了 ,但是分布式的文件一般都不是存储在一个地方的 ,比如一些重要的数据 ,可能是将文件分布式存储在不同城市的服务器上 ,所以存储到hdfs上的文件默认是永远不会丢失的

三   广播变量

1  通常是为了实现 mapside join ,可以将 Driver 端的数据广播到属于该 application 的 Executor ,然后通过 Driver 广播变量返回的引用可以获取事先广播到 Executor 的数据

2  广播变量是通过 BT 的方式广播的 (TorrentBroadcast) ,多个 Executor 可以相互传递数据 ,可以提高效率

在 Driver 端使用 sc.broadcast 这个方法进行广播 ,并且该方法是阻塞的 (同步的 ,数据不广播完 ,下一步程序就不会执行,保证了广播数据的完整性)

广播变量一旦广播出去就不能改变 ,为了以后可以定期的改变要关联的数据 ,可以定义一个 object (单例对象) ,在函数内使用 ,并且加一个定时器 ,然后定期更新数据 

广播到 executor 的数据 ,可以在 Driver 获取到引用 ,然后这个引用会伴随着每一个 Task 发送到 Executor ,在 Executor 中可以通过这个引用 ,获取到事先广播好的数据

四  广播变量的案例实操分析(java客户端)

1  案例一  :  根据IP地址查找到位置信息(省市)

1) 创建 sparkContext(Driver端) ,根据 sc 创建原始的RDD ,获取到全国的IP规则数据(ip地址的范围对应国内具体省份城市和一些其他的信息)进行切割处理 ,得到ip范围的两个地址对应的十进制和对应的省份城市 ,组装成Touple4的元组 ,然后根据ip起始地址进行升序排序 ,然后收集起来成一份完整的IP规则数据 ,并将Driver端全部/完整的IP规则数据广播到属于该Application的executor :sc.broadcast(IP规则数据),每台executor中的IP规则数据是不完整的,使用的时候需要executor之间互相关联得到完整的IP规则数据

2) 创建 sparkContext ,根据 sc 创建原始的RDD ,获取到用户浏览器浏览的日志数据 ,然后对数据进行切割处理 ,将待查询的ip地址取出来 ,将其转为十进制形式待用(在另外一个Object类中定义一个将ip转为十进制的方法)

3) 关联Executor中事先已经广播号的数据 ,在Executor 中,通过广播变量的引用,可以获取事先已经广播号的数据

4) 将待查询的IP放到二分法中查找(在另外一个Object中定义一个二分法查找方法)这个待查询的IP是在哪个IP规则范围,如果查询到就放回其索引值(index值) ,然后将其放入到IP规则中并获取省份城市信息,将结果收集起来(collect)即可

1.0  数据准备

ip规则数据 : 

1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.0.8.0|1.0.15.255|16779264|16781311|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.0.32.0|1.0.63.255|16785408|16793599|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.1.0.0|1.1.0.255|16842752|16843007|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.2.0|1.1.7.255|16843264|16844799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.1.8.0|1.1.63.255|16844800|16859135|亚洲|中国|广东|广州||电信|440100|China|CN|113.280637|23.125178
1.2.0.0|1.2.1.255|16908288|16908799|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302
1.2.2.0|1.2.2.255|16908800|16909055|亚洲|中国|北京|北京|海淀|北龙中网|110108|China|CN|116.29812|39.95931
.....

日志数据 : 

20090121000132095572000|125.213.100.123|show.51.com|/shoplist.php?phpfile=shoplist2.php&style=1&sex=137|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; Mozilla/4.0(Compatible Mozilla/4.0(Compatible-EmbeddedWB 14.59 http://bsalsa.com/ EmbeddedWB- 14.59  from: http://bsalsa.com/ )|http://show.51.com/main.php|
20090121000132124542000|117.101.215.133|www.jiayuan.com|/19245971|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TencentTraveler 4.0)|http://photo.jiayuan.com/index.php?uidhash=d1c3b69e9b8355a5204474c749fb76ef|__tkist=0; myloc=50%7C5008; myage=2009; PROFILE=14469674%3A%E8%8B%A6%E6%B6%A9%E5%92%96%E5%95%A1%3Am%3Aphotos2.love21cn.com%2F45%2F1b
20090121000132406516000|117.101.222.68|gg.xiaonei.com|/view.jsp?p=389|Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; CIBA)|http://home.xiaonei.com/Home.do?id=229670724|_r01_=1; __utma=204579609.31669176.1231940225.1232462740.1232467011.145; __utmz=204579609.1231940225.1.1.utmccn=(direct)
20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|
.....

1.1  将IP地址转为十进制的固定的方法和转为十进制的IP到IP规则数据信息中二分法查找的方法

object IpUtils01 {-- ip地址值转十进制-- 传入的是字符串 ,返回的是 Long类型-- fragments : 碎片def ip2Long(ip:String):Long ={val fragments: Array[String] = ip.split("\\.")var ipNum = 0Lfor(i <- 0 until fragments.length){ipNum = fragments(i).toLong | ipNum << 8L}ipNum}-- 二分法查找-- 参数一 : ip规则数据-- 参数二 : 需要查找地理位置信息的ip-- 返回值是Intdef binarySearch(lines:Array[(Long, Long, String, String)],ip:Long): Int ={var start = 0var end = lines.length -1while(start < end ){val middle = (start + end)/2if((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif(ip < lines(middle)._1)end = middle -1else{start = middle +1}}-1}def binarySearch(lines:ArrayBuffer[(Long, Long, String, String)], ip:Long): Int ={var start = 0var end = 0while(start < end){val middle = (start + end)/2if((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif(ip < lines(middle)._1)end = middle - 1else {start = middle + 1}}-1}
}

1.2   处理IP规则数据和日志数据 ,然后获得待查询ip对应的省份城市信息

object IpLocation01 {def main(args: Array[String]): Unit = {val isLocal = args(0).toBooleanval conf = new SparkConf().setAppName(this.getClass.getSimpleName)if(isLocal){conf.setMaster("local[*]")}val sc = new SparkContext(conf)--先读取IP规则数据val ipLines: RDD[String] = sc.textFile(args(1))--对获取的IP规则数据进行处理--ip规则数据形式 : 1.0.1.0|1.0.3.255|16777472|16778239|亚洲|中国|福建|福州||电信|350100|China|CN|119.306239|26.075302val ipRulesInDriver: Array[(Long, Long, String, String)] = ipLines.map(line => {val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)val city = fields(7)--组装成元组(startNum, endNum, province, city)--按照startNum(IP起始十进制)进行排序,后续用二分查找是数据必须是有序的--将全部的IP规则收集到Driver端(只要是collect ,就是将数据收集到Driver)}).sortBy(_._1).collect()--然后需要将Driver端全部的IP规则数据广播到属于该Application的executor :broadcast()val broadcastRefInDriver: Broadcast[Array[(Long, Long, String, String)]] = sc.broadcast(ipRulesInDriver)--读取访问日志的数据--日志数据:20090121000132581311000|115.120.36.118|tj.tt98.com|/tj.htm|Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; TheWorld)|http://www.tt98.com/|val accessLines = sc.textFile(args(2))--对读取的访问日志的数据进行处理val reduced: RDD[(String, Int)] = accessLines.map(line => {val fields = line.split("[|]")--得到IP地址val ip = fields(1)--需要向将ip转为十进制val ipNum = IpUtils.ip2Long(ip)--关联Executor中事先已经广播号的数据--在Executor 中,通过广播变量的引用,可以获取事先已经广播号的数据val ipRulesInExecutor: Array[(Long, Long, String, String)] = broadcastRefInDriver.value--使用二分法查找 ,定义一个二分法查找的类IpUtilsval index = IpUtils.binarySearch(ipRulesInExecutor, ipNum)var province = "未知"if (index > -1) {province = ipRulesInExecutor(index)._3}(province, 1)}).reduceByKey(_ + _)println(reduced.collect().toBuffer)--结果为 : ArrayBuffer((陕西,1824), (河北,383), (云南,126), (未知,1), (重庆,868), (北京,1535))--释放资源sc.stop()}
}

2   案例二 :  根据经度纬度查找到位置信息

2.1  (运行环境的准备)依赖关系的准备 ,在 pom.xml 配置文件中导入以下工具包

 <!-- 发送HTTP请求的Java工具包 --><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.7</version></dependency>

2.2  初步测试---入门程序

import com.alibaba.fastjson.JSON
import org.apache.http.HttpEntity
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils---------先写一个小程序测试一下是否可用-------
object HttpClient01 {def main(args: Array[String]): Unit = {--创建一个可用关闭的httpClient(里面有一大堆默认的参数),用完之后就会关闭--相当于创建了一个链接val httpClient: CloseableHttpClient = HttpClients.createDefault--使用REST请求的规范 :查询用GET(HttpGet) ,添加PUT(HttpPut) ,修改POST(HttpPost) ,删除用delete(HttpDelete)val longitude = 116.310003     --经度val latitude: Double = 39.991957       --纬度val httpGet = new HttpGet(s"https://restapi.amap.com/v3/geocode/regeo?&location=$longitude,$latitude&key=4924f7ef5c86a278f5500851541cdcff")--从Http上get(获取)信息 ,返回响应val response: CloseableHttpResponse = httpClient.execute(httpGet)--获得响应体val entity: HttpEntity = response.getEntity--对响应体做一些有用的事情,并确保它被完全消耗--判断响应状态码是否是200--HTTP响应码 :1xx :代表请求已被接受,需要继续处理。这类响应是临时响应,只包含状态行和某些可选的响应头信息,并以空行结束--2xx(200/201/202..) :代表请求已成功被服务器接收、理解、并接受。--3xx(300/301/302..) :代表需要客户端采取进一步的操作才能完成请求。通常,这些状态码用来重定向,后续的请求地址(重定向目标)在本次响应的 Location 域中指明。--4xx(404..) :代表了客户端看起来可能发生了错误,妨碍了服务器的处理--5xx :代表了服务器在处理请求的过程中有错误或者异常状态发生,也有可能是服务器意识到以当前的软硬件资源无法完成对请求的处理。if(response.getStatusLine.getStatusCode == 200){--获取请求的json字符串val result: String = EntityUtils.toString(entity)--转成json对象val jsonObj = JSON.parseObject(result)--获取位置信息val regeocode = jsonObj.getJSONObject("regeocode")if(regeocode != null && !regeocode.isEmpty){val address = regeocode.getJSONObject("addressComponent")println(address)/**  结果如下 :* "businessAreas":[{"name":"万泉河","location":"116.303364,39.976410","id":"110108"},* {"name":"中关村","location":"116.314222,39.982490","id":"110108"},* {"name":"西苑","location":"116.294214,39.996850","id":"110108"}* "country":"中国","province":"北京市",* "citycode":"010","city":[],"adcode":"110108",* "streetNumber":{"number":"5号","distance":"94.5489",* "street":"颐和园路",* "location":"116.310454,39.992734",* "direction":"东北"},* "towncode":"110108015000",* "district":"海淀区",* "neighborhood":{"name":"北京大学","type":"科教文化服务;学校;高等院校"},* "township":"燕园街道",* "building":{"name":"北京大学","type":"科教文化服务;学校;高等院校"**/}}}
}

2.3   实操程序

2.3.0  数据准备

{"cid": 1, "money": 600.0, "longitude":116.397128,"latitude":39.916527,"oid":"o123", }
"oid":"o112", "cid": 3, "money": 200.0, "longitude":118.396128,"latitude":35.916527}
{"oid":"o124", "cid": 2, "money": 200.0, "longitude":117.397128,"latitude":38.916527}
{"oid":"o125", "cid": 3, "money": 100.0, "longitude":118.397128,"latitude":35.916527}
{"oid":"o127", "cid": 1, "money": 100.0, "longitude":116.395128,"latitude":39.916527}
{"oid":"o128", "cid": 2, "money": 200.0, "longitude":117.396128,"latitude":38.916527}
{"oid":"o129", "cid": 3, "money": 300.0, "longitude":115.398128,"latitude":35.916527}
{"oid":"o130", "cid": 2, "money": 100.0, "longitude":116.397128,"latitude":39.916527}
{"oid":"o131", "cid": 1, "money": 100.0, "longitude":117.394128,"latitude":38.916527}
{"oid":"o132", "cid": 3, "money": 200.0, "longitude":118.396128,"latitude":35.916527}

2.3.1   定义一个case class类,解析json数据时使用 ,有多个参数,参数字段与解析的json数据字段信息一致 ,将解析结果封装成一个一个的对象

--与java中的javabean功能一样 ,其自带toString,equal,get,set,序列化等功能
--解析JSON数据是,这样定义的属性字段的顺序不需与解析的字段一致 ,当字段名字要一模一样
case class HttpBean(cid: String,oid: String,money:Double,longitude: Double,latitude: Double,var province: String,   --不指定的话默认是val类型 ,就不可以被赋值了var city: String)

2.3.2   定义一个object类,内含处理数据的计算逻辑 ,用来处理数据,根据经度纬度信息查询到具体的省份和城市

import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactoryobject HttpClient02 {private val logger = LoggerFactory.getLogger(this.getClass)def main(args: Array[String]): Unit = {--获取conf对象val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")--获取sparkContext对象val sc = new SparkContext(conf)--通过sc ,创建原始的RDD,获取文件数据,在main方法参数中传入待处理文件路径val lines: RDD[String] = sc.textFile(args(0))--解析数据 ,将JSON类型的数据解析,然后放到HttpBean对象里面变成一个个对象val beanRDD: RDD[HttpBean] = lines.map(line => {var bean: HttpBean = nulltry {bean = JSON.parseObject(line, classOf[HttpBean])} catch {case e: JSONException => {--捕获到错误的数据/无法解析的脏数据logger.error("parse json error =>" + line)}}bean})--过滤有问题的数据 ,有可能有空字符对象,需要过滤掉 ,剩下的都是正常可以分析的数据val filterd: RDD[HttpBean] = beanRDD.filter(_ != null)--根据经度纬度查询高德地图获取位置信息--关联纬度数据 ,数据一个区一个区的处理 ,从区里面一条一条数据的取出处理,来一条处理一条val result1: RDD[HttpBean] = filterd.mapPartitions(it => {--创建一个可用关闭的httpClient(里面有一大堆默认的参数),用完之后就会关闭 ,相当于创建了一个链接val httpClient: CloseableHttpClient = HttpClients.createDefault--迭代每个分区中的每条数据 ,数据来一条就到高德/百度/腾讯地图API中查找,然后将查到的信息返回it.map(bean => {--使用REST请求的规范 :查询用GET(HttpGet) ,添加PUT(HttpPut) ,修改POST(HttpPost) ,删除用delete(HttpDelete)--需要将每一条数据(即每个对象)的经度和纬度取出来 ,根据这两个字段查询地理位置信息val longitude = bean.longitudeval latitude = bean.latitudeval httpGet = new HttpGet(s"https://restapi.amap.com/v3/geocode/regeo?&location=$longitude,$latitude&key=4924f7ef5c86a278f5500851541cdcff")--从Http上get(获取)信息 ,返回响应 (使用连接对象查询get到的信息 ,然后返回响应)val response = httpClient.execute(httpGet)try {// 可以查看响应状况// System.out.println(response.getStatusLine)--获得响应体val entity = response.getEntity--对响应体做一些有用的事情,并确保它被完全消耗var province: String = nullvar city: String = null--如果响应返回的状态是 200 ,代表请求已成功被服务器接收、理解、并接受if (response.getStatusLine.getStatusCode == 200) {--获取请求的json字符串(将响应体放进这个系统自带的实体帮助类中)val result = EntityUtils.toString(entity)--转为json对象val jsonObj = JSON.parseObject(result)--获取位置信息val regeocode = jsonObj.getJSONObject("regeocode")--如果位置信息不为空字符串并且也不为空if (regeocode != null && !regeocode.isEmpty) {--获取地址信息val address = regeocode.getJSONObject("addressComponent")--根据获取的地址信息 ,获取省份和城市信息,将这些获取的信息都放到bean里面bean.province = address.getString("province")bean.city = address.getString("city")}}} finally {response.close()}--直到每个区(也就是每个迭代器)中的数据都处理完 ,再也拿不到下一条数据 ,就把连接也关闭,释放资源if (!it.hasNext) {httpClient.close()}--将装有查询数据的bean对象返回bean})})--收集结果并打印result1.collect().foreach(println)/*** HttpBean(1,o123,600.0,116.397128,39.916527,北京市,[])* HttpBean(2,o124,200.0,117.397128,38.916527,天津市,[])* HttpBean(3,o125,100.0,118.397128,35.916527,山东省,临沂市)* HttpBean(1,o127,100.0,116.395128,39.916527,北京市,[])* HttpBean(2,o128,200.0,117.396128,38.916527,天津市,[])* HttpBean(3,o129,300.0,115.398128,35.916527,山东省,聊城市)* HttpBean(2,o130,100.0,116.397128,39.916527,北京市,[])* HttpBean(1,o131,100.0,117.394128,38.916527,天津市,[])* HttpBean(3,o132,200.0,118.396128,35.916527,山东省,临沂市)*/--释放资源sc.stop()}
}

Spark之cache ,persist ,checkpoint ,广播变量及其案例 : 根据IP地址(浏览器访问日志获取) / 经度纬度定位地理位置案例(7)相关推荐

  1. Spark的Cache和Checkpoint区别和联系拾遗

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用 ...

  2. Spark编程指引(四)----共享变量(广播变量和累加器)

    转自:http://blog.csdn.net/happyanger6/article/details/46576831 共享变量 通常情况下,当向Spark操作(如map,reduce)传递一个函数 ...

  3. TCP/IP-浅谈单播、广播、组播、IP地址、MAC地址、交换机

    TCP首先必须明白两点: 网络的MAC层提供单播,广播,组播服务,网络是否具备单播,广播,组播能力,由MAC层是否提供单播,广播,组播服务决定 网络的IP层设置单播,广播,组播方式,根据IP地址,包括 ...

  4. 【大数据开发】SparkCore——利用广播变量优化ip地址统计、Spark2.x自定义累加器

    文章目录 一.Broadcast广播变量 1.1 广播变量的逻辑过程 1.2 [优化ip地址统计](https://blog.csdn.net/weixin_37090394/article/deta ...

  5. 【Spark】广播变量和累加器

    文章目录 一.Spark广播变量 二.累加器 Reference 一.Spark广播变量 多进程编程中,不同进程可以通过创建共享内存,进行进程间通信.而在分布式中,Spark通过[广播变量]和[累加器 ...

  6. Spark共享变量(广播变量、累加器)

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象. 共享变量出现的原因: 通常在向 ...

  7. Spark 的共享变量之累加器和广播变量

    前言 本期将介绍下 Spark 编程中两种类型的共享变量:累加器和广播变量. 简单说,累加器是用来对信息进行聚合的,而广播变量则是用来高效分发较大对象的. 学习目标 闭包的概念 累加器的原理 广播变量 ...

  8. 【Spark】ip地址查询案例(城市热点图)

    上图是一张很常见的城市热力图,像这样的图是如何绘制的呢? 其实,每个地区都有自己的经纬度和上网ip区段,可以通过解析上网日志中的ip,定位某个地区的客流量. 本篇文章主要介绍,如果通过解析上网日志,查 ...

  9. Spark中cache、persist、checkpoint区别

    spark中的cache.persist.checkpoint都可以将RDD保存起来,进行持久化操作,供后面重用或者容错处理.但是三者有所不同. cache 将数据临时存储在内存中进行数据重用,不够安 ...

最新文章

  1. Nginx反向代理负载均衡
  2. 消除图片在ie中缓存而无法更新的问题
  3. TensorFlow基础11-(小批量梯度下降法和梯度下降法的优化)
  4. “头移植模型”论文称换头术可行 业内疑两大问题未解
  5. jdk与jre的区别(转)
  6. 数字电路中时钟抖动 Jitter 和 偏移 Skew
  7. python入门指南bl-Vue 3 高阶指南之 Map
  8. Cloud Foundry中warden的网络设计实现——iptable规则配置
  9. linux使用grep数字个数,51CTO博客-专业IT技术博客创作平台-技术成就梦想
  10. python和anaconda一定要对应版本安装吗_Anaconda与Python安装版本对应关系 --- 转载
  11. MSSql-SP_who分析数据库性能
  12. 蓝牙解析(part10):BLE ATT/GATT
  13. Eclipse开发C/C++之使用技巧小结,写给新手
  14. Cookies 、 Session 和 token 的区别
  15. HTML5 —— 属性
  16. 【数据结构笔记35】C实现:有序子列的归并算法:递归与非递归的实现
  17. asp使用Jmail发送含任意内嵌附件和附件的Email函数
  18. [golang 易犯错误] golang 局部变量初始化:=的陷阱
  19. 基于javaweb的客户信息管理系统搭建
  20. 帝国 cms 列表 php,帝国cms数据表详细中文说明

热门文章

  1. linux java 反斜杠_每日linux命令学习-引用符号(反斜杠\,单引号'',双引号)...
  2. linux如何正则匹配删除一行,shell sed命令匹配替换删除最后第一行字符正则表
  3. ITIL4知识系列之IT服务连续性管理
  4. Try Windows Live Writer
  5. 数据库的基本操作——建库、删库、建表、删表等
  6. php json_decode解析失败原因及处理
  7. 软件编程和机器人编程区别
  8. 基础差应该怎样学英语
  9. 【图像处理】深入解析LBP算法
  10. 助力字节降本增效,大规模企业级 HTTP 框架 Hertz 设计实践