一:Hadoop内置的数据类型。

Hadoop提供如下内置的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。

BooleanWritable 标准布尔型数值
ByteWritable 单字节数值
DoubleWritable 双字节数
FloatWritable 浮点数
IntWritable 整型数
LongWritable 长整型数
Text 使用UTF-8格式存储的文本
NullWritable 当<key,value>中的key或value为空时使用
//简单知道这些类型
IntWritable iw = new IntWritable(1);
System.out.println(  iw.get() );  // 1 BooleanWritable bw = new BooleanWritable(true);
System.out.println(  bw.get() );  // true

二:Hadoop-用户自定义的数据类型。

自定义数据类型时,需满足两个基本要求,即

1.实现Writable接口,以便该数据能被序列化后完成网络传输或文件输入/输出。

2.如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritableComparable接口。

//Hadoop2.6.4版 - Writable源码:
public interface Writable {void write(DataOutput out) throws IOException;void readFields(DataInput in) throws IOException;}
public interface WritableComparable<T> extends Writable, Comparable<T> {}

三:Hadoop内置的数据输入格式和RecordReader。

数据输入格式(InputFormat)用于描述MapReduce作业的数据输入规范。MapReduce框架依靠数据输入格式完成输入规范检查、对数据文件进行输入分块(InputSplit),以及提供从输入分块中将数据记录逐一读出、并转换为Map过程的输入键值对等功能。

Hadoop提供了丰富的内置数据输入格式,最常用的数据输入格式包括:TextInputFormat 和 KeyValueInputFormat。

TextInputFormat是系统默认的数据输入格式,可以将文本文件分块并逐行读入以便Map节点进行处理。读入一行时,所产生的主键key就是当前行在整个文本文件中的字节偏移位置,而value就是该行的内容。

//TextInputFormat部分源码:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) {String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");byte[] recordDelimiterBytes = null;if (null != delimiter)recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);return new LineRecordReader(recordDelimiterBytes);}//....
}

KeyValueTextInputFormat是另一个常用的数据输入格式,可将一个按照<key,value>格式逐行存放的文本文件逐行读出,并自动解析生成相应的key和value。

//KeyValueTextInputFormat部分源码:
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {// ...public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,TaskAttemptContext context) throws IOException {context.setStatus(genericSplit.toString());return new KeyValueLineRecordReader(context.getConfiguration());}}

RecordReader:对于一个数据输入格式,都需要有一个对应的RecordReader,主要用于将一个文件中的数据记录拆分成具体的键值对。TextInputFormat的默认RecordReader是LineRecordReader,而KeyValueTextInputFormat的默认RecordReader是KeyValueLineRecordReader。

四:Hadoop内置的数据输出格式与RecordWriter。

数据输出格式(OutputFormat)用于描述MapReduce作业的数据输出规范。MapReduce框架依靠数据输出格式完成输出规范检查以及提供作业结果数据输出功能。

同样,最常用的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,可以将计算结果以 “key + \t + vaue”的形式逐行输出到文本文件中。

与数据输入格式类似样,数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写入到文件中的具体格式。TextInputFormat的默认RecordWriter是LineRecordWriter,其实际操作是将结果数据以“key + \t + value”的形式输出到文本文件中。

//TextOutputFormat的部分源码:
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {// ...public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {//...}public LineRecordWriter(DataOutputStream out) {this(out, "\t");}private void writeObject(Object o) throws IOException {// ...}public synchronized void write(K key, V value) throws IOException {//...out.write(newline);}}public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {// ...}
}

五:通过打印输出UserInfo小例子来实现简单的用户自定义数据类型,数据输入格式,数据输出格式。 (简单的说就是模仿源码,基本上没多大变化)。

以下附上案例源码:

1.定义自己的UserInfo,作为数据类型。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;public class UserInfo implements WritableComparable<UserInfo> {private int id;private String name;private int age;private String sex;private String address;public UserInfo() {}public UserInfo(int id, String name, int age, String sex, String address) {this.id = id;this.name = name;this.age = age;this.sex = sex;this.address = address;}// JavaBean 普通的get set方法....@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readInt();this.name = in.readUTF();this.age = in.readInt();this.sex = in.readUTF();this.address = in.readUTF();}@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(id);out.writeUTF(name);out.writeInt(age);out.writeUTF(sex);out.writeUTF(address);}@Overridepublic String toString() {return "Id:" + id + ", Name:" + name + ", Age:" + age + ", Sex:" + sex + ", Address:" + address ;}@Overridepublic int compareTo(UserInfo userInfo) {return 0;}
}

2.定制自己的数据输入格式:UserInfoTextInputFormat。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class UserInfoTextInputFormat extends FileInputFormat<Text, UserInfo> {@Overridepublic RecordReader<Text, UserInfo> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {context.setStatus(split.toString());UserInfoRecordReader userInforRecordReader = new UserInfoRecordReader(context.getConfiguration() );return userInforRecordReader;}
}

3.定制自己的RecordReader:UserInfoRecordReader。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;public class UserInfoRecordReader extends RecordReader<Text, UserInfo> {public static final String KEY_VALUE_SEPERATOR = "mapreduce.input.keyvaluelinerecordreader.key.value.separator";private final LineRecordReader lineRecordReader;private byte separator = (byte) '\t';private Text innerValue;private Text key;private UserInfo value;public Class getKeyClass() { return Text.class;}public UserInfoRecordReader(Configuration conf)throws IOException {lineRecordReader = new LineRecordReader();String sepStr = conf.get(KEY_VALUE_SEPERATOR,"\t");this.separator = (byte) sepStr.charAt(0);}public void initialize(InputSplit genericSplit,TaskAttemptContext context) throws IOException {lineRecordReader.initialize(genericSplit, context);}public static int findSeparator(byte[] utf, int start, int length, byte sep) {for (int i = start; i < (start + length); i++) {if (utf[i] == sep) {return i;}}return -1; //将这个截取标识符的位置给返回回去。}public static void setKeyValue(Text key, UserInfo value, byte[] line,int lineLen, int pos) {if (pos == -1) {key.set(line, 0, lineLen);value.setId(0);value.setName("");value.setAge(0);value.setSex("");value.setAddress("");} else {key.set(line, 0, pos); //设置键  从 第 0位置 到 截取标识符的位置Text text = new Text();text.set(line, pos + 1, lineLen - pos - 1);System.out.println("text的值: "+text);String[] str = text.toString().split(",");for (int i=0;i<str.length;i++) {//System.out.println("根据逗号分隔开来的值:  " + str[i] );String[] strKeyValue = str[i].split(":");//System.out.println("strKeyValue的Key-Value:" + key+"--->"+value);if("ID".equals(strKeyValue[0])){value.setId(Integer.parseInt( strKeyValue[1]) );}else if("Name".equals(strKeyValue[0])){value.setName( strKeyValue[1]);}else if("Age".equals(strKeyValue[0])){value.setAge(Integer.parseInt( strKeyValue[1]) );}else if("Sex".equals(strKeyValue[0])){value.setSex(strKeyValue[1] );}else if("Address".equals(strKeyValue[0])){value.setAddress(strKeyValue[1] );}}
//          System.out.println( "key--> " + key);
//          System.out.println( "value--> "+value +"\n\n");}}public synchronized boolean nextKeyValue()throws IOException {byte[] line = null;int lineLen = -1;if (key == null) {key = new Text();}if (value == null) {value = new UserInfo(); }if (lineRecordReader.nextKeyValue()) {innerValue = lineRecordReader.getCurrentValue();line = innerValue.getBytes();lineLen = innerValue.getLength();} else {return false;}if (line == null){return false;}int pos = findSeparator(line, 0, lineLen, this.separator);setKeyValue(key, value, line, lineLen, pos);return true;}public Text getCurrentKey() {return key;}public UserInfo getCurrentValue() {return value;}public float getProgress() throws IOException {return lineRecordReader.getProgress();}public synchronized void close() throws IOException { lineRecordReader.close();}}

3.定制自己的输出格式:UserInfoTextOutputFormat。

package com.hadoop.mapreduce.test4.outputformat;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;public class UserInfoTextOutputFormat<K, V> extends FileOutputFormat<K, V> {public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {private static final String utf8 = "UTF-8";private static final byte[] newline;static {try {newline = "\n".getBytes(utf8);//System.out.println(  "newline --> " + newline);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}protected DataOutputStream out;private final byte[] keyValueSeparator;public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {this.out = out;try {this.keyValueSeparator = keyValueSeparator.getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException("can't find " + utf8 + " encoding");}}public LineRecordWriter(DataOutputStream out) {this(out, "\t");}private void writeObject(Object o) throws IOException {if (o instanceof Text) {Text to = (Text) o;System.out.println(  "o instanceof Text  --> True : "+ to.toString()  );out.write(to.getBytes(), 0, to.getLength());} else {out.write(o.toString().getBytes(utf8));System.out.println( "o instanceof Text  --> false : "+ o.toString()  );}}public synchronized void write(K key, V value) throws IOException {boolean nullKey = key == null || key instanceof NullWritable;boolean nullValue = value == null || value instanceof NullWritable;System.out.println(  "nullKey--> "+nullKey +" ,  nullValue--> "+nullValue);if (nullKey && nullValue) {return;}System.out.println( " nullkey --> "+ nullKey + ", !nullkey -->"+nullKey);if (!nullKey) {writeObject(key);}System.out.println( "(nullKey || nullValue) --> " + (nullKey || nullValue) );if (!(nullKey || nullValue)) {out.write(keyValueSeparator);}if (!nullValue) {writeObject(value);}out.write(newline);}public synchronized void close(TaskAttemptContext context) throws IOException {out.close();}}public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {Configuration conf = job.getConfiguration();boolean isCompressed = getCompressOutput(job);String keyValueSeparator= conf.get(SEPERATOR, "---->");System.out.println(  "keyValueSeparator---> "+keyValueSeparator);CompressionCodec codec = null;String extension = "";if (isCompressed) {Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);extension = codec.getDefaultExtension();}Path file = getDefaultWorkFile(job, extension);System.out.println(  "file --> Path : "+ file  );FileSystem fs = file.getFileSystem(conf);if (!isCompressed) {FSDataOutputStream fileOut = fs.create(file, false);System.out.println( "if---isCompressed-->: "+fileOut);return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);} else {FSDataOutputStream fileOut = fs.create(file, false);System.out.println( "else---isCompressed-->: "+fileOut);return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);}}
}

5.测试类:PrintUserInfo

package com.hadoop.mapreduce.test4.outputformat;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class PrintUserInfo {public static final IntWritable ONE = new IntWritable(1);public static class UserInfoMapper extends Mapper<Text, UserInfo, Text, UserInfo>{@Overrideprotected void map(Text key, UserInfo value, Mapper<Text, UserInfo, Text, UserInfo>.Context context)throws IOException, InterruptedException {super.map(key, value, context);}}public static void main(String[] args) {try {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "UserInfo");job.setJarByClass(PrintUserInfo.class);job.setMapperClass(UserInfoMapper.class);//定制输入格式:job.setInputFormatClass(UserInfoTextInputFormat.class);//定制输出格式:job.setOutputFormatClass(UserInfoTextOutputFormat.class);job.setMapOutputKeyClass(Text.class);//用的自己定义的数据类型job.setMapOutputValueClass(UserInfo.class);FileInputFormat.addInputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/mapuserinfo.txt"));FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.226.129:9000/rootdir/data/output7/"+System.currentTimeMillis()+"/"));System.exit(job.waitForCompletion(true) ? 0 : 1);//执行job} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}

6.输出结果:

1.数据文件:

1  ID:221,Name:xj,Age:22,Sex:man,Address:hunan,
2   ID:222,Name:cc,Age:21,Sex:Woman,Address:miluo,

2.结果文件:

1---->Id:221, Name:xj, Age:22, Sex:man, Address:hunan
2---->Id:222, Name:cc, Age:21, Sex:Woman, Address:miluo

了解Hadoop数据类型,输入输出格式及用户如何自定义。相关推荐

  1. c语言基本的数据类型输入,C语言基本数据类型输入输出格式

    C语言基本数据类型输入输出格式 C语言基本数据类型输入输出格式 首先看一下C语言的基本数据类型 在C语言中,用int关键字来表示基本的整数类型.后3个关键字(long.short和unsigned)和 ...

  2. C语言基本数据类型输入输出格式

    首先看一下C语言的基本数据类型 在C语言中,用int关键字来表示基本的整数类型.后3个关键字(long.short和unsigned)和C90新增的signed用于提供基本整数类型的变式,例如unsi ...

  3. Python零基础入门(2)——常用的快捷命令、数据类型、输入输出格式、变量介绍

    1.常用快捷名命令 pycharm设置界面(ctrl + alt + s) 快速创建文件(alt + insert) 格式化python代码(ctrl + alt + l) 快速注释代码(ctrl + ...

  4. SAS的基本使用介绍2(变量的输入输出格式)

    前言 有关SAS的基础使用可以回看上一篇 SAS的基本使用介绍1(数据集建立与输入输出格式) 本文将继续介绍SAS基本使用 自定义格式 SAS的灵活之处在于可自定义输入输出 例如:在输入性别时,输入1 ...

  5. C++STL开发温习与总结(六): 6.C++语言输入/输出流定义之输入/输出格式控制

    原博主博客地址:http://blog.csdn.net/qq21497936 本文章博客地址:http://mp.blog.csdn.net/postedit/79177645 C++STL开发温习 ...

  6. C++基础知识(二)--左值右值--逻辑表达式求值优化--逗号运算符与表示式--输入输出格式控制...

    :一.C++左值右值概念 左值:c++将变量名代表的单元称为左值,而将变量的值称为右值,左值必须是内存中可以访问且可以合法修改的对象,因此只能是变量名,而不能是常量或表达式.即左值可以寻址. 右值:将 ...

  7. [ACM训练] ACM中巧用文件的输入输出来改写acm程序的输入输出 + ACM中八大输入输出格式...

    ACM中巧用文件的输入输出来改写acm程序的输入输出 经常有见大神们使用文件来代替ACM程序中的IO,尤其是当程序IO比较复杂时,可以使自己能够更专注于代码的测试,而不是怎样敲输入. C/C++代码中 ...

  8. java循环输入直到,使用循环接受其他用户输入,直到用户输入结束输入的值

    我是Java的新手 . 我需要一些帮助,使用循环接受其他用户输入,直到用户输入结束输入的值 . 我的问题从语句"System.out.println("你完成了吗?输入大写的Y / ...

  9. c语言讲输入退回缓冲区_开始之前的结束-如何不退回输入错误的用户电子邮件...

    c语言讲输入退回缓冲区 by Alex Peterson 通过亚历克斯·彼得森 开始之前的结束-如何不退回输入错误的用户电子邮件 (Over before it started - how to no ...

  10. 编写两个函数分别求两个数的最大公约数和最小公倍数,用主函数调用这两个函数,并输出结果,两个数由键盘输入。 输入输出格式示例: 输入:24 16 输出:zdgys=8,zxgbs=48

    编写两个函数分别求两个数的最大公约数和最小公倍数,用主函数调用这两个函数,并输出结果,两个数由键盘输入. 输入输出格式示例: 输入:24 16 输出:zdgys=8,zxgbs=48 #include ...

最新文章

  1. Mapreduce 任务提交源码分析1
  2. Android应用开发-所有课堂代码
  3. 自然语言处理之词向量技术(二)
  4. GIT常用命令--拉取提交
  5. 【ArcObject开发】实验:ArcGIS Desktop开发方式入门基础教程
  6. 一入前端深似海,从此红尘是路人系列第七弹之孤独的剑客-单例模式
  7. 如何在JavaScript中大写字符串的首字母
  8. 动态代理:jdk和cglib区别
  9. python3.6安装cv2库_win10 +python3.6环境下安装opencv以及pycharm导入cv2有问题的解决办法...
  10. 浅谈端上智能之计算优化
  11. 平台式惯性导航系统简介(持续更新ing)
  12. chrome遭劫持7654网站,怎么办?
  13. Google Earth Engine APPS(GEE)—— Landsat 数据的时间序列分析来监测森林转化和退化 (CODED)整体框架(万字长文)
  14. 制图中比例尺的一些问题
  15. 已知一个字典包含若干员工信息,姓请编写一个函数,删除性别为男的员工信息
  16. 清华计算机科学四字班,清华大学里四个特殊班
  17. Ubuntu 10.4 .安装Cairo-dock
  18. CFile用法与实现
  19. Free IPA docker 安装记录
  20. 大数据可视化陈为智慧树_智慧树知到_大数据可视化_答案章节单元测试答案

热门文章

  1. 阶段3 2.Spring_03.Spring的 IOC 和 DI_7 spring中bean的细节之作用范围
  2. 阶段3 1.Mybatis_05.使用Mybatis完成CRUD_9 Mybatis中的返回值深入-解决实体类属性和数据库列名不对应的两种方式...
  3. web-4. 装饰页面的图像
  4. 数组中元素的动态增加和删除
  5. IntelliJ IDEA设置不自动打开最后关闭的项目
  6. uva11549Calculator Conundrum
  7. 自动把动态的jsp页面(或静态html)生成PDF文档,并且上传至服务器
  8. 小程序或者APP 自行使用TOKEN 实现登录会话保持
  9. 中国经济真相:跑了 1135 家制造企业,我终于明白什么叫自己玩死自己(转)...
  10. Leetcode207---课程表(逆拓扑排序)