Spark:日志文件数据清洗
日志数据,下面是一行日志信息 2018-09-04T20:27:31+08:00 http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451617&actionClient=Mozilla%2F5.0+%28Windows+NT+6.1%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F63.0.3239.132+Safari%2F537.36&actionEnd=1536150451705&actionName=viewQuestionAnalysis&actionTest=0&actionType=3&actionValue=272878&clientType=001_bdqn&examType=001&ifEquipment=web&questionId=32415&skillIdCount=0&userSID=EDEC6A9CF8220BE663A22BDD13E428E7.exam-tomcat-node3.exam-tomcat-node3&userUID=272878&userUIP=117.152.82.106 GET 200 192.168.168.63 - - Apache-HttpClient/4.1.2 (java 1.5)
package **.**.***.etlimport org.apache.commons.lang.StringUtils
import org.apache.spark.rdd.RDD
//import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** 将日志文件test.log,进行格式化(整理成带表头,带表类型的表)*/
object EtlDemo {def main(args: Array[String]): Unit = {//获取SparkSession对象val spark: SparkSession = SparkSession.builder().master("local[*]").appName("etldemo1").getOrCreate()//获取SparkContext对象val sc: SparkContext = spark.sparkContext//导入spark隐式类import spark.implicits._//读取本地文件获取rddval rdd: RDD[String] = sc.textFile("in/test.log")//按照Tab切割数据val rdd1: RDD[Array[String]] = rdd.map(x=>x.split("\t"))//过滤掉字段数量少于8个的val rdd2: RDD[Array[String]] = rdd1.filter(x=>x.length==8)//将切割后的字段放入Row中,获得一个rddRowval rddRow: RDD[Row] = rdd2.map(x=>Row(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7)))//编写Schemaval schema: StructType = StructType(Array(StructField("event_time", StringType),StructField("url", StringType),StructField("method", StringType),StructField("status", StringType),StructField("sip", StringType),StructField("user_uip", StringType),StructField("action_prepend", StringType),StructField("action_client", StringType)))//获取dataframeval logsDF: DataFrame = spark.createDataFrame(rddRow,schema)//按照第一列和第二列对数据进行去重val filter_logs_row: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")//过滤掉状态码非200.filter($"status" === "200")//过滤掉event_time为空的数据.filter(StringUtils.isNotEmpty("event_time").toString)//获取到一个datasetRow :filter_logs_row将url按照"&"以及"="切割val full_logs_row2: RDD[Row] = filter_logs_row.map(row => {//获得url字段的数据val str: String = row.getAs[String]("url")//将数据用[?]分割,获得一个String类型的数组val paramasArray: Array[String] = str.split("\\?")//paramasArray(1)是问号以后的数据,是我们需要拆分的//定义一个Map类型的paramasMap变量,用来接收[=]拆分后的数据var paramasMap: Map[String, String] = null//逻辑判断if (paramasArray.length == 2) {//获得元组val tuples: Array[(String, String)] = paramasArray(1).split('&') //用&分割.map(x => x.split('=')) //用=分割.filter(x => x.length == 2) //逻辑判断.map(x => (x(0), x(1))) //放入元组//将元组放入到Map中paramasMap = tuples.toMap}//返回一个元组(row.getAs[String]("event_time"),row.getAs[String]("method"),row.getAs[String]("status"),row.getAs[String]("sip"),row.getAs[String]("user_uip"),row.getAs[String]("action_prepend"),row.getAs[String]("action_client"),paramasMap.getOrElse[String]("userSID", ""),paramasMap.getOrElse[String]("userUID", ""),paramasMap.getOrElse[String]("actionBegin", ""),paramasMap.getOrElse[String]("actionEnd", ""),paramasMap.getOrElse[String]("actionType", ""),paramasMap.getOrElse[String]("actionName", ""),paramasMap.getOrElse[String]("ifEquipment", ""),paramasMap.getOrElse[String]("actionValue", ""),paramasMap.getOrElse[String]("actionTest", ""))}).toDF().rdd //转df再转rdd,获得一个rdd[Row]类型//创建schemaval full_log_schema: StructType = StructType(Array(StructField("event_time", StringType),StructField("userSID", StringType),StructField("userUID", StringType),StructField("actionBegin", StringType),StructField("actionEnd", StringType),StructField("actionType", StringType),StructField("actionName", StringType),StructField("ifEquipment", StringType),StructField("actionValue", StringType),StructField("actionTest", StringType),StructField("method", StringType),StructField("status", StringType),StructField("sip", StringType),StructField("user_uip", StringType),StructField("action_prepend", StringType),StructField("action_client", StringType)))//创建一个dataframeval full_logs_DF: DataFrame = spark.createDataFrame(full_logs_row2,full_log_schema)//完成,打印结构和数据full_logs_DF.printSchema()full_logs_DF.show()}
}
Spark:日志文件数据清洗相关推荐
- CDH下的spark日志文件查询
如果算法文件的日志过大,通过web页面太慢了,通过文件查询就快的多了. 1.spark的日志文件位置: 因为是用yarn提交的计算任务,所以日志是在yarn的容器中记录的. /yarn/contain ...
- Spark RDD/Core 编程 API入门系列之动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战(二)...
1.动手实战和调试Spark文件操作 这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 spark@SparkSingleNode:/usr/ ...
- linux将屏幕输出到文件,Linux命令执行的屏幕输出内容重定向到日志文件
摘要: 作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 快速mark一下这个命令细节,免得以后使用又忘记了 大家都知道可以用echo来输出内容到 ...
- hadoop 清理日志文件
执行下 hadoop fs -du -h /user/spark/applicationHistory hadoop fs -du -h /user/spark 执行 再执行 hadoop fs -d ...
- Spark日志,及设置日志输出级别
Spark日志,及设置日志输出级别 1.全局应用设置 2.局部应用设置日志输出级别 3.Spark log4j.properties配置详解与实例(摘录于铭霏的记事本) 文章内容来源: 作者:大葱拌豆 ...
- spark日志存储路径为mysql_利用Spark解析Tomcat日志,并将统计结果存入Mysql数据库...
本文试图实现的需求场景为:以学习Spark知识点为目的,编写Scala利用Spark解析800M的tomcat日志文件,打印一段时间内ERROR级别记录的前10行,统计每分钟的日志记录数,并将统计结果 ...
- linux里的日志文件干啥用的,linux分析日志经常用的命令
解法1: cat access_log | awk '{print $1}' | sort | uniq -c | sort -n -r | head -10 解法2: cat access_log ...
- 使用logrotate管理nginx日志文件
本文转载自:http://linux008.blog.51cto.com/2837805/555829 描述:linux日志文件如果不定期清理,会填满整个磁盘.这样会很危险,因此日志管理是系统管理员日 ...
- 【Qt】Log4Qt(四):周期性输出日志,并且限制日志文件数量
在Log4Qt中存在一个比较大的问题,当使用 DailyRollingFileAppender对日志进行输出时,会无限输出文件,也就是说,当系统运行很久时,日志文件有可能很大,大到无法想象.因此,很多 ...
最新文章
- Oracle round函数是什么意思?怎么运用?
- 波士顿动力送狗抗疫:头顶iPad,背装对讲机,说是减少医患接触,但性价比真的OK吗?...
- AWS推出深度学习容器,简化AI程序开发
- windows 下conda安装gym
- 分布式数据库 HBase
- golang http 返回html文件_从零部署安装 Grpc-golang
- 安装独立版本的 Adobe Community Help
- activiti 作业执行器定时开始事件
- 动态设置control的显示与隐藏
- 服务机器人语音对话的实现
- bldc不同载波频率_三相BLDC弦波驱动器-PT2511
- (转自MBA智库百科)弗兰克·吉尔布雷斯
- 删除Navicat注册表
- 阿里云CTO章文嵩:阿里云强大的数据和计算能力助力企业实现“弯道超车”
- 使用R进行描述性统计分析(连续性变量)
- 2021-05-24 昨日三省吾身
- 解锁ChatGPT超高级玩法,展示动态图片,纯干货分享!
- Linux环境中安装zookeeper
- java解析宏文件,从命令行或批处理文件运行Excel宏的方法?
- apk开发教程!了解Android架构组件后,构建APP超简单!先收藏了