目录

概述

代码实现

单级模式

分布式模式

方案一:

方案二:

工具类


提示:所有需要的文件我全部放在资源里面了,可以自行下载

概述

需求:根据访问日志的ip地址计算出访问者的归属地,并且按照省份,计算出访问次数,然后将计算好的结果写入到MySQL

解决方案流程:
    1.整理数据,切分出ip字段,然后将ip地址转换成十进制
    2.加载规则,整理规则,取出有用的字段,然后将数据缓存到内存中(Executor中的内存中)
    3.将访问log与ip规则进行匹配(二分法查找)
    4.取出对应的省份名称,然后将其和一组合在一起
    5.按省份名进行聚合
    6.将聚合后的数据写入到MySQL中

代码实现

单级模式

1.ip地址转换成十进制

def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum
}

2.加载规则

  def readRules(path: String): Array[(Long, Long, String)] = { //传一个路径,返回起始ip地址对应的十进制,结束ip地址对应的十进制,省份val bf: BufferedSource = Source.fromFile(path)   //读取ip规则val lines: Iterator[String] = bf.getLines()  //拿到规则的每一条val rules: Array[(Long, Long, String)] = lines.map(line => {  //对ip规则进行整理,并放入到内存val fileds = line.split("[|]")val startNum = fileds(2).toLong  //返回起始ip地址对应的十进制val endNum = fileds(3).toLong  //结束ip地址对应的十进制val province = fileds(6) // 省份(startNum, endNum, province)}).toArrayrules}

3.二分法查找(折半查找)

 def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = { //传入数组和ip地址对应的十进制,如果查到返回对应角标,没查到返回-1var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}

拿到角标后可以在rules中查找对应的数据省份:province = rules(index)._3

分布式模式

方案一:

上面那种是单机模式,

我们需要把所有的方法的提取出来作为工具类,做成分布式面临的问题是规则的放置问题,因为我们所有的Executor端都需要拿到完全相同的规则,但此时规则在Driver端,我们的解决方案是用广播变量把规则广播到Executor端

object IpLoaction1 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLoaction1").setMaster("local[4]")val sc = new SparkContext(conf)//在Driver端获取到全部的IP规则数据(全部的IP规则数据在某一台机器上,跟Driver在同一台机器上),全部的IP规则在Driver端了(在Driver端的内存中了),参数是规则的路径。val rules: Array[(Long, Long, String)] = MyUtils.readRules(args(0))//将Drive端的数据广播到Executor中,调用sc上的广播方法,广播变量的引用(还在Driver端),还没有广播出去,只是告诉了它要广播这个数据val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rules)  val accessLines: RDD[String] = sc.textFile(args(1))  //创建RDD,读取访问日志,因为要在多台机器读取,所有用RDDval func = (line: String) => { //这个函数是在Driver端定义的,而该函数中的代码是在Executor中调用执行的,通过广播变量的引用,就可以拿到当前Executor中的广播的规则了val fields = line.split("[|]")val ip = fields(1)  //下标是从0开始,1是ipval ipNum = MyUtils.ip2Long(ip)  //将ip转换成十进制val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value   //通过Driver端的引用获取到Executor中的广播变量var province = "未知"val index = MyUtils.binarySearch(rulesInExecutor, ipNum)  //进行二分法查找,if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)}val proviceAndOne: RDD[(String, Int)] = accessLines.map(func)   //整理数据val reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_+_)   //聚合  val sum = (x: Int, y: Int) => x + yval r = reduced.collect()  //将结果打印println(r.toBuffer)sc.stop()}
}

方案二:

以上方案中规则的读取是在本地端,实际开发中我们应该把它放在HDFS上,这样每一个Executor端读到的数据就会不一样,解决方案是:把每个Executor端读到的规则collect到Driver端汇总,然后在广播到每一个Executor端,

第二个问题是最后的结果上一种方法是打印在控制台,我们应该把结果打印的数据库或其他地方。解决方案是用JDBC连接,按照分区进行连接,一个分区用一个连接,可以将一个分区中的多条数据写完在释放jdbc连接,这样更节省资源

object IpLoaction2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLoaction2").setMaster("local[4]")val sc = new SparkContext(conf)  val rulesLines:RDD[String] = sc.textFile(args(0)) //取到HDFS中的ip规则,参数是规则在hdfs上的路  val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => { //整理ip规则数据val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()  //将分散在多个Executor中的部分IP规则收集到Driver端val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver) //将Driver端的数据广播到Executor,广播变量的引用(还在Driver端)val accessLines: RDD[String] = sc.textFile(args(1)) //创建RDD,读取访问日志val proviceAndOne: RDD[(String, Int)] = accessLines.map(log => {  //整理数据val fields = log.split("[|]")  //将log日志的每一行进行切分val ip = fields(1)val ipNum = MyUtils.ip2Long(ip)   //将ip转换成十进制val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value  //Driver端广播变量的引用是怎样跑到Executor中的呢?Task是在Driver端生成的,广播变量的引用是伴随着Task被发送到Executor中的var province = "未知"  //省份默认值是“未知”val index = MyUtils.binarySearch(rulesInExecutor, ipNum)if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)})val reduced: RDD[(String, Int)] = proviceAndOne.reduceByKey(_+_)   //聚合reduced.foreachPartition(it => MyUtils.data2MySQL(it))  //将结果收集到mysql端sc.stop()}
}

工具类

object MyUtils {def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum}def readRules(path: String): Array[(Long, Long, String)] = {//读取ip规则val bf: BufferedSource = Source.fromFile(path)val lines: Iterator[String] = bf.getLines()//对ip规则进行整理,并放入到内存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/IP?characterEncoding=UTF-8", "root", "root")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//数据是在内存中val rules: Array[(Long, Long, String)] = readRules("Desktop/ip/ip.txt")//将ip地址转换成十进制val ipNum = ip2Long("114.215.43.42")//查找val index = binarySearch(rules, ipNum)//根据脚本到rules中查找对应的数据val tp = rules(index)val province = tp._3println(province)}
}

SparkSQL

2.x版本的SQL方式

object IpLoactionSQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("IpLoactionSQL").master("local[*]").getOrCreate()import spark.implicits._val rulesLines:Dataset[String] = spark.read.textFile(args(0))  //取到HDFS中的ip规则val ruleDataFrame: DataFrame = rulesLines.map(line => {  //整理ip规则数据val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)}).toDF("snum", "enum", "province")val accessLines: Dataset[String] = spark.read.textFile(args(1))  //创建RDD,读取访问日志val ipDataFrame: DataFrame = accessLines.map(log => { //整理log日志数据val fields = log.split("[|]") //将log日志的每一行进行切分val ip = fields(1)  val ipNum = MyUtils.ip2Long(ip)  //将ip转换成十进制ipNum}).toDF("ip_num")ruleDataFrame.createTempView("v_rules")ipDataFrame.createTempView("v_ips")val r = spark.sql("SELECT province, count(*) counts FROM v_ips JOIN v_rules ON (ip_num >= snum AND ip_num <= enum) GROUP BY province ORDER BY counts DESC")r.show()spark.stop()}
}

2.x版本的SDL方式

jon的代价太昂贵,而且非常慢,解决思路是将表缓存起来(广播变量

object IpLoactionSDL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("IpLoactionSDL").master("local[*]").getOrCreate()import spark.implicits._val rulesLines:Dataset[String] = spark.read.textFile(args(0))  //取到HDFS中的ip规则val rluesDataset = rulesLines.map(line => {  //整理ip规则数据val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})val rulesInDriver: Array[(Long, Long, String)] = rluesDataset.collect()  //收集ip规则到Driver端val broadcastRef: Broadcast[Array[(Long, Long, String)]] = spark.sparkContext.broadcast(rulesInDriver)  //广播(必须使用sparkcontext),将广播变量的引用返回到Driver端val accessLines: Dataset[String] = spark.read.textFile(args(1))  //创建RDD,读取访问日志val ipDataFrame: DataFrame = accessLines.map(log => { //整理log日志数据val fields = log.split("[|]") //将log日志的每一行进行切分val ip = fields(1)  val ipNum = MyUtils.ip2Long(ip)  //将ip转换成十进制ipNum}).toDF("ip_num")ipDataFrame.createTempView("v_log")//定义一个自定义函数(UDF),并注册,该函数的功能是(输入一个IP地址对应的十进制,返回一个省份名称)spark.udf.register("ip2Province", (ipNum: Long) => {//查找ip规则(事先已经广播了,已经在Executor中了)//函数的逻辑是在Executor中执行的,怎样获取ip规则的对应的数据呢?//使用广播变量的引用,就可以获得val ipRulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value//根据IP地址对应的十进制查找省份名称val index = MyUtils.binarySearch(ipRulesInExecutor, ipNum)var province = "未知"if(index != -1) {province = ipRulesInExecutor(index)._3}province})//执行SQLval r = spark.sql("SELECT ip2Province(ip_num) province, COUNT(*) counts FROM v_log GROUP BY province ORDER BY counts DESC")r.show()spark.stop()}
}

Spark系列四:Spark的经典入门案列之ip地址归属地查询相关推荐

  1. 【大数据Spark系列】Spark教程:详细全部

    Spark作为Apache顶级的开源项目,是一个快速.通用的大规模数据处理引擎,和Hadoop的MapReduce计算框架类似,但是相对于MapReduce,Spark凭借其可伸缩.基于内存计算等特点 ...

  2. JDBC的入门案列以及JDBC的对事务的管理

    JDBC的概念 Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中 ...

  3. 每日一课 | List和 tuple的13个经典使用案列(完结篇)

    06 大家好,我是营长,上期营长分享了"List和 tuple的13个经典使用案列"的一部分,这期营长接着为大家分享. 本期分享内容:List和 tuple的13个经典使用案列(完 ...

  4. Spark系列之Spark体系架构

    title: Spark系列 第四章 Spark体系架构 4.1 Spark核心功能 Alluxio 原来叫 tachyon 分布式内存文件系统 Spark Core提供Spark最基础的最核心的功能 ...

  5. Spark系列之Spark启动与基础使用

    title: Spark系列 第三章 Spark启动与基础使用 3.1 Spark Shell 3.1.1 Spark Shell启动 安装目录的bin目录下面,启动命令: spark-shell $ ...

  6. Spark系列之Spark应用程序运行机制

    声明:         文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除.感谢.转载请注明出处,感谢. By luoye ...

  7. Spark系列之Spark在不同集群中的架构

    title: Spark系列 第十二章 Spark在不同集群中的架构 ​ Spark 注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式.部署在单台机器上时,既可以用 ...

  8. Spark系列之Spark的资源调优

    title: Spark系列 第十一章 Spark的资源调优 11.1 概述 ​ 在开发完Spark作业之后,就该为作业配置合适的资源了.Spark的资源参数,基本都可以在sparksubmit命令中 ...

  9. Spark系列之Spark概述

    title: Spark系列 What is Apache Spark™? Apache Spark™ is a multi-language engine for executing data en ...

最新文章

  1. 史上最强iPhone越狱工具诞生,而且是开源!
  2. python if main_python中if __name__ == '__main__' :main(()
  3. 【用研】细读7张图片,教你玩转用户调研
  4. java_poi教程.pdf,如何使用POI转换.DOC / .DOCX为PDF在Java ..?
  5. Robot Framework: 自定义自己的python库
  6. 番茄花园 Win10 系统 64位 全新纯净版 v2021.01
  7. 【Next Permutation】cpp
  8. MySQL之存储引擎,数据类型,约束条件
  9. 详细图解MySQL(win7x64 5.7.16版本)下载、安装、配置与使用
  10. Unity3D脚本学习1
  11. 第三届长安杯检材一复盘
  12. 基于Key过期失效实现 ‘N分钟内请勿重复提交“ 功能
  13. 地理信息系统实习教程 第26章 地图注记
  14. Ubuntu下将中文目录修改为英文目录
  15. 自己制作一个小程序需要多少钱
  16. 嘟嘟牛mysql 密码_嘟嘟牛管家数据转化为万象、Pubwin的操作步骤
  17. [转]一个人生活,如何摆脱孤独提升幸福感?
  18. 卡券、直充订单列表接口(post 表单提交)
  19. mybatis报“Invalid value for getInt()“错误
  20. Windows PowerShell清除历史命令执行记录

热门文章

  1. CorelDraw常用操作
  2. linux系统苹果刷机,iPhone7刷入Linux系统,成苹果阵营刷机小王子!
  3. Unity3D]Unity3d的GUI排版教程--1(转)
  4. BGP十三条选路原则、路由反射器
  5. 工作中有时候情商比工作技能更重要
  6. HEU大二数电时序逻辑电路设计实验
  7. asp.net的页面传值方式
  8. 互联网很快就将达到承载极限?
  9. Excel多级下拉菜单联动自动匹配内容
  10. Java递归解压缩Zip、7z、7zx、Rar5(使用winrar.exe)可解压这四种类型的嵌套压缩