Spark RDD案例(五)经纬度转换为地理位置

1. 背景

  1. Spark作为大数据分析引擎,本身可以做离线和准实时数据处理
  2. Spark抽象出的操作对象如RDD、dataSet、dataFrame、DStream等都是高层级的抽象,屏蔽了分布式数据处理代码细节,操作分布式数据和处理就像使用scala集合接口一样便利。这样可以很大降低编程使用和理解门槛。
  3. 在实际生产中,大数据处理面临的业务需求和正常java 业务需求一样,都是基于数据做处理。不同的是正常java业务数据相对较少,如mysql中适合存储的数据是小而美的如500万行数据及以下,而大数据存储500万行才达到海量数据存储的门槛。
  4. 实际生产中,大数据和小批量Java数据处理需求往往类似,如需要根据已知规则确定手中数据的对应信息,涉及到一个查找过程。在sql中一般是join的方式来将信息匹配映射,但是在spark中一般采用逐条处理并从已知规则中匹配方式
  5. 当所需要查找的规则数据集合较大时,这时候已经无法将规则集合分发到每个executor,这时候一般使用内部或者外部规则集合,采用请求方式,可以使用http请求或者rpc请求。http请求通用性强,rpc性能高,但局限于一种编程语言。
  6. 本案例采用http请求数据,来对数据做逐条转换

2. 案例

  1. 需求
  • 将输入数据中经纬度数据转换为具体的省市信息,并汇总出来
  1. 数据
{"cid": 1, "money": 600.0, "longitude":116.397128,"latitude":39.916527,"oid":"o123", }
"oid":"o112", "cid": 3, "money": 200.0, "longitude":118.396128,"latitude":35.916527}
{"oid":"o124", "cid": 2, "money": 200.0, "longitude":117.397128,"latitude":38.916527}
{"oid":"o125", "cid": 3, "money": 100.0, "longitude":118.397128,"latitude":35.916527}
{"oid":"o127", "cid": 1, "money": 100.0, "longitude":116.395128,"latitude":39.916527}
{"oid":"o128", "cid": 2, "money": 200.0, "longitude":117.396128,"latitude":38.916527}
{"oid":"o129", "cid": 3, "money": 300.0, "longitude":115.398128,"latitude":35.916527}
{"oid":"o130", "cid": 2, "money": 100.0, "longitude":116.397128,"latitude":39.916527}
{"oid":"o131", "cid": 1, "money": 100.0, "longitude":117.394128,"latitude":38.916527}
{"oid":"o132", "cid": 3, "money": 200.0, "longitude":118.396128,"latitude":35.916527}
  1. 环境准备
  • scala 2.12.12
  • jdk 1.8
  • idea 2020
  • maven 3.6.3
  • pom文件
<!-- 定义了一些常量 --><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.12.10</scala.version><spark.version>3.0.1</spark.version><hbase.version>2.2.5</hbase.version><hadoop.version>3.2.1</hadoop.version><encoding>UTF-8</encoding></properties><dependencies><!-- 导入scala的依赖 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><!-- 编译时会引入依赖,打包是不引入依赖 --><!--            <scope>provided</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient --><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.12</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version><!-- 编译时会引入依赖,打包是不引入依赖 --><!--            <scope>provided</scope>--></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency></dependencies><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
  1. 代码
package com.doitimport com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.http.HttpEntity
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.{Logger, LoggerFactory}/*
* 读取的文件中包含经纬度信息
* 需要将经纬度转换为省市的信息
*
* 信息格式如下
* {"oid":"o132", "cid": 3, "money": 200.0, "longitude":118.396128,"latitude":35.916527}
* */
object LocationGeoTest {private val logger: Logger = LoggerFactory.getLogger(this.getClass)def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("LocationGeoTest").setMaster("local[*]")val sc = new SparkContext(conf)val rdd: RDD[String] = sc.textFile("E:\\DOITLearning\\order.log")val parsedRDD: RDD[OrderInfo] = rdd.map(line => {var bean: OrderInfo = nulltry {bean = JSON.parseObject(line, classOf[OrderInfo])} catch {case e: Exception => {logger.error("parse json error => " + line)}}// 解析后返回一个一个的beanbean})// 过滤解析失败的RDDval filteredRDD: RDD[OrderInfo] = parsedRDD.filter(_ != null)val resRDD: RDD[OrderInfo] = filteredRDD.mapPartitions(iter => {// 这里的经纬度使用高德地图开放API进行查询// 提前申请key:  71cc7d9df22483b27ec40ecb45d9d87b// 因为需要对每条数据请求,为了复用对象,使用mapPartitions,这样一个区只需要创建一个对象val httpClient: CloseableHttpClient = HttpClients.createDefault()iter.map(bean => {// 构建请求参数val longitude: Double = bean.longitudeval latitude: Double = bean.latitudeval httpGet = new HttpGet(s"https://restapi.amap.com/v3/geocode/regeo?&location=$longitude,$latitude&key=71cc7d9df22483b27ec40ecb45d9d87b")// 发送请求,获取返回信息val response: CloseableHttpResponse = httpClient.execute(httpGet)try {// 将返回对象中数据提取出来val entity: HttpEntity = response.getEntityif (response.getStatusLine.getStatusCode == 200) {// 将返回对象中数据转换为字符串val resultStr: String = EntityUtils.toString(entity)// 解析返回的json字符串val jSONObject: JSONObject = JSON.parseObject(resultStr)// 根据高德地图反地理编码接口返回数据中字段进行数据解析val regeocode: JSONObject = jSONObject.getJSONObject("regeocode")if (regeocode != null && regeocode.isEmpty == false) {val address: JSONObject = regeocode.getJSONObject("addressComponent")bean.province = address.getString("province")bean.city = address.getString("city")}}} catch {case e: Exception => {}} finally {// 每一次数据请求之后,关闭连接response.close()}// 迭代器没有数据之后,关闭请求if (iter.hasNext == false) {httpClient.close()}bean})})println("resRDD: " + resRDD.collect().toBuffer)sc.stop()}
}// 因为数据形式是json格式,使用json解析,使用case class比较合适,也可以使用java bean
case class OrderInfo(val oid: String,val cid: String,val money: Double,val longitude: Double,val latitude: Double,var province: String,var city: String)

运行结果

resRDD: ArrayBuffer(OrderInfo(o123,1,600.0,116.397128,39.916527,北京市,[]), OrderInfo(o124,2,200.0,117.397128,38.916527,天津市,[]), OrderInfo(o125,3,100.0,118.397128,35.916527,山东省,临沂市), OrderInfo(o127,1,100.0,116.395128,39.916527,北京市,[]), OrderInfo(o128,2,200.0,117.396128,38.916527,天津市,[]), OrderInfo(o129,3,300.0,115.398128,35.916527,山东省,聊城市), OrderInfo(o130,2,100.0,116.397128,39.916527,北京市,[]), OrderInfo(o131,1,100.0,117.394128,38.916527,天津市,[]), OrderInfo(o132,3,200.0,118.396128,35.916527,山东省,临沂市))

这里可以看出,主要就是使用Spark 的RDD进行数据分布式处理,并将经纬度通过高德地图开放API转换为省市信息
这里第一个注意点,是采用json解析数据,注意使用case class,这样不需要关心序列化,setter方法等细节
第二个注意点是使用高德地图开放API,需要先去解析
第三个是采用httpclient简单网络请求同步请求数据,请求之后,关闭每条请求连接。所有请求结束后,关闭请求对象

Spark RDD案例(五)经纬度转换为地理位置相关推荐

  1. Spark基础学习笔记22:Spark RDD案例分析

    文章目录 零.本讲学习目标 一.案例分析:Spark RDD实现单词计数 (一)案例概述 (二)实现步骤 1.新建Maven管理的Spark项目 2.添加Scala和Spark依赖 3.创建WordC ...

  2. java逆地理编码通过经纬度转换为地理位置(通过百度开发API)

    java通过经纬度获取地理位置信息 先看效果图 百度API接口文档地址 (1)首先需要百度开发者账号创建一个应用得到ak (2)java部分实现 @Component @ConfigurationPr ...

  3. 运用阿里云地图实现经纬度转换为省市县

    最近在写的一个项目需要进行一个经纬度转换为地理位置的工具类 然后写在博客里面提供给大家使用 /*** 经纬度转换地址json* @param lat 经度* @param log 纬度* @retur ...

  4. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  5. SPARK 笔记 (五) 经纬度转换地址

    经纬度转换地址 anssin用的是高德地图,实现逆地理位置,首先需要去高德地图开放平台(https://lbs.amap.com/)获取key 我的key就不分享给大家了 逆地理位置用的是http请求 ...

  6. Spark rdd 介绍,和案例介绍

    1.2.创建RDD 1)由一个已经存在的Scala集合创建. val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) 2)由外部存储系统的数据集创建,包括本 ...

  7. Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析

    Spark商业案例与性能调优实战100课>第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析 package com.dt.spark.coresimport org.apa ...

  8. Spark商业案例与性能调优实战100课》第3课:商业案例之通过RDD分析大数据电影点评系各种类型的最喜爱电影TopN及性能优化技巧

    Spark商业案例与性能调优实战100课>第3课:商业案例之通过RDD分析大数据电影点评系各种类型的最喜爱电影TopN及性能优化技 源代码 package com.dt.spark.coresi ...

  9. Spark RDD 论文详解(五)实现

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

最新文章

  1. spark_updateStateByKey
  2. 使用libjpeg进行图片压缩(哈夫曼算法,无损压缩)
  3. Volatile 关键字 内存可见性
  4. 添加第三方类库造成的linker command failed with exit code 1 (use -v to see invocation)的错误调试
  5. git每个项目创建帐户名和密码
  6. 聊聊Elasticsearch的TimedRunnable
  7. [转]毕业5年决定你的命运 --------值得所有不甘平庸的人看看
  8. uilabel 自适应
  9. mysql 使用select建表_mysql create创建表、insert into插入数据、select查询数据实例
  10. Linux文本处理(grep,sed)
  11. 一致性算法 - Distro协议在Nacos的实践
  12. 计算机制作节日贺卡教案,《制作节日贺卡》教学设计.doc
  13. 蒙特卡洛模拟最牛的地方在哪里呢?
  14. 百度应用开放平台简介
  15. [ 网络协议篇 ] IGP 详解之 OSPF 详解(一)--- 基础知识
  16. 第十篇 面向对象的程序设计
  17. 第12课:如何理解技术管理者(上)
  18. 从沟通的一般模型想到互联网,再想到数字媒体,最后想到信息世界
  19. 系列报道 | 组织范式“青色组织”兴起:员工骨子里透出愉悦积极气质
  20. linux服务器中如何解压分卷文件,Linux解压rar文件(unrar安装和使用,分卷解压)...

热门文章

  1. 自己打造原生ChromeOS
  2. Java面向对象编程——抽象类和接口
  3. 什么是SOA(service-oriented architecture)?
  4. 【无标题】C#上位机与三菱PLC FX2NC通讯方法
  5. 直播app开发搭建,两种很简单的网页特效实现
  6. JS实例:网页特效-自动刷新页面
  7. SLAM学习的一些必要网站
  8. VC MFC指定客户区的大小
  9. 二叉树非递归遍历实现(Java)
  10. okhttp配置缓存策略_Okhttp缓存源码分析以及自定义缓存实现