在实际MapReduce开发中,会遇到需要数据多输入路径并有对应的Map函数来处理,这需要MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass)函数。

本文模拟从不同地市中获取数据,并根据按照地市区号输出记录,具体见代码。

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class DPIDigAdsl {public static class DPIDigMapperADSLGZ extends Mapper<Object, Text, Text, Text>{private Text oKey=new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {String strKey=DPIUtil.ADSLMapHandle(value.toString(),"020");//广州if(!strKey.isEmpty()){oKey.set(strKey);context.write(oKey,new Text(""));}}}  public static class DPIDigMapperADSLFS extends Mapper<Object, Text, Text, Text>{private Text oKey=new Text();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {String strKey=DPIUtil.ADSLMapHandle(value.toString(),"0757");//佛山if(!strKey.isEmpty()){oKey.set(strKey);context.write(oKey,new Text(""));}}} public static class DPIDigReducer extends Reducer<Text,Text,Text,Text> {public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {context.write(key, new Text(""));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();//设置驱动类Job job = new Job(conf, "DPI dig");job.setJarByClass(DPIDigAdsl.class);//多输入路径对应多map函数String inPathgz="/gz";String inPathfs="/fs";MultipleInputs.addInputPath(job, new Path(inPathgz), TextInputFormat.class, DPIDigMapperADSLGZ.class);MultipleInputs.addInputPath(job, new Path(inPathfs), TextInputFormat.class, DPIDigMapperADSLFS.class);//设置Reduce函数、输出数据类型、输出路径job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setReducerClass(DPIDigReducer.class);job.setNumReduceTasks(1);//设置reduce输入文件一个,方便查看结果String outPath="/tmp/fjs/dpi";outPath=DPIUtil.changeToDir(outPath)+"adsl";FileOutputFormat.setOutputPath(job, new Path(outPath));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
import java.net.MalformedURLException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;public class DPIUtil {//Map处理public static String ADSLMapHandle(String value,String cityCode){String strKey="";//返回//获取字段值String[] strDPIs=value.split("\\|");//获取行,并按照|分隔符提取if(strDPIs.length < 10 ) return strKey;//数据不合规,直接返回String date=DPIUtil.DatetimeToDate(strDPIs[0]);//上网时间STARTDATE提取出YYYYMMDD,如20160430String account=strDPIs[1];//acc_nbr或account       String city=cityCode;//LATN_ID或city提取,如020、0755String url=strDPIs[7];//url提取,含域名和参数及值String domain=DPIUtil.hostFromUrl(strDPIs[7]);//Domain域名提取,如www.jd.com     String cookie=strDPIs[9];//cookie提取,含域名和参数及值//定义正则表达式String[] regExs={".*.;^imei$;^\\d{15}$",".*.; ^meid$;^\\d{14}$|^\\d{16}$",".*.; ^imsi$;.*.",".*.; ^biz$;.*."};//匹配正则表达式for(String regEx:regExs){String regExDomain=regEx.split(";")[0];//域名正则表达式Pattern patDomain=Pattern.compile(regExDomain);if (domain==null) domain="";Matcher matDomain = patDomain.matcher(domain);if(matDomain.find()){//域名匹配String regExPara=regEx.split(";")[1];//参数正则表达式Pattern patPara=Pattern.compile(regExPara);String regExParaVal=regEx.split(";")[2];//参数值正则表达式Pattern patParaVal=Pattern.compile(regExParaVal);//解析URL和cookie,提取参数和值Pattern patSplit= Pattern.compile("[?&]+"); //以多条件分割字符串 String[] strSigns = patSplit.split(url+"?"+cookie);for (String strSign:strSigns){if(strSign.contains("=") && strSign.split("=").length>1){//para=value参数及其值提取String para=strSign.split("=")[0];//等号右边参数if (para==null) para="";Matcher matPara = patPara.matcher(para);String paraVal=strSign.split("=")[1];//等号左边参数值if (paraVal==null) paraVal="";Matcher matParaVal = patParaVal.matcher(paraVal);if(matPara.find() && matParaVal.find()){strKey=account+"|"+date+"|"+city+"|"+domain+"|"+para+"|"+paraVal;return strKey;}           }}  }           }return strKey;}//时间戳转日期时间public static String timestamp2date(String _timeStamp){String dateFormat = "yyyyMMddHHmmss";SimpleDateFormat fm = new SimpleDateFormat(dateFormat);if (_timeStamp.equals("")){return "";}try{long timeStamp = Long.parseLong(_timeStamp);String dt = fm.format(new Date(timeStamp*1000));return dt;} catch (Exception ex){return "";}}//截取时间日期字符串的前8位,输出日期public static String DatetimeToDate(String _datetime){return _datetime.substring(0,8);//从第一个字符开始,共8个字符输出}//从url中提取域名public static String hostFromUrl(String _url){String host=null;try {host = new URL(_url).getHost().toLowerCase();// 此处获取值转换为小写} catch (MalformedURLException e) {// TODO Auto-generated catch blocke.printStackTrace();}return host;}//hadoop目录规范/public static String changeToDir(String dirPath){//目录最后是否有/if(dirPath.charAt(dirPath.length()-1)!='/'){dirPath = dirPath + "/";}return dirPath;}public static void main(String[] args) throws Exception { }
}

MapReduce基础开发之六Map多输入相关推荐

  1. MapReduce基础开发之二数据去重和排序

    因Hadoop集群平台网络限制,只能在eclipse里先写好代码再提交jar到集群平台namenode上执行,不能实时调试,所以没有配置eclipse的hadoop开发环境,只是引入了hadoop的l ...

  2. MapReduce基础开发之三字段处理并输出Hive表

    1.MR设计和开发    1)设计:      输入:用户名 | 数字ip | 时间戳 |  url      MR处理:正则表达式匹配url,满足则解析url并转换ip和时间戳,      输出:用 ...

  3. MapReduce基础开发之十读写ORC File

    1.ORC File Orc是Hive特有的一种列式存储的文件格式,它有着非常高的压缩比和读取效率,因此很快取代了之前的RCFile,成为Hive中非常常用的一种文件格式. 2.编译ORC Jar包 ...

  4. MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs

    为利用Hadoop集群平台的分布存储和计算能力,基于MapReduce将ftp文件分布式下载并上传到HDFS中. 1.文件移动流程:ftp服务器->datanode本地目录->HDFS目录 ...

  5. MapReduce基础开发之四参数传递

    Map和Reduce函数是在各节点进行,如果要在MapReduce数据加工中使用共同参数,要如何传参呢?方法有二: 1.Configuration类的set和get的方法读取xml/txt文件设置或自 ...

  6. MapReduce基础开发之十二ChainMapper和ChainReducer使用

    1.需求场景:    过滤无意义的单词后再进行文本词频统计.处理流程是: 1)第一个Map使用无意义单词数组过滤输入流: 2)第二个Map将过滤后的单词加上出现一次的标签: 3)最后Reduce输出词 ...

  7. MapReduce基础开发之十一DistributedCache使用

    1.需求场景:    过滤无意义的单词后再进行文本词频统计.处理流程是: 1)预定义要过滤的无意义单词保存成文件,保存到HDFS中: 2)程序中将该文件定位为作业的缓存文件,使用Distributed ...

  8. MapReduce基础开发context.write注意new text()多出一列的问题

    1.问题描述:在MapReduce中代码中,Map输出context.write(okey,new text("")),Reduce也是context.write(okey,new ...

  9. MapReduce基础开发之一词汇统计和排序(wordcount)

    统计/var/log/boot.log中含k的字符的数量,并对含k的字符按照数量排序.需分两个job完成,一个用来统计,一个用来排序. 一.统计 1.上传文件到hadoop:    1)新建文件夹:h ...

最新文章

  1. 变换为json类型却遭遇乱码\u516c\u5f00\u65e5\u671f
  2. phpcmsV9SQL注射+列目录
  3. matlab 处理dat文件画图,matlab_DAT_processing matlab处理dat文件并进行绘图 - 下载 - 搜珍网...
  4. 【牛客 - 297C】little w and Segment Coverage(差分数组,区间差分,思维,卡线段树)☆
  5. php中双等与三等,利用php中双等于和三等于的区别,无需密码拿到flag
  6. init_MUTEX被废除
  7. C++实现类似反射模式
  8. 【渝粤教育】国家开放大学2018年春季 8668-22T汽车涂装技术(A) 参考试题
  9. Atitit 作用域的理解attilax总结
  10. apache服务上配置https安全与域名请求
  11. 七个基本量纲_超星尔雅三维建模与仿真答案题库
  12. 雨滴win7计算机路径,win7雨滴桌面秀 Raindrop Desktop Show教程_计算机软件和应用程序_IT /计算机_信息...
  13. 计算机组装与维护选教材,计算机组装与维护校本教材.doc
  14. linux数据库能看到系统执行了哪些命令,Linux-Mysql常用命令(上)
  15. win10清理c盘_系统慢?给你的C盘减减肥!
  16. 基于复化辛卜生求积公式的变步长求积算法
  17. layer进度条ajax,layui动态进度条详细。
  18. html手机打不开是什么,手机打不开微信的网页怎么办?手机打不开微信网页的原因和解决方法...
  19. git上传到阿里云code
  20. 是寒冬还是风口?2015年HTML5游戏完整产业链报告

热门文章

  1. python怎么显示分数_python分数怎么表示什么
  2. SpringBoot——项目搭建、整合Mybatis、整合redis(集群)
  3. JAVA中this和super用法
  4. 阿里云(一)云存储OSS的命令行osscmd的安装和使用
  5. Spring Security默认的用户登录表单 页面源代码
  6. C++ 容器 LIST VECTOR erase
  7. 厦门“快捷贷”项目启动 最高可贷500万
  8. [EasyTao(道)系列文章之一]太极之道
  9. 虚位以待C#-北京Objectiva
  10. Linux驱动开发环境配置(内核源码树构造)