1 需求说明

1.1 清洗过滤

1)去除json数据体中的废弃字段(这是前端开发人员在埋点设计方案变更后遗留的无用字段):

"email"
"phoneNbr"
"birthday"
"isRegistered"
"isLogin"
"addr"
"gender"

2)过滤掉日志中: uid | imei | uuid | mac |androidId | ip全为空的记录!

3)过滤掉日志中缺少关键字段(event/eventid/sessionid 缺任何一个都不行)的记录!

4)过滤掉json格式不正确的(脏数据)!

1.2 数据解析

将json打平: 解析成扁平格式;

注: event字段不用扁平化;转成Map类型存储即可

1.3 数据集成

1)将日志中的GPS经纬度坐标解析成省、市、县(区)信息;(为了方便后续的地域维度分析)

2)集成商圈信息;(为了方便后续的地域维度分析)

1.4 数据修正

1)guid回补

2)字段名称规范化

比如app日志中pgid,wxapp中这个字段叫pageid,和web端日志中的page,统一成pageid

3)字段度量规范化

比如时间戳统一用秒级

4)字段类型规范化

比如时间戳统一用长整型

1.5 保存结果

最后,将数据输出为parquet格式,压缩编码用snappy

2 预处理开发实现

2.1 整体流程

1)json解析,解析成功的返回LogBean对象,解析失败的返回null

(这样一来,json格式不对、不完整的脏数据就被识别出来了)

2)对上一步结果RDD[LogBean]进行过滤(清掉json不完整的脏数据,清掉不符合规则的数据)

3)数据修正(回补uid,统一命名规范、度量单位规范等)

4)对数据进行字典知识集成

5)从集成后的结果中跳出无法解析的gps,写入一个待解析目录

6)输出最终结果保存为parquet(或ORC)文件

3.2 完整代码

case class定义:

case class AppLogBean(var guid:Long,eventid: String,event: Map[String, String],uid: String,imei: String,mac: String,imsi: String,osName: String,osVer: String,androidId: String,resolution: String,deviceType: String,deviceId: String,uuid: String,appid: String,appVer: String,release_ch: String,promotion_ch: String,longtitude: Double,latitude: Double,carrier: String,netType: String,cid_sn: String,ip: String,sessionId: String,timestamp: Long,var province:String="未知",var city:String="未知",var district:String="未知")

预处理流程如下:

package cn.doitedu.dw.preimport java.utilimport ch.hsr.geohash.GeoHash
import cn.doitedu.commons.util.SparkUtil
import cn.doitedu.dw.beans.AppLogBean
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Dataset, Row}object AppLogDataPreprocess {def main(args: Array[String]): Unit = {// 构造sparksessiongval spark = SparkUtil.getSparkSession(this.getClass.getSimpleName)import spark.implicits._// 加载当日的app埋点日志文件,成为一个dataset[String]val appDs: Dataset[String] = spark.read.textFile("G:yiee_logs2020-01-12app")// 加载geo地域字典数据/***    -----|---------|------|----------|*    geo  |province |  city|  district|*    -----|---------|------|----------|*    39eu |河北省    | 石家庄| 裕华区    | Row*    y67u |河南省    | 郑州市| 金水区    | Row*/val geodf = spark.read.parquet("data/dict/geo_dict/output")val geoMap: collection.Map[String, (String, String, String)] = geodf.rdd.map(row=>{val geo = row.getAs[String]("geo")val province = row.getAs[String]("province")val city = row.getAs[String]("city")val district = row.getAs[String]("district")(geo,(province,city,district))}).collectAsMap()// 广播地域字典// Map{ 39eu -> (河北省,石家庄,裕华区)//      y67u -> (河南省,郑州市,金水区)//    }val bc_geo = spark.sparkContext.broadcast(geoMap)// 加载id映射字典/***    ---------------|------|* biaoshi_hashcode  |  guid|*    ---------------|------|*     8238574359     | 62375|row*    ---------------|------|*     3285943259     | 62375|row*    ---------------|------|*          62375      | 62375|row*  -----------------|------|*/val idmpdf = spark.read.parquet("data/idmp/2020-01-12")val idMap = idmpdf.rdd.map(row=>{val id = row.getAs[Long]("biaoshi_hashcode")val guid = row.getAs[Long]("guid")(id,guid)}).collectAsMap()val bc_id = spark.sparkContext.broadcast(idMap)// 对日志ds集合中的每一条记录(json)进行解析appDs.map(line => {var bean: AppLogBean = nulltry {val jsonobj = JSON.parseObject(line)val eventid = jsonobj.getString("eventid")val timestamp = jsonobj.getString("timestamp").toLongval eventobj: JSONObject = jsonobj.getJSONObject("event")import scala.collection.JavaConversions._val javaMap: util.Map[String, String] = eventobj.getInnerMap.asInstanceOf[util.Map[String, String]]val event: Map[String, String] = javaMap.toMapval userobj = jsonobj.getJSONObject("user")val uid = userobj.getString("uid")val sessionId = userobj.getString("sessionId")val phoneobj = userobj.getJSONObject("phone")val imei = phoneobj.getString("imei")val mac = phoneobj.getString("mac")val imsi = phoneobj.getString("imsi")val osName = phoneobj.getString("osName")val osVer = phoneobj.getString("osVer")val androidId = phoneobj.getString("androidId")val resolution = phoneobj.getString("resolution")val deviceType = phoneobj.getString("deviceType")val deviceId = phoneobj.getString("deviceId")val uuid = phoneobj.getString("uuid")val appobj = jsonobj.getJSONObject("app")val appid = appobj.getString("appid")val appVer = appobj.getString("appVer")val release_ch = appobj.getString("release_ch") // 下载渠道val promotion_ch = appobj.getString("promotion_ch") // 推广渠道val locobj = jsonobj.getJSONObject("loc")var lng = 0.0var lat = -90.0try {lng = locobj.getDouble("longtitude")lat = locobj.getDouble("latitude")} catch {case e: Exception =>}val carrier = locobj.getString("carrier")val netType = locobj.getString("netType")val cid_sn = locobj.getString("cid_sn")val ip = locobj.getString("ip")// 判断数据合法规则val tmp = (imei + imsi + mac + uid + uuid + androidId).replaceAll("null", "")if (StringUtils.isNotBlank(tmp) && event != null && StringUtils.isNotBlank(eventid) && StringUtils.isNotBlank(sessionId)) {// 将提取出来的各个字段,封装到AppLogBean中bean = AppLogBean(Long.MinValue,eventid,event,uid,imei,mac,imsi,osName,osVer,androidId,resolution,deviceType,deviceId,uuid,appid,appVer,release_ch,promotion_ch,lng,lat,carrier,netType,cid_sn,ip,sessionId,timestamp)}} catch {case e: Exception => null}bean}).filter(_ != null).map(bean=>{val geoDict = bc_geo.valueval idmpDict = bc_id.value// 查geo地域字典,填充省市区val lat = bean.latitudeval lng = bean.longtitudeval mygeo = GeoHash.geoHashStringWithCharacterPrecision(lat,lng,5)val maybeTuple: Option[(String, String, String)] = geoDict.get(mygeo)if(maybeTuple.isDefined){val areaNames = maybeTuple.get// 填充省市区bean.province = areaNames._1bean.city = areaNames._2bean.district = areaNames._3}// 查id映射字典,填充guidval ids = Array(bean.imei,bean.imsi,bean.mac,bean.androidId,bean.uuid,bean.uid)val mouId = ids.filter(StringUtils.isNotBlank(_))(0)val maybeLong = idmpDict.get(mouId.hashCode.toLong)if(maybeLong.isDefined){val guid = maybeLong.getbean.guid = guid}bean}).filter(bean=>bean.guid != Long.MinValue).toDF().write.parquet("data/applog_processed/2020-01-12")spark.close()}}

2.3 打包提交线上运行

步骤1:

将代码中写死的路径换成参数形式

步骤2:

在pom中加入打包插件

<build><plugins><!-- 指定编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><!-- 指定编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin></plugins>
</build>

步骤3:

在idea的maven侧边栏卡中,选父工程,点击install,对整个工程进行打包和本地库安装

步骤4:

拷贝预处理程序的jar包上传到集群,用命令提交

 bin/spark-submit  --master yarn --deploy-mode cluster --num-executors 3 --executor-memory 1g --executor-cores 1 --class cn.doitedu.titan.dw.pre.AppEventLogPreprocess /root/dw.jar /titan/applog/2019-10-29 /titan/areadict /titan/output/applog/2019-10-29 yarn

清掉数据_学习之大数据项目笔记第七篇【数仓模块-日志预处理篇】相关推荐

  1. 零基础转行大数据怎么学习?大数据学习路线

    大数据要怎么学,本文来说说到底要怎么学习它,以及怎么避免大数据学习的误区,以供参考.数据科学特点与大数据学习误区 (1)大数据学习要业务驱动,不要技术驱动:数据科学的核心能力是解决问题. 大数据的核心 ...

  2. 大数据开发学习,大数据学习路线(完整详细版)

    很多初学者,对大数据的概念都是模糊不清的,大数据是什么,能做什么,学的时候,该按照什么线路去学习,学完往哪方面发展,想深入了解,想学习的同学欢迎加入大数据学习qq群:199427210,有大量干货(零 ...

  3. 大数据怎么学习:大数据学习的关键技术知识体系、学习路径和误区

    由于大数据技术涉及内容太庞杂,大数据应用领域广泛,而且各领域和方向采用的关键技术差异性也会较大,难以三言两语说清楚,本文从数据科学和大数据关键技术体系角度,来说说大数据的核心技术什么,到底要怎么学习它 ...

  4. 什么是云计算和大数据_什么是大数据和云计算

    什么是云计算和大数据 Big data and cloud computing are two sides of the coin. The whole world is on a way to ha ...

  5. python人工智能大数据_人工智能及大数据中的Python

    2016年,Python取代Java成为高校中最受欢迎的语言.2018年三大语言榜单中,Python陆续登上了IEEE.PYPL排行榜单之首.薪酬调查结果显示,Python开发人员是收入最高的开发人员 ...

  6. cxgrid 保存数据_什么是大数据

    大数据是具有海量.高增长率和多样化的信息资产,它需要全新的处理模式来增强决策力.洞察发现力和流程优化能力. Big data is high volume, high velocity, and/or ...

  7. 百度地图迁徙大数据_百度地图大数据:五一高速拥堵不似预期,广深成热门迁出入地...

    五一假期在即,你是否做好了"出行功课"?高速拥堵水平降低.公众出门不出城.公园成踏青赏景热门目的地--在全国疫情防控仍未松懈的时刻,2020年的五一或许注定与往年不同. 近日,百度 ...

  8. 查看某个分区之前所有的数据_腾讯大数据面试真题汇总

    腾讯面试题 学长1 1)笔试部分 (1)有一表名t_sh_mtt_netdisk_log,从表名可以看出该表是什么业务的,是什么周期粒度的表. (2)怎么查看表结构,表创建语句?怎么查看表有哪些分区? ...

  9. 供应链 信用管理 大数据_智慧供应链大数据技术架构方案(ppt)

    随着供应链变得越来越复杂,必须采用更好的工具来迅速高效地发挥数据的最大价值.供应链作为企业的核心网链,将彻底变革企业市场边界.业务组合.商业模式和运作模式等.大数据将用于供应链从需求产生,产品设计到采 ...

最新文章

  1. python读数据-python读取各种文件数据方法解析
  2. 安卓开发日记(1) - 安装 Android 开发环境和 first app
  3. day2编写购物商城(1)
  4. Resource stopwords not found. Please use the NLTK Downloader to obtain the r
  5. partition by 函数
  6. robocopy帮助
  7. 手机usb无法被电脑识别_6种方法解决电脑无法识别移动硬盘
  8. 基于分割和识别的服饰商品的自动推荐
  9. 计算机专业毕业设计中期考核表,研究生中期考核表导师评语
  10. 【python】读取json文件
  11. android Sqlite操作之-- 自定义ORM关系实体映射类
  12. 在Linux系统部署docsify工具小记
  13. Loadrunner12实现手机APP压力测试
  14. setupfactory安装程序设置开机自启动
  15. 服务器共享文件夹Windows无法访问,windows server 2008 R2 无法访问本机共享文件夹
  16. Windows下PuTTY远程连接Linux服务器并上传文件/更改默认端口号
  17. 用友U8案例教程销售管理后台配置
  18. PHP的strtolower()和strtoupper()函数在安装非中文系统的服务器下可能会导致将汉字转换为乱码,请写两个替代的函数实现兼容Unicode文字的字符串大小写转换
  19. LOAM系列——LeGO-LOAM配置、安装、问题解决及VLP16测试效果(完结版)
  20. STL迭代器(iterator)用法详解

热门文章

  1. 记一次.net mvc中 RouteAttribute 不起作用
  2. 深入理解java中的线程池
  3. vue2.0中的watch和计算属性computed
  4. 当前主流的单元测试工具
  5. 在弹窗中新建一个遮罩层
  6. loj 1063(求割点个数)
  7. python中的os.path.realpath(__file__)
  8. 转: ASP.NET2.0_缓存
  9. 备份文件时,添加时间戳
  10. oracle索引大小暴增_Oracle创建索引前估算索引大小(dbms_space.create_index_cost)