The ChainMapper class lets you use multiple Mapper classes within a single map task.

The ChainReducer class lets you chain multiple Mapper classes after a Reducer within the reduce task.
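
As a rough mental model, the two classes together give a job the shape "one or more mappers, a single reducer, then optional mappers applied to the reducer's output". The sketch below models only that data flow in plain Java; it does not use the Hadoop API. The class `ChainShapeSketch`, its method names, and the word-count scenario are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ChainShapeSketch {

    // Map stage before the reduce (hypothetical): split one line into lower-case words.
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // Reduce stage: group identical keys and count them.
    public static Map<String, Integer> countByKey(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    // Map stage after the reduce (hypothetical): keep keys seen at least minCount times.
    public static Map<String, Integer> keepFrequent(Map<String, Integer> counts, int minCount) {
        Map<String, Integer> kept = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() >= minCount) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    // Runs the three stages in order: map, reduce, then the post-reduce map.
    public static Map<String, Integer> run(List<String> lines, int minCount) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            words.addAll(tokenize(line));
        }
        return keepFrequent(countByKey(words), minCount);
    }

    public static void main(String[] args) {
        // prints {a=3, b=2}
        System.out.println(run(Arrays.asList("a b a", "b c a"), 2));
    }
}
```

In a real job the post-reduce stage would run inside the reduce task, which is what saves a second job and its intermediate output.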

http://www.oratea.net/?p=371

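With ChainMapper, several map classes can be combined into a single map task.

The following example has little practical value, but it demonstrates clearly what ChainMapper does.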

Source file
100 tom 90
101 mary 85
102 kate 60

Output of Map00, which filters out the record with key 100
101 mary 85
102 kate 60

Output of Map01, which filters out the record with key 101
102 kate 60

Reduce output
102 kate 60
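
The stage-by-stage results above can be reproduced without a cluster. The following is a minimal plain-Java sketch, not Hadoop code: the class `ChainSketch` and its `dropKey` and `runChain` helpers are hypothetical names, and each record is assumed to be a key (the ID) followed by a value. It only models how the output of one map stage becomes the input of the next.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class ChainSketch {

    // Hypothetical stand-in for a filtering mapper: keeps records whose key differs from `key`.
    public static Predicate<String[]> dropKey(String key) {
        return record -> !record[0].equals(key);
    }

    // Applies the stages in order; each stage only sees what the previous one emitted.
    @SafeVarargs
    public static List<String[]> runChain(List<String[]> input, Predicate<String[]>... stages) {
        List<String[]> current = input;
        for (Predicate<String[]> stage : stages) {
            List<String[]> next = new ArrayList<>();
            for (String[] record : current) {
                if (stage.test(record)) {
                    next.add(record);
                }
            }
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        List<String[]> source = Arrays.asList(
                new String[] {"100", "tom 90"},
                new String[] {"101", "mary 85"},
                new String[] {"102", "kate 60"});
        // Same order as the example: first drop key 100, then drop key 101.
        List<String[]> result = runChain(source, dropKey("100"), dropKey("101"));
        for (String[] record : result) {
            System.out.println(record[0] + " " + record[1]); // prints "102 kate 60"
        }
    }
}
```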

package org.myorg;


import java.io.IOException;
import java.util.*;
import java.lang.String;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.lib.*;

public class WordCount
{

    // First mapper in the chain: drops the record whose key is "100".
    public static class Map00 extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            Text ft = new Text("100");

            if(!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }

    // Second mapper in the chain: drops the record whose key is "101".
    public static class Map01 extends MapReduceBase implements Mapper<Text, Text, Text, Text>
    {
        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            Text ft = new Text("101");

            if(!key.equals(ft))
            {
                output.collect(key, value);
            }
        }
    }

    // Identity reducer: writes every value it receives for a key unchanged.
    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            while(values.hasNext())
            {
                output.collect(key, values.next());
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount00");

        // KeyValueTextInputFormat splits each line into key and value, by default at the first tab.
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper is a static method; the mappers run in the order they are added.
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map00.class, Text.class, Text.class, Text.class, Text.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, Map01.class, Text.class, Text.class, Text.class, Text.class, true, mapBConf);

        conf.setReducerClass(Reduce.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

Here is another example. There is a lot of code, but it is actually simple: the Conn classes are all identical.

http://yixiaohuamax.iteye.com/blog/684244

package com.oncedq.code;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;

import com.oncedq.code.util.DateUtil;

public class ProcessSample {
    // Stage 1: parses a semicolon-separated line into a Conn1 record keyed by order key.
    public static class ExtractMappper extends MapReduceBase implements
            Mapper<LongWritable, Text, LongWritable, Conn1> {

        @Override
        public void map(LongWritable offset, Text lineText,
                OutputCollector<LongWritable, Conn1> collector, Reporter reporter)
                throws IOException {
            String line = lineText.toString();
            String[] strs = line.split(";");
            Conn1 conn1 = new Conn1();
            conn1.orderKey = Long.parseLong(strs[0]);
            conn1.customer = Long.parseLong(strs[1]);
            conn1.state = strs[2];
            conn1.price = Double.parseDouble(strs[3]);
            conn1.orderDate = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");
            LongWritable lw = new LongWritable(conn1.orderKey);
            collector.collect(lw, conn1);
        }

    }

    private static class Conn1 implements WritableComparable<Conn1> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in),
                    "yyyy-MM-dd");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        @Override
        public int compareTo(Conn1 arg0) {
            // TODO Auto-generated method stub
            return 0;
        }

    }

    // Stage 2: keeps only records whose state is "F" and copies them into a Conn2.
    public static class Filter1Mapper extends MapReduceBase implements
            Mapper<LongWritable, Conn1, LongWritable, Conn2> {

        @Override
        public void map(LongWritable inKey, Conn1 in,
                OutputCollector<LongWritable, Conn2> collector, Reporter report)
                throws IOException {
            if (in.state.equals("F")) {
                Conn2 out = new Conn2();
                out.customer = in.customer;
                out.orderDate = in.orderDate;
                out.orderKey = in.orderKey;
                out.price = in.price;
                out.state = in.state;
                collector.collect(inKey, out);
            }
        }

    }

    private static class Conn2 implements WritableComparable<Conn2> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in),
                    "yyyy-MM-dd");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        @Override
        public int compareTo(Conn2 other) {
            // Left as a stub: in this job the Conn types are only used as values, never as keys.
            return 0;
        }

    }

    // Stage 3: rewrites the state ("F" becomes "Find") and copies the record into a Conn3.
    public static class RegexMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn2, LongWritable, Conn3> {

        @Override
        public void map(LongWritable inKey, Conn2 in,
                OutputCollector<LongWritable, Conn3> collector, Reporter report)
                throws IOException {
            Conn3 out = new Conn3();
            out.customer = in.customer;
            out.orderDate = in.orderDate;
            out.orderKey = in.orderKey;
            out.price = in.price;
            out.state = in.state.replaceAll("F", "Find");
            collector.collect(inKey, out);
        }
    }

    private static class Conn3 implements WritableComparable<Conn3> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in),
                    "yyyy-MM-dd");
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        @Override
        public int compareTo(Conn3 other) {
            // Left as a stub: in this job the Conn types are only used as values, never as keys.
            return 0;
        }

    }

    // Stage 4: passes each record through unchanged so it is written to the job output.
    public static class LoadMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn3, LongWritable, Conn3> {

        @Override
        public void map(LongWritable key, Conn3 value,
                OutputCollector<LongWritable, Conn3> collector, Reporter reporter)
                throws IOException {
            collector.collect(key, value);
        }

    }

    public static void main(String[] args) throws InterruptedException {
        JobConf job = new JobConf(ProcessSample.class);
        job.setJobName("ProcessSample");
        // Map-only job: the output of the last chained mapper is the job output.
        job.setNumReduceTasks(0);
        job.setInputFormat(TextInputFormat.class);
        // TextOutputFormat writes values with toString(), which the Conn classes do not override.
        job.setOutputFormat(TextOutputFormat.class);
        JobConf mapper1 = new JobConf();
        JobConf mapper2 = new JobConf();
        JobConf mapper3 = new JobConf();
        JobConf mapper4 = new JobConf();
        // addMapper is static; each mapper's input types must match the previous mapper's output types.
        ChainMapper.addMapper(job, ExtractMappper.class, LongWritable.class, Text.class,
                LongWritable.class, Conn1.class, true, mapper1);
        ChainMapper.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,
                LongWritable.class, Conn2.class, true, mapper2);
        ChainMapper.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,
                LongWritable.class, Conn3.class, true, mapper3);
        ChainMapper.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,
                LongWritable.class, Conn3.class, true, mapper4);
        FileInputFormat.setInputPaths(job, new Path("orderData"));
        FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));
        try {
            Job job1 = new Job(job);
            JobControl jc = new JobControl("test");
            jc.addJob(job1);
            // JobControl.run() loops until stop() is called, so run it on its own
            // thread and stop it once every job has finished.
            Thread runner = new Thread(jc);
            runner.start();
            while (!jc.allFinished()) {
                Thread.sleep(500);
            }
            jc.stop();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
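
The Conn classes above all follow one rule that is easy to break when copying them: `readFields` must read the fields in exactly the order `write` emitted them. The sketch below shows that round trip using only `java.io`, so it runs without Hadoop. The class `OrderRecordSketch` is hypothetical, `writeUTF`/`readUTF` stand in for `Text.writeString`/`Text.readString`, and the date field is left out because `DateUtil` is not shown in the listing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class OrderRecordSketch {
    public long orderKey;
    public long customer;
    public String state;
    public double price;

    // Writes the fields in a fixed order, like Conn1.write.
    public void write(DataOutputStream out) throws IOException {
        out.writeLong(orderKey);
        out.writeLong(customer);
        out.writeUTF(state); // stand-in for Text.writeString
        out.writeDouble(price);
    }

    // Reads the fields back in exactly the order write() emitted them.
    public void readFields(DataInputStream in) throws IOException {
        orderKey = in.readLong();
        customer = in.readLong();
        state = in.readUTF(); // stand-in for Text.readString
        price = in.readDouble();
    }

    // Serializes a record to bytes and rebuilds a fresh copy from those bytes.
    public static OrderRecordSketch roundTrip(OrderRecordSketch original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            OrderRecordSketch copy = new OrderRecordSketch();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        OrderRecordSketch record = new OrderRecordSketch();
        record.orderKey = 7L;
        record.customer = 42L;
        record.state = "F";
        record.price = 19.5;
        OrderRecordSketch copy = roundTrip(record);
        // prints 7;42;F;19.5
        System.out.println(copy.orderKey + ";" + copy.customer + ";" + copy.state + ";" + copy.price);
    }
}
```

Swapping two reads of different types, for example reading the double before the string, would either throw or silently produce wrong values, which is why the three Conn classes keep their `write` and `readFields` bodies in lockstep.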
