Spark - 小实践(6)计算手机在基站停留时间
背景
原文转自作者Allen-Gao的一位博主,使用的api是RDD计算,文章最后附上我的和原博主的代码。
项目说明:附件为要计算数据的demo。附件(https://download.csdn.net/download/u013560925/10425558)
其中bs_log文件夹数据格式为(手机号,时间戳,基站ID,连接状态(“1”为连接,“0”为断开))
lac_info.txt 文件数据格式为(基站ID,经度,纬度,信号辐射类型)
程序思路:
1、先根据"手机号,基站ID"构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳)
2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,以便和坐标数据进行joi
n,->(基站ID,(手机号,停留时间))
3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度))
4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度)
5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度))
6、取出停留时间最长的两个基站ID。
正文
原作者提供了两个数据集,手机在某个基站的状态和基站的位置信息:
原作者思路:先join,后筛选出手机停留基站时间top数据
我的代码实现:先筛选出手机停留基站时间top数据,然后跟基站位置信息join
原作者代码:
- package cn.allengao.Location
- import org.apache.spark.{SparkConf, SparkContext}
- /**
- * class_name:
- * package:
- * describe: 基站信息查询
- * creat_user: Allen Gao
- * creat_date: 2018/1/29
- * creat_time: 10:03
- **/
- /*
- * 说明:
- * 1, 先根据"手机号,基站ID"构成一个元祖,做为唯一标识, 和时间戳构成新的数据结构->(手机号, 站点, 时间戳)
- * 2、(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,
- * 以便和坐标数据进行join,->(基站ID,(手机号,停留时间))
- * 3、将基站坐标数据信息通过map,构建成数据类型 ->(基站ID,(经度,纬度))
- * 4、将2、3进行join操作,构成新的数据类型 ->(手机号,基站ID,停留时间,经度,纬度)
- * 5、按手机号进行分组。->(手机号,(手机号,基站ID,停留时间,经度,纬度))
- * 6、取出停留时间最长的两个基站ID。
- *
- */
- object UserLocation {
- def main(args: Array[String]): Unit = {
- //创建Spark配置信息
- val conf = new SparkConf().setAppName("UserLocation").setMaster("local[*]")
- //建立Spark上下文,并将配置信息导入
- val sc = new SparkContext(conf)
- /*
- 基站连接手机号,连接时间戳,基站站点ID信息,“1”表示连接,“0”表示断开连接。
- 18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
- */
- //从log文件拿到数据,并按行采集。
- //sc.textFile("c://information//bs_log").map(_.split(",")).map(x => (x(0), x(1), x(2), x(3)))
- val rdd_Info = sc.textFile("j://information//bs_log").map(line => {
- //通过“,”将数据进行切分field(0)手机号,field(1)时间戳,field(2)基站ID信息,field(3)事件类型
- val fields = line.split(",")
- //事件类型,“1”表示连接,“0”表示断开。
- val eventType = fields(3)
- val time = fields(1)
- //连接基站将时间戳至为“-”,断开基站将时间戳至为“+”,以便后面进行计算。
- val timeLong = if(eventType == "1") -time.toLong else time.toLong
- //构成一个数据类型(手机号,基站ID信息,带符号的时间戳)
- ((fields(0),fields(2)),timeLong)
- })
- val rdd_lacInfo = rdd_Info.reduceByKey(_+_).map(t=>{
- val mobile = t._1._1
- val lac = t._1._2
- val time = t._2
- (lac, (mobile, time))
- })
- val rdd_coordinate = sc.textFile("j://information//lac_info.txt").map(line =>{
- val f = line.split(",")
- //(基站ID, (经度, 纬度))
- (f(0),(f(1), f(2)))
- })
- //rdd1.join(rdd2)-->(CC0710CC94ECC657A8561DE549D940E0,((18688888888,1300),(116.303955,40.041935)))
- val rdd_all = rdd_lacInfo.join(rdd_coordinate).map(t =>{
- val lac = t._1
- val mobile = t._2._1._1
- val time = t._2._1._2
- val x = t._2._2._1
- val y = t._2._2._2
- (mobile, lac, time, x, y)
- })
- //按照手机号进行分组
- val rdd_mobile = rdd_all.groupBy(_._1)
- //取出停留时间最长的前两个基站
- val rdd_topTwo= rdd_mobile.mapValues(it =>{
- it.toList.sortBy(_._3).reverse.take(2)
- })
- // println(rdd_Info.collect().toBuffer)
- // println(rdd_lacInfo.collect().toBuffer)
- // println(rdd_coordinate.collect().toBuffer)
- // println(rdd_all.collect().toBuffer)
- // println(rdd_mobile.collect().toBuffer)
- // println(rdd_topTwo.collect().toBuffer)
- rdd_topTwo.saveAsTextFile("j://information//out")
- sc.stop()
- }
- }
我的代码:
val data = spark.read.textFile("hdfs://master:9000/user/wangqi/logs").rdd.map(x=>x.split(",")).map(line=>{val stayTime = if(line(3).toInt==1) line(1).toLong else -line(1).toLong((line(0),line(2)),stayTime)}).reduceByKey(_+_).groupBy(_._1._2).mapValues(it=>{it.toList.sortBy(_._2).reverse.take(2)}).flatMap{line=>line._2}val top2Rdd = data.map(line=>(line._1._2,(line._1._1,line._2)))val lacRdd = spark.read.textFile("hdfs://master:9000/user/wangqi/lac_info.txt").rdd.map(line=>{val f = line.split(",")(f(0),(f(1),f(2)))})val result = lacRdd.join(top2Rdd).map(line=>(line._2._2._1,line._1,line._2._2._2,line._2._1._1,line._2._1._2))result.saveAsTextFile("hdfs://master:9000/user/wangqi/lacResult")
总结
sortBy(key,false)按照key进行降序排列
groupBy:后是(key,List())后可使用mapValues对List进行操作
核心思想是:寻找所有类别格子的top数据
Spark - 小实践(6)计算手机在基站停留时间相关推荐
- Spark Streaming实践和优化
2019独角兽企业重金招聘Python工程师标准>>> Spark Streaming实践和优化 博客分类: spark 在流式计算领域,Spark Streaming和Storm时 ...
- grafana计算不同时间的差值_大数据时代!如何基于Spark Streaming构建实时计算平台...
随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台 ...
- 美团点评 Hadoop/Spark 系统实践
系列文章 实时存储引擎和实时计算引擎 美团点评 Hadoop/Spark 系统实践 美团大数据查询技术 美团深度学习平台实践 美团广告系统实践 本文目录 系列文章 一.Hadoop/Spark 定位与 ...
- Spark Streaming 流式计算实战
这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...
- 怎么切换用户_走进通信:4G手机跟基站是怎么“交流”的
科技进步的直接体现就是越来越多的东西都开始变得简单化.扁平化,能一步到位的事情,绝不拆成两步来做.就拿我们日常生活中最常使用的手机来说,拨号通话.上网等操作,只需要动动手指即可:但是隐藏在手指&quo ...
- 最简容器化动手小实践——再战flappybird
<Flappy Bird>是一名越南开发者所开发的游戏,这款游戏的主要内容是帮助一只小鸟穿越水管的层层阻碍,玩家所需要的只是点击屏幕从而调整小鸟的高度.而令这款游戏与众不同的是,这款游戏的 ...
- GNN理论入门和小实践——从卷积讲起
GNN 1 卷积 1.1 信号与系统的卷积[^1] 1.2 图像上的卷积[^3] 1.3 图(Graph)的卷积 1.3.1信号的傅里叶变换 1.3.2图(Graph)的傅里叶变换 2 图卷积神经网络 ...
- 微信小程序汇率计算界面
微信小程序汇率计算界面 最近在学习微信小程序,自己制作了一个自己想制作的小应用--汇率计算界面(真不知道为什么自己对汇率那么感兴趣XD ) 数据接入 在获取数据之前先在data里面添加几个参数 mon ...
- 小程序占服务器空间吗,小程序会占用手机存储空间吗?| 小程序问答 #21
原标题:小程序会占用手机存储空间吗?| 小程序问答 #21 使用小程序,到底会不会占用手机存储空间呢? 答案是:当然会. 小程序会占多少储存空间? 小程序所占用的手机存储空间,主要有以下两个部分: 首 ...
最新文章
- Android源码下载资料
- 使用 collections 来创建类似元组对象
- 深入理解浏览器解析和执行过程
- C 多线程的互斥锁应用RAII机制
- 2022年中国政企采购数字化转型白皮书
- 百度地图api中文乱码
- uni-app前端解密微信小程序手机号加密数据
- 《windows核心编程》第6章 线程基础
- 通过ICommand和ITool操作地图
- POJ 1392 Ouroboros Snake(数位欧拉)
- Docker学习:容器之间单/双向通信 |--link /自定义网络实现互认容器别名 (理论篇)
- qq快捷登陆 php代码,qq互联--qq快捷登陆
- 知晓云 php,2020 知晓云小程序年度评选获奖名单发布
- Java面试必背八股文[6]:Redis
- pip下载镜像源汇总
- monkeyrunner的使用
- PT2262 单片机解码程序
- 桥接路由器总是掉线_无线路由器桥接完整教程(不会断网)【图文详解】
- PowerBI:关于PBIX,PBIT及PBIDS
- linux so自毁指令,iPhone自毁模式怎么设置 充电爆炸快捷指令设置自毁模式方法
热门文章
- 华为面试题----16进制转换为10进制
- 结合读取 https://kj.sscejia.com/ssq/kaijiang/61.html 开奖号码分析 下一次 数字组合
- 2023年中职网络安全竞赛——CMS网站渗透解析
- 美通社企业新闻汇总 | 2019.2.15 | 星巴克中国首家焙烤美食臻选门店亮相上海;岭南东方酒店品牌将落户沙巴和江门...
- DM学习之路2之DM数据库单机数据库搭建(图形化)
- 开源音乐播放器_使用开源音乐播放器设计生活中的配乐
- 3531交叉编译htop
- 华为ICT大赛2022-2023全国总决赛颁奖典礼暨人才联盟伙伴年会圆满落幕
- ftp传输工具FlashFxp+FilezillaServer的使用小记
- R语言重读微积分(一):极限