When developing MapReduce programs, you frequently run into jobs whose input data does not live in HDFS, or whose output destination is not HDFS. The MapReduce design already anticipates this and exposes two pluggable components: all we need to do is supply a suitable custom InputFormat and OutputFormat. This post walks through a simple case that reads data from MongoDB and writes results back to MongoDB. It is only a demo, so the data set is an arbitrary one.


I. Custom InputFormat

  The input of the Map phase is determined by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat (excerpted below), we can see that besides extending the InputFormat abstract class we also have to supply a custom InputSplit and a custom RecordReader. Their roles are: the InputSplit describes the size of a data slice and its location information, while the RecordReader does the actual reading of records.

public abstract class InputFormat<K, V> {
    // Returns the list of input splits for the Map phase
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // Creates the object that actually reads records from a split
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

  1. Custom InputSplit

    A custom InputSplit mainly has to implement the following methods:

public abstract class InputSplit {
    // Returns the size (length) of this split
    public abstract long getLength() throws IOException, InterruptedException;

    // Returns the location (host) information of this split
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

  2. Custom RecordReader

    A custom RecordReader mainly has to implement the following methods:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    // Initialization; may be left empty if everything is initialized in the constructor
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

    // Returns true if another key/value pair is available, false otherwise
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    // Returns the current key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    // Returns the current value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    // Returns the progress of the reader
    public abstract float getProgress() throws IOException, InterruptedException;

    // Releases resources
    public abstract void close() throws IOException;
}
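
  To make the life cycle of these methods concrete, here is a hedged sketch (not the actual MapTask source) of the per-split read loop the framework runs over a RecordReader; apart from the Hadoop types, the class and method names are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Simplified sketch of the read loop a map task runs for one split; the real MapTask
// additionally handles progress reporting, counters and error handling.
public class RecordReaderLoopSketch {
    public static <K, V> void drive(RecordReader<K, V> reader, InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        reader.initialize(split, context);      // called once before any records are read
        while (reader.nextKeyValue()) {         // advance to the next record
            K key = reader.getCurrentKey();     // key of the current record
            V value = reader.getCurrentValue(); // value of the current record
            // ... each (key, value) pair is handed to the Mapper here ...
        }
        reader.close();                         // release resources
    }
}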

II. Custom OutputFormat

  The output of the Reduce phase is determined by the OutputFormat, which decides both the output destination and the committer used when the job's output is committed. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat (excerpted below), we can see that besides extending the OutputFormat abstract class we also need a custom RecordWriter and an OutputCommitter. Because the OutputCommitter is not tied to the concrete output destination, it normally does not need to be rewritten and the existing FileOutputCommitter can be used directly; the RecordWriter is what actually defines how data is written to the destination.

public abstract class OutputFormat<K, V> {
    // Returns the object that actually writes the output records
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;

    // Checks that the output specification of the job is valid
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

    // Returns the committer responsible for committing the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}

  1. Custom RecordWriter

    Looking at the RecordWriter source, the two methods that need to be implemented are:

public abstract class RecordWriter<K, V> {
    // Writes one key/value pair to the output
    public abstract void write(K key, V value) throws IOException, InterruptedException;

    // Releases resources
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}

III. Full code

  Custom InputFormat & InputSplit

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * An empty object that performs no operation, similar to NullWritable
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
            // no-op
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            // no-op
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // no-op
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // no-op
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }

    }

    /**
     * The input split class for MongoDB
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor
         *
         * @param start
         *            row number of the first document of this split within the query result
         * @param end
         *            row number just past the last document of this split within the query result
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // size of this split
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // Returning an empty array means no data-locality optimization; map tasks run on arbitrary nodes.
            return new String[] {};
        }

    }

    protected MongoDBConfiguration mongoConfiguration; // mongo-related configuration
    protected Mongo mongo; // mongo connection
    protected String databaseName; // name of the database to connect to
    protected String collectionName; // name of the collection to connect to
    protected DBObject conditionQuery; // query conditions
    protected DBObject fieldQuery; // requested fields

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // total number of matching documents
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // number of documents per split

            // Create the splits; each split simply gets an equal share of the documents
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // close the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // the value class the MongoDB documents are converted into; defaults to NullMongoDBWritable
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName(); // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo(); // initialize
        getConditionQuery(); // initialize
        getFieldQuery(); // initialize
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.push(field);
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}

MongoDBInputFormat.java
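
  Before moving on, here is a hedged wiring sketch showing how a driver might point MongoDBInputFormat at a collection; it only uses classes and property names defined in this post, and the concrete database and collection names are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;

// Hedged wiring sketch: configure a job to read from MongoDB with the InputFormat above.
// valueClass is your own MongoDBWritable implementation (the Demo below uses DemoInputValueAndOutputKey).
public class MongoInputWiringSketch {
    public static void configure(Job job, Class<? extends MongoDBWritable> valueClass) {
        Configuration conf = job.getConfiguration();
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, valueClass, MongoDBWritable.class);
        conf.setInt(MRJobConfig.NUM_MAPS, 2); // getSplits() above turns this into two splits
        job.setInputFormatClass(MongoDBInputFormat.class);
    }
}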

  Custom RecordReader

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
            DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // do nothing; everything is initialized in the constructor
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }
            key.set(pos + split.getStart()); // set the key
            value.readFields(cursor.next()); // set the value
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // report the fraction of the split consumed so far (the contract expects a value between 0 and 1)
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }
}

MongoDBRecordReader.java

  Custom OutputFormat & RecordWriter

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection
     *
     * @param <K>
     * @param <V>
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // establish the connection
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // write the document; for large data volumes the objects could be buffered here and inserted in batches
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // call the write methods directly
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Create the record writer occur exception.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // no validation is performed
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The OutputCommitter handles setting up and committing the job/task output, which is not tied to the
        // MongoDB destination here, so the existing FileOutputCommitter is used directly.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Configure the job to use this output format and set the output database and collection
     *
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disable the fetch method; the RecordWriter will call write(DBCollection) directly instead
     *
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}

MongoDBOutputFormat.java
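
  The writer above can be driven in two ways; the hedged sketch below shows both, with placeholder database and collection names: the default path merges the DBObjects returned by fetchWriteDBObject() and inserts the result, while disabling the fetch method makes the RecordWriter call write(DBCollection) on the key and value directly.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;

// Hedged wiring sketch for MongoDBOutputFormat; "foobar2"/"users" are placeholder names.
public class MongoOutputWiringSketch {
    public static void configure(Job job) {
        // default: the RecordWriter merges key/value fetchWriteDBObject() results and inserts them
        MongoDBOutputFormat.setOutput(job, "foobar2", "users");

        // alternative: have the RecordWriter call key.write(collection) / value.write(collection) directly
        Configuration conf = job.getConfiguration();
        MongoDBOutputFormat.disableFetchMethod(conf);
    }
}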

  Other supporting Java code

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // Whether the RecordWriter uses the fetch method (the default); if disabled, the write method is called directly
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Returns the underlying Configuration object
     */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Sets the connection information (no authentication)
     */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /**
     * Sets the connection information, optionally with authentication
     */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /**
     * Returns a MongoDB connection object
     */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    /**
     * Returns the configured host
     */
    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    /**
     * Returns the configured port
     */
    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /**
     * Returns whether authentication is enabled; MongoDB does not enable it by default
     */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    /**
     * Returns the username used for authentication
     */
    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    /**
     * Returns the password used for authentication
     */
    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            String partition = this.getPartition();
            String[] values = null;
            for (String condition : conditions) {
                // the partition string (default "|") is treated literally, not as a regex
                values = condition.split(Pattern.quote(partition));
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    public void setInputClass(Class<? extends DBWritable> inputClass) {
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}

MongoDBConfiguration.java
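
  A hedged sketch of how this helper class is typically used from a driver; the host, port, credentials and collection names below are placeholders.

import org.apache.hadoop.conf.Configuration;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;

// Hedged configuration sketch built only from the methods defined above.
public class MongoConfSketch {
    public static Configuration build() {
        Configuration conf = new Configuration();
        MongoDBConfiguration mongoConf = new MongoDBConfiguration(conf);
        mongoConf.configureDB("localhost", 27017);                                    // without authentication
        // mongoConf.configureDB("mongo.example.com", 27017, true, "user", "secret"); // with authentication
        mongoConf.setInputDatabaseName("db_java");
        mongoConf.setInputCollectionName("users");
        mongoConf.setInputFieldNames("name", "age", "sex");                           // optional field projection
        return conf;
    }
}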

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Write this object to a MongoDB collection
     *
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Build the DBObject to be written out, merging into the given (possibly null) object
     *
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populate this object's fields from a MongoDB document
     *
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}

MongoDBWritable.java

package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class Demo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Set the input MongoDB database/collection and the corresponding input value class.
        // The input database and collection should already exist, otherwise there is simply no data
        // (an empty input is not an error).
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);

        Job job = Job.getInstance(conf, "mongodb-demo");
        job.setJarByClass(Demo.class);
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setOutputValueClass(DemoOutputValue.class);
        job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setInputFormatClass(MongoDBInputFormat.class);
        MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // the output database/collection does not need to exist

        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Add this object's values; if a key already exists, a numeric suffix is appended
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Add this object's values; if a key already exists, a numeric suffix is appended
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }
    }

    /**
     * Mapper that passes records straight through
     *
     * @author jsliuming
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Reducer that writes the output and does nothing more than count the records per key
     *
     * @author jsliuming
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused") NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * If the given key already exists in the keys set, append an increasing index to it
     *
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}

Demo
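
  Since the demo expects a db_java.users collection to exist, here is a hedged helper (using the same legacy MongoDB driver classes as the rest of the post) for seeding a few sample documents before packaging the job and submitting it with hadoop jar; the names, ages and sexes below are arbitrary test values.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

// Hedged helper: insert a couple of made-up documents into db_java.users for testing.
public class SeedUsers {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo(new ServerAddress("localhost", 27017));
        DBCollection users = mongo.getDB("db_java").getCollection("users");
        users.insert(new BasicDBObject("name", "tom").append("age", 25).append("sex", "male"));
        users.insert(new BasicDBObject("name", "jerry").append("age", 23).append("sex", "female"));
        mongo.close();
    }
}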

IV. Result screenshots

Reposted from: https://www.cnblogs.com/liuming1992/p/4758504.html
