HBase Source Code Analysis (17): The MapReduce Process
1) Export.java
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
String tableName = args[0];
Path outputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJobName(NAME + "_" + tableName);
job.setJarByClass(Export.class);
// Build the Scan. Depending on the configuration this sets the filter, start key, end key, etc.
// In the simplest case it is just: Scan s = new Scan()
Scan s = getConfiguredScanForJob(conf, args);
// One map task is created per region, so the number of maps equals the number of regions.
// The mapper does essentially nothing: whatever it reads is written straight out.
// The map input format is set to TableInputFormat.class.
IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);
// No reducers. Just write straight to output files.
// The data is saved directly.
job.setNumReduceTasks(0);
// Output files
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Result.class);
FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs.
return job;
}
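For completeness, here is a minimal sketch of a driver that submits the Export job built above; the table name and output directory are placeholders I made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.mapreduce.Job;

public class ExportDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // args[0] = table to export, args[1] = output directory for the SequenceFiles.
    Job job = Export.createSubmittableJob(conf, new String[] { "my_table", "/tmp/export_output" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

In practice the same job is usually launched from the command line with: hbase org.apache.hadoop.hbase.mapreduce.Export <tablename> <outputdir>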
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> splits = super.getSplits(context);
if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase())) {
Collections.shuffle(splits);
}
return splits;
}
Here getSplits builds the splits from the region location information, so each split boundary is naturally a region start key. Since the number of map tasks follows the number of regions, you get exactly one map per region. The optional shuffling of splits is not enabled here because it is not needed; a sketch of how it could be turned on follows.
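If you did want the splits shuffled, you would set the SHUFFLE_MAPS property checked in getSplits() above. A minimal sketch, assuming the constant is the public SHUFFLE_MAPS field on TableInputFormat:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

public class ShuffleMapsExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // SHUFFLE_MAPS is the property consulted in getSplits() above; setting it to "true"
    // makes the input format shuffle the per-region splits before handing them to the job.
    conf.set(TableInputFormat.SHUFFLE_MAPS, "true");
  }
}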
When the map side is initialized, the combiner class is set to PutCombiner:
@Override
protected void reduce(K row, Iterable<Put> vals, Context context)
throws IOException, InterruptedException {
long threshold = context.getConfiguration().getLong(
"putcombiner.row.threshold", 1L * (1<<30));
int cnt = 0;
long curSize = 0;
Put put = null;
Map<byte[], List<Cell>> familyMap = null;
for (Put p : vals) {
cnt++;
if (put == null) {
put = p;
familyMap = put.getFamilyCellMap();
} else {
for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap()
.entrySet()) {
List<Cell> cells = familyMap.get(entry.getKey());
List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null;
for (Cell cell : entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValueTypeForMR(cell);
curSize += kv.heapSize();
if (kvs != null) {
kvs.add(kv);
}
}
if (cells == null) {
familyMap.put(entry.getKey(), entry.getValue());
}
}
if (cnt % 10 == 0) context.setStatus("Combine " + cnt);
if (curSize > threshold) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
}
context.write(row, put);
put = null;
curSize = 0;
cnt = 0;
}
}
}
if (put != null) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1));
}
context.write(row, put);
}
}
Because HBase emits data cell by cell, a row that contains several columns arrives as several Puts; this combiner merges everything that shares the same rowkey into a single Put, as illustrated below.
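A small illustration of what the reduce() above accomplishes; the rowkey, family and qualifiers are made-up placeholders, and the merge simply mirrors the family-map folding done in the combiner:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutCombineIllustration {
  public static void main(String[] args) {
    byte[] row = Bytes.toBytes("row1");
    // Two Puts for the same rowkey, each carrying a different column.
    Put p1 = new Put(row);
    p1.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("1"));
    Put p2 = new Put(row);
    p2.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("b"), Bytes.toBytes("2"));
    // Fold p2's cells into p1's family map, the same way PutCombiner.reduce() does.
    for (Map.Entry<byte[], List<Cell>> e : p2.getFamilyCellMap().entrySet()) {
      List<Cell> existing = p1.getFamilyCellMap().get(e.getKey());
      if (existing == null) {
        p1.getFamilyCellMap().put(e.getKey(), e.getValue());
      } else {
        existing.addAll(e.getValue());
      }
    }
    System.out.println(p1); // one Put for "row1" now carries both cells
  }
}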
No reducer is needed at all; it is enough to specify the output format and the output path. With that, the Export process is complete.
2) Import.java
Import is exactly the reverse of Export; here the part to pay attention to is the reduce/output side.
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
TableName tableName = TableName.valueOf(args[0]);
conf.set(TABLE_NAME, tableName.getNameAsString());
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(Importer.class);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(SequenceFileInputFormat.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
// make sure we get the filter in the jars
try {
Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
if (filter != null) {
TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
}
} catch (Exception e) {
throw new IOException(e);
}
// With a bulk-output path configured, KeyValue files (HFiles) are written out directly,
// partitioned by region, because the data volume is large.
if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
LOG.info("Use Large Result!!");
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
job.setMapperClass(KeyValueSortImporter.class);
job.setReducerClass(KeyValueReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(KeyValueWritableComparable.class);
job.setMapOutputValueClass(KeyValue.class);
job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
KeyValueWritableComparable.KeyValueWritableComparator.class,
RawComparator.class);
Path partitionsPath =
new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
FileSystem fs = FileSystem.get(job.getConfiguration());
fs.deleteOnExit(partitionsPath);
job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
job.setNumReduceTasks(regionLocator.getStartKeys().length);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
com.google.common.base.Preconditions.class);
}
// Bulk output as well, but without the explicit partitioner used above.
} else if (hfileOutPath != null) {
job.setMapperClass(KeyValueImporter.class);
try (Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)){
job.setReducerClass(KeyValueSortReducer.class);
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
com.google.common.base.Preconditions.class);
}
} else {
// This path simply uses TableOutputFormat, so every record is written to the table with a Put.
// That is fine for small volumes but does not hold up for large imports.
// The actual write() is shown further below.
// No reducers. Just write straight to table. Call initTableReducerJob
// because it sets up the TableOutputFormat.
job.setMapperClass(Importer.class);
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
job.setNumReduceTasks(0);
}
return job;
}
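For reference, a minimal sketch of driving this job in bulk-output mode; the table name and paths are placeholders, and the property is set through the BULK_OUTPUT_CONF_KEY constant referenced above (assumed here to be a public field on Import):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.mapreduce.Job;

public class ImportBulkDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Point the job at a bulk-output directory so the hfileOutPath branch above is taken;
    // without this property the job falls back to writing Puts through TableOutputFormat.
    conf.set(Import.BULK_OUTPUT_CONF_KEY, "/tmp/import_hfiles"); // placeholder path
    Job job = Import.createSubmittableJob(conf, new String[] { "my_table", "/tmp/export_output" });
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}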
The essential part is setting the number of reduce tasks to the number of region start keys and partitioning the reduce output by region; with that in place the bulk output lines up with the table's regions.
The partitioner, shown below, likewise splits on the region start keys:
public static class KeyValueWritableComparablePartitioner
extends Partitioner<KeyValueWritableComparable, KeyValue> {
private static KeyValueWritableComparable[] START_KEYS = null;
@Override
public int getPartition(KeyValueWritableComparable key, KeyValue value,
int numPartitions) {
for (int i = 0; i < START_KEYS.length; ++i) {
if (key.compareTo(START_KEYS[i]) <= 0) {
return i;
}
}
return START_KEYS.length;
}
}
@Override
public void write(KEY key, Mutation value)
throws IOException {
if (!(value instanceof Put) && !(value instanceof Delete)) {
throw new IOException("Pass a Delete or a Put");
}
mutator.mutate(value);
}
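The mutator here is a BufferedMutator (in the 1.x TableOutputFormat): each Put or Delete is buffered on the client and flushed in batches, with the buffer size governed by hbase.client.write.buffer. That is why this path is acceptable for modest volumes but is far slower than generating HFiles and bulk-loading them when the import is large.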
3) LoadIncrementalHFiles
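The HFiles produced by the bulk-output path above still have to be handed to the region servers, which is what LoadIncrementalHFiles does (it is also exposed as the completebulkload command-line tool). A minimal sketch assuming the HBase 1.x API, with a made-up table name and path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("my_table");   // placeholder
    Path hfileDir = new Path("/tmp/import_hfiles");        // directory written by Import above
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin();
         Table table = conn.getTable(tableName);
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Moves each HFile under hfileDir into the region that owns its key range,
      // splitting any file that straddles a region boundary.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, table, locator);
    }
  }
}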