今天学习了一个根据IP统计归属地的小案例,在此记录一下。

在电商网站后台都会记录用户的浏览日志,然后根据这些日志文件就可以做数据分析,比如统计用户的地址,喜好,这样就可以给用户推荐商品了。
那么怎样进行统计呢,首先我们要有一份各个省份的IP规则,然后要有一份日志文件,我们从日志文件中切分出IP字段,然后与IP规则进行对比,就可以匹配到是哪个地区的了。

我们先来写一下这个小案例的需求;

根据访问日志的IP地址计算出访问者的归属地,并且按照省份,计算出访问次数,然后将计算好的结果写入到MySQL中
1.整理数据,切分出IP字段,然后将IP地址转换成十进制
2.加载IP规则,整理规则,取出有用的字段,然后将数据缓存到内存中(Executor中的内存中)
3.将访问log与IP规则进行匹配(二分法查找)
4.取出对应的省份名称,然后将其和1组合在一起
5.按省份进行聚合
6.将聚合后的数据写入到MySQL中

难点:
首先我们要理解spark提交任务的机制以及RDD创建的机制,在此不在过多的阐述,可以查看博客:
RDD详解:https://blog.csdn.net/weixin_43866709/article/details/88623920
RDD之collect方法执行的过程:https://blog.csdn.net/weixin_43866709/article/details/88666080

难点就在于我们更好的使用IP规则这份数据

接下来我们一步一步的实现这个小需求

工具:spark集群,hdfs集群,MySQL,idea

1.加载IP规则,整理规则,取出有用的字段,然后将数据缓存到内存中(Executor中的内存中)

首先我们要将IP规则读取到hdfs中,这样可以保证IP规则这份数据不易丢失

val rulesLines: RDD[String] = sc.textFile(args(0))

然后整理IP规则,只取出有用的数据,比如用于比较的IP范围,还有对应的省份;
但是这里有一个问题,整理IP规则的是Task,是在Executor端执行的,这样每个Executor只是整理了部分的数据,后面得比较也是在Executor端执行的,这样会出现比较的错误。所以我们要将Executor处理完的IP规则收集到Driver端,这时Driver端的IP规则数据就是完整的了,再将Driver端的数据广播到Executor端,这样Executor端的数据就也是完整的了,就可以进行正确的比较了。

//整理ip规则数据//这里是在Executor中执行的,每个Executor只计算部分的IP规则数据val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => {val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})//需要将每个Executor端执行完的数据收集到Driver端val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()//再将Driver端的完整的数据广播到Executor端//生成广播数据的引用val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)

2.整理数据,切分出IP字段,然后将IP地址转换成十进制
3.将访问log与IP规则进行匹配(二分法查找)
4.取出对应的省份名称,然后将其和1组合在一起

首先我们先写一个小算法,用于将IP地址转换成十进制数字(这样更加便于比较)

TestIp.scala

//将IP转化为十进制def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum}

再写一个小算法,用于IP地址的比较,因为IP规则是一个IP字段的范围,也就是说一个范围对应一个省份,要拿日志文件中的IP地址与这个范围进行比较,而且IP规则中的数据是排好序的,所以使用二分法查找会更加快捷:

TestIp.scala

//二分法查找def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}

然后我们开始整理日志文件的数据,取出IP地址,转换成十进制,然后与IP规则进行比较:

//整理日志文件的数据,取出ip,转换成十进制,与IP规则进行比较(采用二分法)val provinceAndOne: RDD[(String, Int)] = accessLines.map(line => {val fields = line.split("[|]")val ip = fields(1)//将ip转换成十进制val ipNum = TestIp.ip2Long(ip)//让Executor通过广播数据的引用拿到广播的数据//Task是在Driver端生成的,广播变量的引用是伴随着Task被发送到Executor端的val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value//查找var province = "未知"val index: Int = TestIp.binarySearch(rulesInExecutor, ipNum)if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)})

5.按省份进行聚合

val reduced: RDD[(String, Int)] = provinceAndOne.reduceByKey(_+_)

6.将聚合后的数据写入到MySQL中

我们也提前将写入MySQL的规则写好:

def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "用户", "密码")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}

在这里我们最好使用foreachPartition方法,一次拿出一个分区进行处理,这样一个分区使用一个jdbc连接,会更加节省资源。

reduced.foreachPartition(it => TestIp.data2MySQL(it))

到这里就处理完了,下面是完整的代码:

TestIp.scala

package XXXimport java.sql.{Connection, DriverManager, PreparedStatement}import scala.io.{BufferedSource, Source}object TestIp {//将IP转化为十进制def ip2Long(ip: String): Long = {val fragments = ip.split("[.]")var ipNum = 0Lfor (i <- 0 until fragments.length){ipNum =  fragments(i).toLong | ipNum << 8L}ipNum}//定义读取ip.txt规则,只要有用的数据def readRules(path:String):Array[(Long,Long,String)] = {//读取ip.txtval bf: BufferedSource = Source.fromFile(path)//对ip.txt进行整理val lines: Iterator[String] = bf.getLines()//对ip进行整理,并放入内存val rules: Array[(Long, Long, String)] = lines.map(line => {val fileds = line.split("[|]")val startNum = fileds(2).toLongval endNum = fileds(3).toLongval province = fileds(6)(startNum, endNum, province)}).toArrayrules}//二分法查找def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {var low = 0var high = lines.length - 1while (low <= high) {val middle = (low + high) / 2if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))return middleif (ip < lines(middle)._1)high = middle - 1else {low = middle + 1}}-1}def data2MySQL(it: Iterator[(String, Int)]): Unit = {//一个迭代器代表一个分区,分区中有多条数据//先获得一个JDBC连接val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "用户", "密码")//将数据通过Connection写入到数据库val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")//将分区中的数据一条一条写入到MySQL中it.foreach(tp => {pstm.setString(1, tp._1)pstm.setInt(2, tp._2)pstm.executeUpdate()})//将分区中的数据全部写完之后,在关闭连接if(pstm != null) {pstm.close()}if (conn != null) {conn.close()}}def main(args: Array[String]): Unit = {//数据是在内存中val rules: Array[(Long, Long, String)] = readRules("E:/Spark视频/小牛学堂-大数据24期-06-Spark安装部署到高级-10天/spark-04-Spark案例讲解/课件与代码/ip/ip.txt")//将ip地址转换成十进制val ipNum = ip2Long("1.24.6.56")//查找val index = binarySearch(rules,ipNum)//根据脚标到rules中查找对应的数据val tp = rules(index)val province = tp._3println(province)}}

IpLocation.scala

package XXXXimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object IpLocation2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("IpLocation2").setMaster("local[4]")val sc = new SparkContext(conf)//将ip.txt读取到HDFS中val rulesLines: RDD[String] = sc.textFile(args(0))//整理ip规则数据//这里是在Executor中执行的,每个Executor只计算部分的IP规则数据val ipRulesRDD: RDD[(Long, Long, String)] = rulesLines.map(line => {val fields = line.split("[|]")val startNum = fields(2).toLongval endNum = fields(3).toLongval province = fields(6)(startNum, endNum, province)})//需要将每个Executor端执行完的数据收集到Driver端val rulesInDriver: Array[(Long, Long, String)] = ipRulesRDD.collect()//再将Driver端的完整的数据广播到Executor端//生成广播数据的引用val broadcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rulesInDriver)//接下来开始读取访问日志数据val accessLines: RDD[String] = sc.textFile(args(1))//整理日志文件的数据,取出ip,转换成十进制,与IP规则进行比较(采用二分法)val provinceAndOne: RDD[(String, Int)] = accessLines.map(line => {val fields = line.split("[|]")val ip = fields(1)//将ip转换成十进制val ipNum = TestIp.ip2Long(ip)//让Executor通过广播数据的引用拿到广播的数据//Task是在Driver端生成的,广播变量的引用是伴随着Task被发送到Executor端的val rulesInExecutor: Array[(Long, Long, String)] = broadcastRef.value//查找var province = "未知"val index: Int = TestIp.binarySearch(rulesInExecutor, ipNum)if (index != -1) {province = rulesInExecutor(index)._3}(province, 1)})//聚合val reduced: RDD[(String, Int)] = provinceAndOne.reduceByKey(_+_)reduced.foreachPartition(it => TestIp.data2MySQL(it))//释放资源sc.stop()}}//这种方法是通过HDFS读取IP规则(ip.txt),在收集到Driver端,然后再广播到Executor端
//优点:IP规则更加安全,不容易丢失,而且不用和Driver在同一台机器

spark小案例---根据IP计算归属地相关推荐

  1. python简单小案例列表_python计算列表内各元素的个数实例

    python计算列表内各元素的个数实例 如下所示: list = [1,2,3,4,5,6,7,5,4,3,2,12] set = set(list) dict = {} for item in se ...

  2. Flume+Kafka+Spark小案例

  3. Python小案例(六)通过熵权法计算指标权重

    Python小案例(六)通过熵权法计算指标权重 在日常业务中,产品运营需要综合多个指标进行判断,如果没有目标变量进行监督训练的话,很难人为地判断哪个指标更好,综合起来哪个类别更优秀. 这里介绍一种基于 ...

  4. 用python计算个人所得税税率表,Python 小案例 计算个人所得税

    Python 小案例 计算个人所得税 Python 小案例 计算个人所得税 #coding=utf-8 monthMoney=input("请输入月收入:") ds=3500 #扣 ...

  5. 计算机思维相关实例,几个培养计算思维的教学小案例

    原标题:几个培养计算思维的教学小案例 计算思维的核心部分 BBC概述了计算思维的四大基石:分解.模式识别.抽象和算法. 分解:让孩子们将复杂的问题分解成更小.更简单的问题. 模式识别:引导孩子们建立相 ...

  6. python计算银行余额_Python 小案例实战 —— 简易银行存取款查询系统

    Python 小案例实战 -- 简易银行存取款查询系统 涉及知识点 包的调用 字典.列表的混合运用 列表元素索引.追加 基本的循环与分支结构 源码 import sys import time ban ...

  7. 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)

    接    使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...

  8. Spark【案例】实现黑名单实时过滤

    黑名单实时过滤 一.实验介绍 1.1 实验内容 本节课主要讲解 Spark 的 RDD 操作,让您对 Spark 算子的特性快速了解.通过演示案例实时黑名单过滤,让您切身体会到 RDD 的强大功能,然 ...

  9. 从原理到策略算法再到架构产品看推荐系统 | 附Spark实践案例

    原文链接:mp.weixin.qq.com  作者 | HCY崇远 01 前言 本文源自于前阵子连续更新的推荐系统系列,前段时间给朋友整理一个关于推荐系统相关的知识教学体系,刚好自身业务中,预计明年初 ...

最新文章

  1. 那些年让你迷惑的阻塞、非阻塞、异步、同步
  2. maven打包 jar中没有主清单属性
  3. ie下提示SCRIPT1028:缺少标识符、字符串或数字
  4. Triangular Pastures (二维01背包)
  5. 手把手教你用 TensorFlow 实现文本分类(下)
  6. php获取日期中的月份,年份
  7. linux 启动/关闭多个py脚本
  8. 【HDU - 5922】Minimum’s Revenge(思维,最小生成树变形)
  9. Windows Phone 实用开发技巧(18):使用SystemTray显示全局消息提醒
  10. Python关于File学习过程
  11. java登陆session用法_java中session用法
  12. 王益:分布式机器学习的故事
  13. ubunbtu下基于c++实现MQTT客户端通信
  14. C语言程序——标准输出格式
  15. RS-232通信接口
  16. SSM毕设项目职业性格测试系统7c78o(java+VUE+Mybatis+Maven+Mysql)
  17. html两张图片无缝合成一张,怎么用PS把两张图片合成一张 PS无缝拼图边缘如何处理...
  18. 用java计算三角形周长_三角形求周长和面积完整的解决方案
  19. CIO调查:数据挖掘并不遥远
  20. enfuzion与lsf构建渲染集群_集群渲染系统构建及优化-精选文档

热门文章

  1. 今天谁也别想阻止我好好学习!「CDR 6·18特惠倒计时2天!」
  2. 分割字符串的AfxExtractSubString函数
  3. c语言数据结构产生随机数并排序,数据结构产生随机数并排序.doc
  4. Git原理入门及具体使用介绍
  5. 推荐一些常用的中外学术文献数据库网站
  6. 美地方法官裁决:苹果不得强迫开发者使用应用内支付
  7. 双线性变换法设计原型低通为椭圆型的数字IIR高通滤波器
  8. 测控技术与仪器是计算机相关的,有关测控技术与仪器专业
  9. 计算机仿真实训室建设,数控仿真实训室建设
  10. 雷锋网专访路客网合伙人、COO王鑫光