大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——数据清洗(ETL)
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
3.9.1数据清洗案例实操-简单解析版
1、需求
去除日志中字段长度小于等于11的日志。
(1)输入数据
web.log
(2)期望输出数据
每行字段长度都大于11。
2、需求分析
需要在Map阶段对输入的数据根据规则进行过滤清洗。
3、实现代码
(1)编写LogMapper类
package com.cuiyf41.etl;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 1 获取1行数据String line = value.toString();// 2 解析日志boolean result = parseLog(line,context);// 3 日志不合法退出if (!result) {return;}// 4 设置keyk.set(line);// 5 写出数据context.write(k, NullWritable.get());}// 2 解析日志private boolean parseLog(String line, Context context) {// 1 截取String[] fields = line.split(" ");// 2 日志长度大于11的为合法if (fields.length > 11) {// 系统计数器context.getCounter("map", "true").increment(1);return true;}else {context.getCounter("map", "false").increment(1);return false;}}
}
(2)编写LogDriver类
package com.cuiyf41.etl;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[] { "e:/input/inputlog", "e:/output1" };// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 加载jar包job.setJarByClass(LogDriver.class);// 3 关联mapjob.setMapperClass(LogMapper.class);// 4 设置最终输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置reducetask个数为0job.setNumReduceTasks(0);// 5 设置输入和输出路径Path input = new Path(args[0]);Path output = new Path(args[1]);// 如果输出路径存在,则进行删除FileSystem fs = FileSystem.get(conf);if (fs.exists(output)) {fs.delete(output,true);}FileInputFormat.setInputPaths(job, input);FileOutputFormat.setOutputPath(job, output);// 6 提交job.waitForCompletion(true);}
}
3.9.2数据清洗案例实操-复杂解析版
1、需求
对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据。
(1)输入数据
web.log
(2)期望输出数据
都是合法的数据
2、实现代码
(1)定义一个bean,用来记录日志数据中的各数据字段
package com.cuiyf41.etlu;public class LogBean {private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息private boolean valid = true;// 判断数据是否合法public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {this.remote_user = remote_user;}public String getTime_local() {return time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void setValid(boolean valid) {this.valid = valid;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("\001").append(this.remote_addr);sb.append("\001").append(this.remote_user);sb.append("\001").append(this.time_local);sb.append("\001").append(this.request);sb.append("\001").append(this.status);sb.append("\001").append(this.body_bytes_sent);sb.append("\001").append(this.http_referer);sb.append("\001").append(this.http_user_agent);return sb.toString();}
}
(2)编写LogMapper类
package com.cuiyf41.etlu;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {// 1 获取1行String line = value.toString();// 2 解析日志是否合法LogBean bean = parseLog(line);if (!bean.isValid()) {return;}k.set(bean.toString());// 3 输出context.write(k, NullWritable.get());}// 解析日志private LogBean parseLog(String line) {LogBean logBean = new LogBean();// 1 截取String[] fields = line.split(" ");if (fields.length > 11) {// 2封装数据logBean.setRemote_addr(fields[0]);logBean.setRemote_user(fields[1]);logBean.setTime_local(fields[3].substring(1));logBean.setRequest(fields[6]);logBean.setStatus(fields[8]);logBean.setBody_bytes_sent(fields[9]);logBean.setHttp_referer(fields[10]);if (fields.length > 12) {logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);}else {logBean.setHttp_user_agent(fields[11]);}// 大于400,HTTP错误if (Integer.parseInt(logBean.getStatus()) >= 400) {logBean.setValid(false);}}else {logBean.setValid(false);}return logBean;}
}
(3)编写LogDriver类
package com.cuiyf41.etlu;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class LogDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 加载jar包job.setJarByClass(LogDriver.class);// 3 关联mapjob.setMapperClass(LogMapper.class);// 4 设置最终输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 5 设置输入和输出路径Path input = new Path(args[0]);Path output = new Path(args[1]);// 如果输出路径存在,则进行删除FileSystem fs = FileSystem.get(conf);if (fs.exists(output)) {fs.delete(output,true);}FileInputFormat.setInputPaths(job, input);FileOutputFormat.setOutputPath(job, output);// 6 提交job.waitForCompletion(true);}
}
大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——数据清洗(ETL)相关推荐
- 大数据基础之Hadoop(三)—— MapReduce
作者:duktig 博客:https://duktig.cn (文章首发) 优秀还努力.愿你付出甘之如饴,所得归于欢喜. 本篇文章源码参看:https://github.com/duktig666/b ...
- 大数据框架之Hadoop:入门(一)大数据概论
第1章 大数据概论 1.1大数据概念 大数据(Big Data):指无法在一定时间范围内用常规软件工具进行捕捉.管理和处理的数据集合,是需要新处理模式才能具有更强的决策力.洞察发现力和流程优化能力的海 ...
- 大数据框架Hadoop篇之Hadoop入门
1. 写在前面 今天开始,想开启大数据框架学习的一个新系列,之前在学校的时候就会大数据相关技术很是好奇,但苦于没有实践场景,对这些东西并没有什么体会,到公司之后,我越发觉得大数据的相关知识很重要,不管 ...
- 五种大数据框架你必须要知道
学习大数据不可不知的五种大数据框架,码笔记分享大数据框架Hadoop.Storm.Samza.Spark和Flink五种大数据框架详解: 一:Hadoop大数据框架 Hadoop 大数据框架?第一映入 ...
- 大数据基础之Hadoop(一)—— Hadoop概述
Hadoop系列 大数据基础之Hadoop(一)-- Hadoop概述 大数据基础之Hadoop(二)-- HDFS 大数据基础之Hadoop(三)-- MapReduce 大数据基础之Hadoop( ...
- 大数据技术之Hadoop(MapReduce)
大数据技术之Hadoop(MapReduce) (作者:大数据研发部) 版本:V1.4 第1章MapReduce入门 map 计算 reduce 规约 1.1 MapReduce定义 Mapreduc ...
- 图解大数据 | 分布式平台Hadoop与Map-Reduce详解
作者:韩信子@ShowMeAI 教程地址:https://www.showmeai.tech/tutorials/84 本文地址:https://www.showmeai.tech/article-d ...
- 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看
简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...
- 大数据数据库的技术对垒:MapReduce vs. MPP[作者:李明]
大数据数据库的技术对垒:MapReduce vs. MPP --作者:李明(email: mli@pivotal.io) 这些年大数据概念已经成为IT界的热门,我们经常也会在新闻和报纸中看到.大数据概 ...
最新文章
- (step8.2.6)hdu 1848(Fibonacci again and again——组合博弈)
- java Class类与反射
- 资源过于敏澸,8h删!这波福利....请笑纳。。
- 线性代数笔记:Khatri-Rao积
- 机器学习算法与Python实践之(二)支持向量机(SVM)初级
- Python 编程快速上手 第十七章 操作图像
- 机器人学习--pitch yaw roll
- VTK:Filtering之ConnectivityFilter
- cocoa pods的安装与我遇到的问题
- 每日一题(48)—— 中断
- PyQt5, PushButton
- python足球投注_/usr/lib目录属性更改引发的蝴蝶效应
- hbase major_compact 文件会变小吗_图解式学习:可能是最易懂的Hbase架构原理解析(二)...
- MSIL实用指南-struct的生成和操作
- 开源流媒体服务器:为何一定得再撸个新的 | 凌云时刻
- 内网穿透的一种方式——基于ngrok的小米球
- cad2010怎么隐藏标注尺寸,cad2007怎么隐藏标注尺寸
- STM8驱动0.96寸OLED(12864液晶屏)
- 第1回 V模型,我的完整诠释
- Linux系统查看有几块硬盘