两个map,一个map读取一个hdfs文件,map完之后进入一个reduce进行逻辑处理。

package com.zhongxin.mr;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;/*** Created by DingYS on 2017/12/7.* 用户回款计划统计(详情)*/
public class UserPlanAmount {public static class StatisticsMap extends Mapper<LongWritable,Text,Text,Text> {private Text outKey = new Text();private Text outValue = new Text();private Pattern pattern = Pattern.compile(",");//statistics文件处理public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException{String strs[] = pattern.split(String.valueOf(value));String bidNo = strs[2];String userId = strs[3];String totalOnInvestedShare = strs[8];String addShare = strs[17];String addyield = strs[16];String outv = bidNo + pattern +"statstics" + pattern + userId + pattern + totalOnInvestedShare + pattern + addShare + pattern + addyield;outKey.set(bidNo);outValue.set(outv);context.write(outKey,outValue);}}public static class PlanMap extends Mapper<LongWritable,Text,Text,Text> {private Text outKey = new Text();private Text outValue = new Text();private Pattern pattern = Pattern.compile(",");// plan统计表(该文件在sqoop导入时就进行了数据计算及合并)public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{String strs[] = pattern.split(String.valueOf(value));String bidNo = strs[0];String interestTime = strs[1];String status = strs[2];String planStatus = strs[3];String yield = strs[4];String endDate = strs[6];String cycle = strs[7];String financedAmount = strs[8];String interestType = strs[9];String penaltyAmount = strs[10];String days = strs[11];if("INIT".equals(status)){String ouv = bidNo + pattern + "plan" + pattern + interestTime + pattern + planStatus + pattern + yield + pattern +cycle + pattern + financedAmount + pattern + interestType + pattern + penaltyAmount + pattern + days + pattern + endDate;outKey.set(bidNo);outValue.set(ouv);context.write(outKey,outValue);}}}public static class Reduce extends Reducer<Text,Text,Text,Text>{private Text outValue = new Text();private Pattern pattern = Pattern.compile(",");public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{Map<String,List<String>> planMap = new HashMap<String,List<String>>();List<String> statisticsLst = new ArrayList<String>();for(Text value : values){String strs[] = pattern.split(String.valueOf(value));String pbidNo = strs[0];if("plan".equals(strs[1])){if(planMap.containsKey(pbidNo)){planMap.get(pbidNo).add(String.valueOf(value));}else{List<String> planLst = new ArrayList<String>();planLst.add(String.valueOf(value));planMap.put(pbidNo,planLst);}}else{statisticsLst.add(String.valueOf(value));}}for(String value : statisticsLst){String strs[] = pattern.split(String.valueOf(value));String bidNo = strs[0];String userId = strs[2];String totalOnInvestedShare = strs[3];String addShare = strs[4];String addyield = strs[5];if(null == planMap.get(bidNo) || 0 >= planMap.get(bidNo).size()){continue;}String planBid = planMap.get(bidNo).get(0);if(StringUtils.isBlank(planBid)){continue;}String interestType = pattern.split(planBid)[7];if("A1".equals(interestType)){// 到期还本付息for(String v : planMap.get(bidNo)){String strp[] = pattern.split(v);String interestTime = strp[2];String yield = strp[4];String cycle = strp[5];BigDecimal interest = new BigDecimal(totalOnInvestedShare).multiply(new BigDecimal(yield)).divide(new BigDecimal(100),4);BigDecimal addInterest = new BigDecimal(0);if(StringUtils.isNotBlank(addShare) && StringUtils.isNotBlank(addyield)){addInterest = new BigDecimal(addShare).multiply(new BigDecimal(addyield)).divide(new BigDecimal(100),4);}BigDecimal totalInterest = interest.add(addInterest).multiply(new BigDecimal(cycle)).divide(new BigDecimal(365),2);String outv =  userId + pattern + bidNo + pattern + interestTime + pattern + totalInterest + 0.00 + 0.00;outValue.set(outv);context.write(key,outValue);}}else{// 按月付息,按季付息for(String v : planMap.get(bidNo)){String strp[] = pattern.split(v);String interestTime = strp[2];String yield = strp[4];String days = strp[9];String endDate = strp[10];String penaltyTotalAmount = strp[8];String financeAmount = strp[6];BigDecimal interest = new BigDecimal(totalOnInvestedShare).multiply(new BigDecimal(yield)).divide(new BigDecimal(100),4);BigDecimal addInterest = new BigDecimal(0);if("null".equals(addShare) && "null".equals(addyield)){addInterest = new BigDecimal(addShare).multiply(new BigDecimal(addyield)).divide(new BigDecimal(100),4);}BigDecimal totalInterest = interest.add(addInterest).multiply(new BigDecimal(days)).divide(new BigDecimal(365),2);String planSttus = strp[3];BigDecimal penalty = new BigDecimal(0);BigDecimal capital = new BigDecimal(0);if("ADVANCE".equals(planSttus)){// 提前还款penalty = new BigDecimal(penaltyTotalAmount).divide(new BigDecimal(financeAmount),2).multiply(new BigDecimal(totalOnInvestedShare));totalInterest = totalInterest.add(penalty);capital = new BigDecimal(totalOnInvestedShare);}/*** 最后一次派息capital记成totalOnInvestedShare*/if(interestTime.equals(endDate)){capital = new BigDecimal(totalOnInvestedShare);}String outv =  userId + pattern + bidNo + pattern + interestTime + pattern + totalInterest  +pattern + capital + pattern + penalty;outValue.set(outv);context.write(key,outValue);}}}}}public static void main(String[] args) throws Exception{Configuration config = new Configuration();Job job = Job.getInstance(config);job.setJobName("userPlanAmount");job.setJarByClass(UserPlanAmount.class);MultipleInputs.addInputPath(job,new Path(args[0]), TextInputFormat.class,StatisticsMap.class);MultipleInputs.addInputPath(job,new Path(args[1]),TextInputFormat.class,PlanMap.class);job.setReducerClass(Reduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setOutputFormatClass(TextOutputFormat.class);FileOutputFormat.setOutputPath(job, new Path(args[2]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

  

转载于:https://www.cnblogs.com/Smilence1024/p/8041174.html

两个map一个reduce(两个输入文件)相关推荐

  1. android 一个应用两个入口一个应用两个快捷方式(不同图标显示)

    我们在 Android开发中,一个工程对应一个AndroidManifest.xml文件,这个文件中包含有该项目的一些设置,如权限.SDk版Activity.Service信息等.一般而言,这个文件中 ...

  2. 利用map和reduce编写一个str2float函数,把字符串'123.456'转换成浮点数123.456:

    题目:利用map和reduce编写一个str2float函数,把字符串'123.456'转换成浮点数123.456: 来源:廖雪峰Python上的练习题 思路定位小数点,分割小数部分和整数部分,使用r ...

  3. Python:利用map和reduce编写一个str2float函数,把字符串'123.456'转换成浮点数123.456

    # -*- coding: utf-8 -*- from functools import reducedef str2float(s):def fn(x, y):return x * 10 + yd ...

  4. 利用map和reduce编写一个str2float函数,把字符串'123.456'转换成浮点数123.456

    想转行做程序员,最近在看廖雪峰的python教程,年龄大了看算法很费劲,之前总是囫囵吞枣,急于求快最后啥都没学到,现在重新看一遍,刚刚把课后的作业想明白,跟其他人的答案比起来我的很拙劣,但是我依然很高 ...

  5. java中两个map的融合(两个map有相同字段)

    试想这样一个场景: 数据库表中 有 城市信息表 city_tbl: 有院士信息表  ys_tbl ,其中院士有城市id字段(id): 但是不是所有城市都有院士: 我们想要得到 城市的详细信息,包括院士 ...

  6. java map遍历_Java中Map集合的两种遍历方式

    Java中的map遍历有多种方法,从最早的Iterator,到java5支持的foreach,再到java8 Lambda,让我们一起来看下Java中Map集合的两种遍历方式! 关于遍历Map集合的几 ...

  7. java中两种遍历集合的方式_Java中Map集合的两种遍历方式

    Java中的map遍历有多种方法,从最早的Iterator,到java5支持的foreach,再到java8 Lambda,让我们一起来看下Java中Map集合的两种遍历方式! 关于遍历Map集合的几 ...

  8. c++ 两个Map容器的差异性比较,返回差异内容

    在实际应用中 遇到比较两个Map容器的差异,并对差异性内容做处理, 发现没有相应的函数接口, 自己写了一个功能函数: std::map<string, double> CompareMap ...

  9. java map合并_详解Java8合并两个Map中元素的正确姿势

    1. 介绍 本入门教程将介绍Java8中如何合并两个map. 更具体说来,我们将研究不同的合并方案,包括Map含有重复元素的情况. 2. 初始化 我们定义两个map实例 private static ...

最新文章

  1. 某熊周刊:一周推荐外文技术资料(12.2)
  2. CentOS 服务器安全设置
  3. JAVA 中的数据结构
  4. Linux(CentOS)同步时间
  5. java混合分页_坑,MySQL中 order by 与 limit 混用,分页会出现问题!
  6. InnoDB的Buffer Pool简介
  7. java的枚举_Java 枚举
  8. java定时功能分析
  9. ubuntu下mysql的master-slave,双master 及A-B-C级联主从配置说明
  10. Conficker.AE病毒局域网扫描工具
  11. Ubuntu下好用的文档比较工具Meld,代替Notepad++的文档对比功能?
  12. android三星滑动解锁,三星怎样取消滑动解锁
  13. Logstash过滤器之Mutate过滤器详解
  14. hangfire 介绍(一)
  15. Lambda表达式和Stream类的使用
  16. Oracle项目管理系统之设计任务下达及成果交付
  17. 基于启发式算法与单目优化和马尔科夫模型的进出口公司的货物装运策略——整数线性规划 随机模拟
  18. 清除Windows系统用户密码
  19. 计算机一级表格分类汇总怎么弄,多张word表格分类汇总 word表格分类汇总
  20. Windows XP Professional SP2 原版

热门文章

  1. CVE-2022-1609 WordPress Weblizar Backdoor
  2. shell脚本判断文件是否存在
  3. dataframe常用操作_Pandas模块基础及常用方法
  4. Git下的冲突解决【转】
  5. 学生怎么样可以有免费的阿里服务器
  6. 山西计算机专升本——据升本还有48天
  7. Python中Pyyaml模块的使用
  8. SQL简单的查询语句之模糊查询
  9. 2022TWS蓝牙耳机推荐,盘点600元真无线蓝牙耳机
  10. Solidworks如何制作动画1