日志数据,下面是一行日志信息
2018-09-04T20:27:31+08:00   http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106  GET    200    192.168.168.63 -  -  Apache-HttpClient/4.1.2 (java 1.5)

package **.**.***.etlimport org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
//import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** 将日志文件test.log,进行格式化(整理成带表头,带表类型的表)*/
object EtlDemo {def main(args: Array[String]): Unit = {//获取SparkSession对象val spark: SparkSession = SparkSession.builder().master("local[*]").appName("etldemo1").getOrCreate()//获取SparkContext对象val sc: SparkContext = spark.sparkContext//导入spark隐式类import spark.implicits._//读取本地文件获取rddval rdd: RDD[String] = sc.textFile("in/test.log")//按照Tab切割数据val rdd1: RDD[Array[String]] = rdd.map(x=>x.split("\t"))//过滤掉字段数量少于8个的val rdd2: RDD[Array[String]] = rdd1.filter(x=>x.length==8)//将切割后的字段放入Row中,获得一个rddRowval rddRow: RDD[Row] = rdd2.map(x=>Row(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7)))//编写Schemaval schema: StructType = StructType(Array(StructField("event_time", StringType),StructField("url", StringType),StructField("method", StringType),StructField("status", StringType),StructField("sip", StringType),StructField("user_uip", StringType),StructField("action_prepend", StringType),StructField("action_client", StringType)))//获取dataframeval logsDF: DataFrame = spark.createDataFrame(rddRow,schema)//按照第一列和第二列对数据进行去重val filter_logs_row: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")//过滤掉状态码非200.filter($"status" === "200")//过滤掉event_time为空的数据.filter(StringUtils.isNotEmpty("event_time").toString)//获取到一个datasetRow :filter_logs_row将url按照"&"以及"="切割val full_logs_row2: RDD[Row] = filter_logs_row.map(row => {//获得url字段的数据val str: String = row.getAs[String]("url")//将数据用[?]分割,获得一个String类型的数组val paramasArray: Array[String] = str.split("\\?")//paramasArray(1)是问号以后的数据,是我们需要拆分的//定义一个Map类型的paramasMap变量,用来接收[=]拆分后的数据var paramasMap: Map[String, String] = null//逻辑判断if (paramasArray.length == 2) {//获得元组val tuples: Array[(String, String)] = paramasArray(1).split('&') //用&分割.map(x => x.split('=')) //用=分割.filter(x => x.length == 2) //逻辑判断.map(x => (x(0), x(1))) //放入元组//将元组放入到Map中paramasMap = tuples.toMap}//返回一个元组(row.getAs[String]("event_time"),row.getAs[String]("method"),row.getAs[String]("status"),row.getAs[String]("sip"),row.getAs[String]("user_uip"),row.getAs[String]("action_prepend"),row.getAs[String]("action_client"),paramasMap.getOrElse[String]("userSID", ""),paramasMap.getOrElse[String]("userUID", ""),paramasMap.getOrElse[String]("actionBegin", ""),paramasMap.getOrElse[String]("actionEnd", ""),paramasMap.getOrElse[String]("actionType", ""),paramasMap.getOrElse[String]("actionName", ""),paramasMap.getOrElse[String]("ifEquipment", ""),paramasMap.getOrElse[String]("actionValue", ""),paramasMap.getOrElse[String]("actionTest", ""))}).toDF().rdd //转df再转rdd,获得一个rdd[Row]类型//创建schemaval full_log_schema: StructType = StructType(Array(StructField("event_time", StringType),StructField("userSID", StringType),StructField("userUID", StringType),StructField("actionBegin", StringType),StructField("actionEnd", StringType),StructField("actionType", StringType),StructField("actionName", StringType),StructField("ifEquipment", StringType),StructField("actionValue", StringType),StructField("actionTest", StringType),StructField("method", StringType),StructField("status", StringType),StructField("sip", StringType),StructField("user_uip", StringType),StructField("action_prepend", StringType),StructField("action_client", StringType)))//创建一个dataframeval full_logs_DF: DataFrame = spark.createDataFrame(full_logs_row2,full_log_schema)//完成,打印结构和数据full_logs_DF.printSchema()full_logs_DF.show()}
}

Spark:日志文件数据清洗相关推荐

  1. CDH下的spark日志文件查询

    如果算法文件的日志过大,通过web页面太慢了,通过文件查询就快的多了. 1.spark的日志文件位置: 因为是用yarn提交的计算任务,所以日志是在yarn的容器中记录的. /yarn/contain ...

  2. Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...

    1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...

  3. linux将屏幕输出到文件,Linux命令执行的屏幕输出内容重定向到日志文件

    摘要: 作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 快速mark一下这个命令细节,免得以后使用又忘记了 大家都知道可以用echo来输出内容到 ...

  4. hadoop 清理日志文件

    执行下 hadoop fs -du -h /user/spark/applicationHistory hadoop fs -du -h /user/spark 执行 再执行 hadoop fs -d ...

  5. Spark日志,及设置日志输出级别

    Spark日志,及设置日志输出级别 1.全局应用设置 2.局部应用设置日志输出级别 3.Spark log4j.properties配置详解与实例(摘录于铭霏的记事本) 文章内容来源: 作者:大葱拌豆 ...

  6. spark日志存储路径为mysql_利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库...

    本文试图实现的需求场景为:以学习Spark知识点为目的,编写Scala利用Spark解析800M的tomcat日志文件,打印一段时间内ERROR级别记录的前10行,统计每分钟的日志记录数,并将统计结果 ...

  7. linux里的日志文件干啥用的,linux分析日志经常用的命令

    解法1: cat access_log | awk  '{print $1}' | sort | uniq -c | sort -n -r | head -10 解法2: cat access_log ...

  8. 使用logrotate管理nginx日志文件

    本文转载自:http://linux008.blog.51cto.com/2837805/555829 描述:linux日志文件如果不定期清理,会填满整个磁盘.这样会很危险,因此日志管理是系统管理员日 ...

  9. 【Qt】Log4Qt(四):周期性输出日志,并且限制日志文件数量

    在Log4Qt中存在一个比较大的问题,当使用 DailyRollingFileAppender对日志进行输出时,会无限输出文件,也就是说,当系统运行很久时,日志文件有可能很大,大到无法想象.因此,很多 ...

最新文章

  1. Oracle round函数是什么意思?怎么运用?
  2. 波士顿动力送狗抗疫:头顶iPad,背装对讲机,说是减少医患接触,但性价比真的OK吗?...
  3. AWS推出深度学习容器,简化AI程序开发
  4. windows 下conda安装gym
  5. 分布式数据库 HBase
  6. golang http 返回html文件_从零部署安装 Grpc-golang
  7. 安装独立版本的 Adobe Community Help
  8. activiti 作业执行器定时开始事件
  9. 动态设置control的显示与隐藏
  10. 服务机器人语音对话的实现
  11. bldc不同载波频率_三相BLDC弦波驱动器-PT2511
  12. (转自MBA智库百科)弗兰克·吉尔布雷斯
  13. 删除Navicat注册表
  14. 阿里云CTO章文嵩:阿里云强大的数据和计算能力助力企业实现“弯道超车”
  15. 使用R进行描述性统计分析(连续性变量)
  16. 2021-05-24 昨日三省吾身
  17. 解锁ChatGPT超高级玩法,展示动态图片,纯干货分享!
  18. Linux环境中安装zookeeper
  19. java解析宏文件,从命令行或批处理文件运行Excel宏的方法?
  20. apk开发教程!了解Android架构组件后,构建APP超简单!先收藏了

热门文章

  1. 第01章_Java语言概述
  2. 个性设置 CMD命令
  3. 2011,Ready!
  4. 【记录用python做毕业设计-游戏后台管理系统的整体流程】-持续更新中
  5. 全球变暖基础知识小测试程序(Global Warming Facts Quiz)
  6. 数字峰会人气火爆,城链科技引发新一轮商业变革
  7. gg-editor的使用
  8. SOLIDWORKS PDM如何进行库压缩
  9. 华为P9手机的品质还需工匠精神
  10. 全国青少年信息素养大赛2023年python·必做题模拟四卷