Spark系列四:Spark的经典入门案列之ip地址归属地查询
目录
概述
代码实现
单级模式
分布式模式
方案一:
方案二:
工具类
提示:所有需要的文件我全部放在资源里面了,可以自行下载
概述
需求:根据访问日志的ip地址计算出访问者的归属地,并且按照省份,计算出访问次数,然后将计算好的结果写入到MySQL
解决方案流程:
1.整理数据,切分出ip字段,然后将ip地址转换成十进制
2.加载规则,整理规则,取出有用的字段,然后将数据缓存到内存中(Executor中的内存中)
3.将访问log与ip规则进行匹配(二分法查找)
4.取出对应的省份名称,然后将其和一组合在一起
5.按省份名进行聚合
6.将聚合后的数据写入到MySQL中
代码实现
单级模式
1.ip地址转换成十进制
def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum = fragments(i).toLong | ipNum << 8L}ipNum
}
2.加载规则
def readRules(path: String): Array[(Long, Long, String)] = { //传一个路径,返回起始ip地址对应的十进制,结束ip地址对应的十进制,省份val bf: BufferedSource = Source.fromFile(path) //读取ip规则val lines: Iterator[String] = bf.getLines() //拿到规则的每一条val rules: Array[(Long, Long, String)] = lines.map(line => { //对ip规则进行整理,并放入到内存val fileds = line.split("[|]")val startNum = fileds(2).toLong //返回起始ip地址对应的十进制val endNum = fileds(3).toLong //结束ip地址对应的十进制val province = fileds(6) // 省份(startNum, endNum, province)}).toArrayrules}
3.二分法查找(折半查找)
def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = { //传入数组和ip地址对应的十进制,如果查到返回对应角标,没查到返回-1var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}
拿到角标后可以在rules中查找对应的数据省份:province = rules(index)._3
分布式模式
方案一:
上面那种是单机模式,
我们需要把所有的方法的提取出来作为工具类,做成分布式面临的问题是规则的放置问题,因为我们所有的Executor端都需要拿到完全相同的规则,但此时规则在Driver端,我们的解决方案是用广播变量把规则广播到Executor端
object IpLoaction1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLoaction1").setMaster("local[4]")val sc = new SparkContext(conf)//在Driver端获取到全部的IP规则数据(全部的IP规则数据在某一台机器上,跟Driver在同一台机器上),全部的IP规则在Driver端了(在Driver端的内存中了),参数是规则的路径。val rules: Array[(Long, Long, String)] = MyUtils.readRules(args(0))//将Drive端的数据广播到Executor中,调用sc上的广播方法,广播变量的引用(还在Driver端),还没有广播出去,只是告诉了它要广播这个数据val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rules) val accessLines: RDD[String] = sc.textFile(args(1)) //创建RDD,读取访问日志,因为要在多台机器读取,所有用RDDval func = (line: String) => { //这个函数是在Driver端定义的,而该函数中的代码是在Executor中调用执行的,通过广播变量的引用,就可以拿到当前Executor中的广播的规则了val fields = line.split("[|]")val ip = fields(1) //下标是从0开始,1是ipval ipNum = MyUtils.ip2Long(ip) //将ip转换成十进制val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value //通过Driver端的引用获取到Executor中的广播变量var province = "未知"val index = MyUtils.binarySearch(rulesInExecutor, ipNum) //进行二分法查找,if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)}val proviceAndOne: RDD[(String, Int)] = accessLines.map(func) //整理数据val reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_+_) //聚合 val sum = (x: Int, y: Int) => x + yval r = reduced.collect() //将结果打印println(r.toBuffer)sc.stop()}
}
方案二:
以上方案中规则的读取是在本地端,实际开发中我们应该把它放在HDFS上,这样每一个Executor端读到的数据就会不一样,解决方案是:把每个Executor端读到的规则collect到Driver端汇总,然后在广播到每一个Executor端,
第二个问题是最后的结果上一种方法是打印在控制台,我们应该把结果打印的数据库或其他地方。解决方案是用JDBC连接,按照分区进行连接,一个分区用一个连接,可以将一个分区中的多条数据写完在释放jdbc连接,这样更节省资源
object IpLoaction2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLoaction2").setMaster("local[4]")val sc = new SparkContext(conf) val rulesLines:RDD[String] = sc.textFile(args(0)) //取到HDFS中的ip规则,参数是规则在hdfs上的路 val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => { //整理ip规则数据val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect() //将分散在多个Executor中的部分IP规则收集到Driver端val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver) //将Driver端的数据广播到Executor,广播变量的引用(还在Driver端)val accessLines: RDD[String] = sc.textFile(args(1)) //创建RDD,读取访问日志val proviceAndOne: RDD[(String, Int)] = accessLines.map(log => { //整理数据val fields = log.split("[|]") //将log日志的每一行进行切分val ip = fields(1)val ipNum = MyUtils.ip2Long(ip) //将ip转换成十进制val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value //Driver端广播变量的引用是怎样跑到Executor中的呢?Task是在Driver端生成的,广播变量的引用是伴随着Task被发送到Executor中的var province = "未知" //省份默认值是“未知”val index = MyUtils.binarySearch(rulesInExecutor, ipNum)if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)})val reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_+_) //聚合reduced.foreachPartition(it => MyUtils.data2MySQL(it)) //将结果收集到mysql端sc.stop()}
}
工具类
object MyUtils {def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum = fragments(i).toLong | ipNum << 8L}ipNum}def readRules(path: String): Array[(Long, Long, String)] = {//读取ip规则val bf: BufferedSource = Source.fromFile(path)val lines: Iterator[String] = bf.getLines()//对ip规则进行整理,并放入到内存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/IP?characterEncoding=UTF-8", "root", "root")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//数据是在内存中val rules: Array[(Long, Long, String)] = readRules("Desktop/ip/ip.txt")//将ip地址转换成十进制val ipNum = ip2Long("114.215.43.42")//查找val index = binarySearch(rules, ipNum)//根据脚本到rules中查找对应的数据val tp = rules(index)val province = tp._3println(province)}
}
SparkSQL
2.x版本的SQL方式
object IpLoactionSQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("IpLoactionSQL").master("local[*]").getOrCreate()import spark.implicits._val rulesLines:Dataset[String] = spark.read.textFile(args(0)) //取到HDFS中的ip规则val ruleDataFrame: DataFrame = rulesLines.map(line => { //整理ip规则数据val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)}).toDF("snum", "enum", "province")val accessLines: Dataset[String] = spark.read.textFile(args(1)) //创建RDD,读取访问日志val ipDataFrame: DataFrame = accessLines.map(log => { //整理log日志数据val fields = log.split("[|]") //将log日志的每一行进行切分val ip = fields(1) val ipNum = MyUtils.ip2Long(ip) //将ip转换成十进制ipNum}).toDF("ip_num")ruleDataFrame.createTempView("v_rules")ipDataFrame.createTempView("v_ips")val r = spark.sql("SELECT province, count(*) counts FROM v_ips JOIN v_rules ON (ip_num >= snum AND ip_num <= enum) GROUP BY province ORDER BY counts DESC")r.show()spark.stop()}
}
2.x版本的SDL方式
jon的代价太昂贵,而且非常慢,解决思路是将表缓存起来(广播变量)
object IpLoactionSDL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("IpLoactionSDL").master("local[*]").getOrCreate()import spark.implicits._val rulesLines:Dataset[String] = spark.read.textFile(args(0)) //取到HDFS中的ip规则val rluesDataset = rulesLines.map(line => { //整理ip规则数据val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})val rulesInDriver: Array[(Long, Long, String)] = rluesDataset.collect() //收集ip规则到Driver端val broadcastRef: Broadcast[Array[(Long, Long, String)]] = spark.sparkContext.broadcast(rulesInDriver) //广播(必须使用sparkcontext),将广播变量的引用返回到Driver端val accessLines: Dataset[String] = spark.read.textFile(args(1)) //创建RDD,读取访问日志val ipDataFrame: DataFrame = accessLines.map(log => { //整理log日志数据val fields = log.split("[|]") //将log日志的每一行进行切分val ip = fields(1) val ipNum = MyUtils.ip2Long(ip) //将ip转换成十进制ipNum}).toDF("ip_num")ipDataFrame.createTempView("v_log")//定义一个自定义函数(UDF),并注册,该函数的功能是(输入一个IP地址对应的十进制,返回一个省份名称)spark.udf.register("ip2Province", (ipNum: Long) => {//查找ip规则(事先已经广播了,已经在Executor中了)//函数的逻辑是在Executor中执行的,怎样获取ip规则的对应的数据呢?//使用广播变量的引用,就可以获得val ipRulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value//根据IP地址对应的十进制查找省份名称val index = MyUtils.binarySearch(ipRulesInExecutor, ipNum)var province = "未知"if(index != -1) {province = ipRulesInExecutor(index)._3}province})//执行SQLval r = spark.sql("SELECT ip2Province(ip_num) province, COUNT(*) counts FROM v_log GROUP BY province ORDER BY counts DESC")r.show()spark.stop()}
}
Spark系列四:Spark的经典入门案列之ip地址归属地查询相关推荐
- 【大数据Spark系列】Spark教程:详细全部
Spark作为Apache顶级的开源项目,是一个快速.通用的大规模数据处理引擎,和Hadoop的MapReduce计算框架类似,但是相对于MapReduce,Spark凭借其可伸缩.基于内存计算等特点 ...
- JDBC的入门案列以及JDBC的对事务的管理
JDBC的概念 Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中 ...
- 每日一课 | List和 tuple的13个经典使用案列(完结篇)
06 大家好,我是营长,上期营长分享了"List和 tuple的13个经典使用案列"的一部分,这期营长接着为大家分享. 本期分享内容:List和 tuple的13个经典使用案列(完 ...
- Spark系列之Spark体系架构
title: Spark系列 第四章 Spark体系架构 4.1 Spark核心功能 Alluxio 原来叫 tachyon 分布式内存文件系统 Spark Core提供Spark最基础的最核心的功能 ...
- Spark系列之Spark启动与基础使用
title: Spark系列 第三章 Spark启动与基础使用 3.1 Spark Shell 3.1.1 Spark Shell启动 安装目录的bin目录下面,启动命令: spark-shell $ ...
- Spark系列之Spark应用程序运行机制
声明: 文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除.感谢.转载请注明出处,感谢. By luoye ...
- Spark系列之Spark在不同集群中的架构
title: Spark系列 第十二章 Spark在不同集群中的架构 Spark 注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式.部署在单台机器上时,既可以用 ...
- Spark系列之Spark的资源调优
title: Spark系列 第十一章 Spark的资源调优 11.1 概述 在开发完Spark作业之后,就该为作业配置合适的资源了.Spark的资源参数,基本都可以在sparksubmit命令中 ...
- Spark系列之Spark概述
title: Spark系列 What is Apache Spark™? Apache Spark™ is a multi-language engine for executing data en ...
最新文章
- 史上最强iPhone越狱工具诞生,而且是开源!
- python if main_python中if __name__ == '__main__' :main(()
- 【用研】细读7张图片,教你玩转用户调研
- java_poi教程.pdf,如何使用POI转换.DOC / .DOCX为PDF在Java ..?
- Robot Framework: 自定义自己的python库
- 番茄花园 Win10 系统 64位 全新纯净版 v2021.01
- 【Next Permutation】cpp
- MySQL之存储引擎,数据类型,约束条件
- 详细图解MySQL(win7x64 5.7.16版本)下载、安装、配置与使用
- Unity3D脚本学习1
- 第三届长安杯检材一复盘
- 基于Key过期失效实现 ‘N分钟内请勿重复提交“ 功能
- 地理信息系统实习教程 第26章 地图注记
- Ubuntu下将中文目录修改为英文目录
- 自己制作一个小程序需要多少钱
- 嘟嘟牛mysql 密码_嘟嘟牛管家数据转化为万象、Pubwin的操作步骤
- [转]一个人生活,如何摆脱孤独提升幸福感?
- 卡券、直充订单列表接口(post 表单提交)
- mybatis报“Invalid value for getInt()“错误
- Windows PowerShell清除历史命令执行记录