Hadoop-MR实现日志清洗(三)

5.论坛请求日志清洗解析

请求日志的清洗主要是指过滤掉跟后续统计无关的数据,包括爬虫数据、静态资源数据、无用数据列等。根据需要,清洗过程中也可以对部门数据域进行数据转换,比如日期,以便简化后续的数据加工/统计分析。

对日志的清洗逻辑上也是分为编写map、reduce、run(main)函数,在对输入数据处理时,日志的提取过滤较为复杂,通常是将文件处理的方法单独编写作为解析类,由map调用相关的方法。

5.1解析日志的各个域

单独编写的解析类,给map函数调用

package com.leeyk99.hadoop;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Locale;/*** 解析日志的每个数据列:日志的数据域大致可分为:IP 、"-"、"-"、TIME、URL、STATUS、STREAM、?、?等等* @author LIN*/public class FieldParser {public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");public static final SimpleDateFormat FORMAT=new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);/*** 解析日志记录* @return 数组,含有五个元素,分别是IP\TIME\URL\STAUS\流量*/public String[] parseLog(String line){String ip=parseIP(line);String time=parseTime(line);String url=parseURL(line);String status=parseStatus(line);String stream=parseStream(line);String[] fields=new String[5];fields[0]=ip;fields[1]=time;fields[2]=url;fields[3]=status;fields[4]=stream;//String[] fields=new String[]{ip,time,url,status,stream};return fields;}private String parseStream(String line) {try{final String trim = line.substring(line.lastIndexOf("\"")+1).trim();String stream = trim.split(" ")[1];return stream;}catch (ArrayIndexOutOfBoundsException e){e.printStackTrace();System.out.println(line);}finally{return null;}}private String parseStatus(String line) {final String trim = line.substring(line.lastIndexOf("\"")+1).trim();String status = trim.split(" ")[0];return status;}private String parseURL(String line) {final String trim = line.split("\"")[1].trim();String url = trim;return url;}private String parseTime(String line) {final String trim = line.split("\"")[0].trim();String time = trim.split(" ")[3].substring(1);Date date=parseDateFormat(time);//原始字符串解析成date才能方便格式化为指定的字符串样式
time=dateFormat.format(date);//转成20180903101923格式return time;}private String parseIP(String line) {final String trim = line.split(" ")[0].trim();String ip = trim;return ip;}/*** 日志时间转换  18/Sep/2013:16:16:16* @author LIN* @param 18/Sep/2013:16:16:16*/private Date parseDateFormat(String time){Date formatTime=new Date();try{formatTime =FORMAT.parse(time);//FORMAT.parse解析String类型返回Date类型,FORMAT.format解析Date类型返回字符串类型
}catch (ParseException e){e.printStackTrace();}return  formatTime;}}

5.2编写map函数

这里也演示了如何对多个字段进行传递输出的方法。

package com.leeyk99.hadoop.mapreduce;import com.leeyk99.hadoop.FieldParser;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable,Text,LongWritable,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//super.map(key, value, context);
String line=value.toString();FieldParser fieldParser=new FieldParser();String[] record=fieldParser.parseLog(line);/*数据预处理*///1.过滤指定字符串开头的数据if( record[2].startsWith("GET /uc_server") || record[2].startsWith("GET /static") ){ //测试过滤数据return;}//2.数据域加工,这里是字符串截取if( record[2].startsWith("GET /")){record[2]=record[2].substring("GET /".length()-1);//或者5
}else if(record[2].startsWith("POST /")){record[2]=record[2].substring("POST /".length()-1);}if (record[2].endsWith(" HTTP/1.1")){//System.out.println("1"+record[2]);
record[2]=record[2].substring(0,record[2].length()-" HTTP/1.1".length());//System.out.println("2"+record[2]);
}//3.列裁剪,进一步选取指定的列
Text outPutValue=new Text();outPutValue.set(record[0]+"\001"+record[1]+"\001"+record[2]); //指定了\001分隔符/*map输出,这个输出key使用的是LongWritable,输出的还是行号,没有像往常使用Text(维度)输出是Text,不像我们平时的IntWritable或DoubleWritable,这个不是在reduce中进行与group by类似计算的*/context.write(key,outPutValue);}}

5.3编写reducer函数

package com.leeyk99.hadoop.mapreduce;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class LogReducer extends Reducer<LongWritable, Text, Text, NullWritable> {@Overrideprotected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {//super.reduce(key, values, context);for(Text value : values){context.write(value, NullWritable.get());}}}

5.4编写入口函数(main函数、run函数)

package com.leeyk99.hadoop.mapreduce;//import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;//import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;//import java.io.File;import java.net.URI;public class LogParser extends Configured implements Tool {//不能使用小写override//@Override 实现接口的方法不能注释为重写,一直红色波浪线提示不合规,程序运行正常,找了好久这个位置的异常。public int run(String[] args) throws Exception{if(args.length != 2){System.err.printf("Usage: %s [generic options ] <input> <output> \n",getClass().getSimpleName());ToolRunner.printGenericCommandUsage(System.err);return -1;}//getClass() 、getConf()//方法1:Hadoop权威指南写法/*Job job=new Job(getConf(),"Log parser");job.setJarByClass(getClass());*///方法二:main写法,最简单写法
Job job=new Job();job.setJarByClass(getClass());//getClass() 获取类名 LogParser.class
job.setJobName("Log parser");//方法三:Configuration写法,网上写法/*Configuration conf=new Configuration();//String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();Job job=new Job(conf, "Job_001");//新建一个job对象,并给了job任务名job.setJarByClass(LogParser.class);  //指定class//FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //输入路径//FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); //输出路径*/FileInputFormat.addInputPath(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));job.setMapperClass(LogMapper.class);job.setMapOutputKeyClass(LongWritable.class); //与Reducer的不一致,需要指定
job.setMapOutputValueClass(Text.class);/*使用这个后,map一直卡在22%不动,因为map的输出是<LongWritable,Text>,如果使用Combiner后,输出与reducer一致<Text, NullWritable>,这种输出是不能作为Reducer的输入的,因为输入要求是<LongWritable,Text>*///job.setCombinerClass(LogReducer.class);
job.setReducerClass(LogReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//Hdfs 输出目录删除
FileSystem fs= FileSystem.get(new URI(args[0]),getConf());Path outPath=new Path(args[1]);if(fs.exists(outPath)){fs.delete(outPath,true);}return  job.waitForCompletion(true)?0:1;}public static void main(String[] args) throws Exception {int exitCode=ToolRunner.run(new LogParser(),args);System.exit(exitCode);}//使用本地,Hdfs 输出目录应该怎么删除呢/*private static void delDir(String path){File f=new File(path);if(f.exists()){if(f.isDirectory()){String[] items=f.list();for( String item : items ){File f2=new File(path+"/"+item);if(f2.isDirectory()){delDir(path+"/"+item);}else{f2.delete();}}}f.delete(); //删除文件或者最后的空目录}else{System.out.println("Output directory does not exist .");}}*/}

转载于:https://www.cnblogs.com/leeyuki/p/9584217.html

Hadoop-MR实现日志清洗(三)相关推荐

  1. Hadoop实例之利用MapReduce实现日志清洗(附源代码)

    通过hadoop的分布式文件系统与MR完成日常日志文件的数据处理,以求达到数据清洗的目的. 日志数据格式: 27.19.74.143 - - [30/Mar/2015:17:38:20 +0800] ...

  2. 基于Kafka-Zookeeper-Nginx-FIlebeat-MySQL的日志清洗分析平台搭建

    一个域名可以解析成多个ip地址,一般来说会轮询方式去解析成各个ip,但是如果其中一个服务器挂了,DNS不会立马将这个ip地址去掉,还是会解析成挂掉的ip,可能会造成访问失败.虽然客户端有重试,但还是会 ...

  3. Apache hadoop集群安装的三种方式:本地、伪分布、完全分布

    四 Hadoop运行模式 1)官方网址 (1)官方网站: http://hadoop.apache.org/ (2)各个版本归档库地址 https://archive.apache.org/dist/ ...

  4. Hadoop MR 之(一) 编写自己的WordCount

    前言 在前面的内容几章内, 我们主要介绍了HDFS的相关内容. 本章开始, 我们讲解下经常使用的Hadoop MapReduce的相关内容. 有人会觉得, 当前已经到了Spark几乎一统天下的时代, ...

  5. 《Hadoop权威指南》第三章 Hadoop分布式文件系统

    <Hadoop权威指南>第三章 Hadoop分布式文件系统 目录 前言 HDFS的设计 HDFS的概念 命令行接口 Hadoop文件系统 Java接口 数据流 通过distcp并行复制 注 ...

  6. Hadoop之电信日志数据处理(一)------业务简介

    日志数据说明 日志里的某一条数据(以下为一整行数据,以| 为分割符):77个属性--20 533||11|93287887015245963|6||||1|100.82.254.88|100.82.9 ...

  7. HTML5 Canvas 学习日志(三)

    2019独角兽企业重金招聘Python工程师标准>>>  HTML5 Canvas 学习日志(三) Canvas的11种合成 蓝色为destination,粉色为source 1 ...

  8. 【Hadoop】Hadoop MR异常处理

    1.代码示例 package com.ares.hadoop.mr.flowsort;import java.io.IOException;import org.apache.hadoop.conf. ...

  9. Android10.0 日志系统分析(三)-logd、logcat读写日志源码分析-[Android取经之路]

    摘要:本节主要来讲解Android10.0 logd.logcat读写日志源码内容 阅读本文大约需要花费20分钟. 文章首发微信公众号:IngresGe 专注于Android系统级源码分析,Andro ...

最新文章

  1. 【学习笔记】斜率优化
  2. 通过踩坑带你读透虚拟机的“锁粗化”
  3. 娱乐社交,玩票大的!网易云信“2021融合通信开发者大赛”正式收官!
  4. c调用python脚本 效率,尝试用C调用Python脚本#
  5. c语言程序设计期末试卷A,《C语言程序设计》期末试卷(A)..doc
  6. 如何查询linux服务器的网卡,Linux服务器如何查看有没有无线网卡
  7. AFN的简单二次封装
  8. Go语言编程17课:切片,步入数组的窗口(附pdf百度云)
  9. 基于C语言图书馆管理系统编程设计
  10. 计算机系统常见的10个硬件 1故障,计算机十项常见故障
  11. python爬取大众点评字体_python采集大众点评(字体反爬)
  12. java mina 框架 获取字节_浅谈Java的Mina框架传递对象
  13. minus用法c语言,Minus-C 一个最小化的C语言规范
  14. C++类内初始值的初始化形式
  15. 山东微信红包派发量全国第8
  16. 怎样抓CD音轨存为WMA文件
  17. 超详细——入门Github的代码上传
  18. 背包问题之多重背包基础写法
  19. 加州洛杉矶计算机研究生,加州洛杉矶计算机硕士文书要求
  20. H5游戏开发:决胜三分球

热门文章

  1. 我想在杭州买一套房一百平米左右的房子大概多少钱?
  2. 别人都说我老公的字写的不错,大家帮我看看这字行不?
  3. 洛克菲勒的逆商:如何在逆境中转换思维走向成功
  4. 有些店铺340块3T希捷硬盘,有什么猫腻吗?
  5. 家里无线网络每天不定时段出现网速很慢或者直接无连接,这是怎么回事?
  6. 什么叫做数字功放?它的电路原理是什么?
  7. xpath下面的xpath_深入研究XPATH查询
  8. 创建SQL Server索引的好工具
  9. 蒲公英自动更新版本管理以及更新后展示引导图
  10. 剑指offer.删除链表中重复的节点