
// Split boundaries and the current read offset. initialize() derives start/end from
// the split and sets pos to start; nextKeyValue() advances pos by the bytes each read consumes.
private long start;
private long pos;
private long end;

// Line reader plus the underlying file stream opened in initialize().
private SplitLineReader in;
private FSDataInputStream fileIn;
// Either fileIn or the split-compression stream, depending on the codec found.
private Seekable filePosition;

// Read from the job configuration (MAX_LINE_LENGTH); defaults to Integer.MAX_VALUE.
private int maxLineLength;

// Current record; both are reset to null by nextKeyValue() when no record is left.
private LongWritable key;
private Text value;

// Set when a compression codec is found for the input file.
private boolean isCompressedInput;
private Decompressor decompressor;

// Optional custom record delimiter; null unless supplied through the constructor.
private byte[] recordDelimiterBytes;



/**
 * Creates a reader without a custom record delimiter;
 * {@code recordDelimiterBytes} keeps its default value of {@code null}.
 */
public LineRecordReader() {
}

/**
 * Creates a reader with a custom record delimiter.
 *
 * @param recordDelimiter delimiter bytes; the array reference is stored as-is
 *                        (no copy) and later handed to the line reader built
 *                        in {@code initialize}
 */
public LineRecordReader(byte[] recordDelimiter) {
  recordDelimiterBytes = recordDelimiter;
}



 public void initialize(InputSplit genericSplit,TaskAttemptContext context) throws IOException {FileSplit split = (FileSplit) genericSplit;Configuration job = context.getConfiguration();this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();// open the file and seek to the start of the splitfinal FileSystem fs = file.getFileSystem(job);fileIn = fs.open(file);CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);if (null!=codec) {isCompressedInput = true;    decompressor = CodecPool.getDecompressor(codec);if (codec instanceof SplittableCompressionCodec) {final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, start, end,SplittableCompressionCodec.READ_MODE.BYBLOCK);in = new CompressedSplitLineReader(cIn, job,this.recordDelimiterBytes);start = cIn.getAdjustedStart();end = cIn.getAdjustedEnd();filePosition = cIn;} else {in = new SplitLineReader(codec.createInputStream(fileIn,decompressor), job, this.recordDelimiterBytes);filePosition = fileIn;}} else {fileIn.seek(start);in = new UncompressedSplitLineReader(fileIn, job, this.recordDelimiterBytes, split.getLength());filePosition = fileIn;}// If this is not the first split, we always throw away first record// because we always (except the last split) read one extra line in// next() method.if (start != 0) {start += in.readLine(new Text(), 0, maxBytesToConsume(start));}this.pos = start;}

/** The position of the first byte in the file to process. */
public long getStart() {
  return start;
}

/** The number of bytes in the file to process. */
// NOTE(review): no `length` field is declared in the surrounding excerpt, so
// these accessors look like they were copied from a different class (the
// split type?) — confirm where they belong.
@Override
public long getLength() {
  return length;
}


 public boolean nextKeyValue() throws IOException {if (key == null) {key = new LongWritable();}key.set(pos);if (value == null) {value = new Text();}int newSize = 0;// We always read one extra line, which lies outside the upper// split limit i.e. (end - 1)while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {if (pos == 0) {newSize = skipUtfByteOrderMark();} else {newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));pos += newSize;}if ((newSize == 0) || (newSize < maxLineLength)) {break;}// line too long. try againLOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));}if (newSize == 0) {key = null;value = null;return false;} else {return true;}}


readLine 是做什么的?它的返回值是一个数字:本次调用从输入流中读取(消耗)的字节数,包括换行符在内。
the number of bytes read including the (longest) newline found.


参数: str – 存储读到的这一行的对象(不包含换行符); maxLineLength – 存入 str 的最大字节数,
该行超出的部分会被静默丢弃; maxBytesToConsume – 本次调用最多消耗的字节数。
这只是一个提示:如果一行越过了该阈值,我们允许它越过,最多可能超出一个缓冲区的长度。 返回值: 读取的字节数,包括找到的(最长)换行符。

总结一下:如果 newSize == 0,就把 key 和 value 都置为 null,并返回 false,表示没有下一对 key/value 了。


/**
 * Reads one line into {@code str}, choosing the reading strategy by whether a
 * custom record delimiter has been set.
 *
 * @param str               receives the line contents
 * @param maxLineLength     passed through to the selected reader
 * @param maxBytesToConsume passed through to the selected reader
 * @return whatever the selected reader returns (the number of bytes consumed)
 * @throws IOException if the selected reader fails
 */
public int readLine(Text str, int maxLineLength,
    int maxBytesToConsume) throws IOException {
  // No custom delimiter: fall back to CR / LF / CRLF handling.
  if (this.recordDelimiterBytes == null) {
    return readDefaultLine(str, maxLineLength, maxBytesToConsume);
  }
  return readCustomLine(str, maxLineLength, maxBytesToConsume);
}

* 1. 缓冲区中没有换行符,因此需要复制全部内容,然后从流中再读入一个缓冲区。
* 2. 缓冲区中有一个明确终止的行,因此只需将其复制到 str。
* 3. 缓冲区中有一个终止方式不明确的行,即缓冲区以 CR 结尾。在这种情况下,我们把 CR 之前的所有内容复制到 str,
*    但还需要查看 CR 之后的字符:如果是 LF,则也要消耗掉 LF,这样下一次调用 readLine 时会从它之后开始读取。

 /*** Read a line terminated by one of CR, LF, or CRLF.*/private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)throws IOException {/* We're reading data from in, but the head of the stream may be* already buffered in buffer, so we have several cases:* 1. No newline characters are in the buffer, so we need to copy*    everything and read another buffer from the stream.* 2. An unambiguously terminated line is in buffer, so we just*    copy to str.* 3. Ambiguously terminated line is in buffer, i.e. buffer ends*    in CR.  In this case we copy everything up to CR to str, but*    we also need to see what follows CR: if it's LF, then we*    need consume LF as well, so next call to readLine will read*    from after that.* We use a flag prevCharCR to signal if previous character was CR* and, if it happens to be at the end of the buffer, delay* consuming it until we have a chance to look at the char that* follows.*/str.clear();int txtLength = 0; //tracks str.getLength(), as an optimizationint newlineLength = 0; //length of terminating newlineboolean prevCharCR = false; //true of prev char was CRlong bytesConsumed = 0;do {int startPosn = bufferPosn; //starting from where we left off the last timeif (bufferPosn >= bufferLength) {startPosn = bufferPosn = 0;if (prevCharCR) {++bytesConsumed; //account for CR from previous read}bufferLength = fillBuffer(in, buffer, prevCharCR);if (bufferLength <= 0) {break; // EOF}}for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newlineif (buffer[bufferPosn] == LF) {newlineLength = (prevCharCR) ? 
2 : 1;++bufferPosn; // at next invocation proceed from following bytebreak;}if (prevCharCR) { //CR + notLF, we are at notLFnewlineLength = 1;break;}prevCharCR = (buffer[bufferPosn] == CR);}int readLength = bufferPosn - startPosn;if (prevCharCR && newlineLength == 0) {--readLength; //CR at the end of the buffer}bytesConsumed += readLength;int appendLength = readLength - newlineLength;if (appendLength > maxLineLength - txtLength) {appendLength = maxLineLength - txtLength;}if (appendLength > 0) {str.append(buffer, startPosn, appendLength);txtLength += appendLength;}} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);if (bytesConsumed > Integer.MAX_VALUE) {throw new IOException("Too many bytes before newline: " + bytesConsumed);}return (int)bytesConsumed;}

LineRecordReader 利用 LineReader 的 readLine 方法逐行读取数据:默认以换行符(CR、LF 或 CRLF)作为记录分隔符,每读到一行就生成一对 key/value,其中 key 是该行的起始偏移量(pos),value 是该行的内容。也就是说,LineRecordReader 负责把 InputSplit 转化为 key/value 对。那么它具体是在什么时候被调用的呢?

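nextKeyValue() 的返回值只表示有没有下一对 key/value;那么下一对 key/value 的内容是什么,又是在哪里设置的呢?
key 在 nextKeyValue() 开头通过 key.set(pos) 设置;value 作为 str 参数传给 in.readLine(...),最终由 readDefaultLine 中的 str.append(...) 填充: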


 // Excerpt from readDefaultLine: copies appendLength bytes of buffer, starting at
 // startPosn, into str and adds the same count to txtLength. Nothing is appended
 // when appendLength is zero or negative.
 if (appendLength > 0) {str.append(buffer, startPosn, appendLength);txtLength += appendLength;}

