在 《HDFS 块和 Input Splits 的区别与联系》 文章中介绍了HDFS 块和 Input Splits 的区别与联系,其中并没有涉及到源码级别的描述。为了补充这部分,这篇文章将列出相关的源码进行说明。看源码可能会比直接看文字容易理解,毕竟代码说明一切。
为了简便起见,这里只描述 TextInputFormat 部分的读取逻辑,关于写 HDFS 块相关的代码请参见网上其他人编写的文章。在阅读下面源码之前,看下下图图中的内容,以便能够深入的理解代码的含义。
在初始化 LineRecordReader 的时候,如果 FileSplit 的起始位置 start 不等于0, 说明这个 Block 块不是第一个 Block,这时候一律丢掉这个 Block 的第一行数据。具体参见下面代码:
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
// 处理压缩文件,我们不考虑
} else {
fileIn.seek(start);
if (null == this.recordDelimiterBytes){
in = new LineReader(fileIn, job);
} else {
in = new LineReader(fileIn, job, this.recordDelimiterBytes);
}
filePosition = fileIn;
}
// 如果不是第一个 Split,直接丢掉第一行数据的内容。
// 因为这一行的数据已经被上一个 split 读取了。
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
初始化完 LineRecordReader,程序就可以一行一行地读取 HDFS block 里面的内容了,这个实现主要调用 LineRecordReader 里面 nextKeyValue 的方法:
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// 这里每次会多读取一行的数据,注意 getFilePosition() <= end,
// 这里的 <= ,只有这样,才能保证每次会从下一个 Block 中多读取
// 一行的数据。
while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
pos += newSize;
if (newSize < maxLineLength) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
readLine 的实现如下:
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
}
如果用户指定了 textinputformat.record.delimiter 参数,说明用户自定义数据行分隔符,则程序会调用 readCustomLine 函数;否则调用 readDefaultLine,这时候行的分隔符是 \n。readCustomLine 和 readDefaultLine 的处理逻辑很类似,这里只列出 readDefaultLine 的实现:
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
* everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
* in CR. In this case we copy everything up to CR to str, but
* we also need to see what follows CR: if it's LF, then we
* need consume LF as well, so next call to readLine will read
* from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
bufferLength = in.read(buffer);
if (bufferLength <= 0) {
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
// 没有找到换行符,这时候需要再调用 in.read(buffer); 再从文件中读取部分数据。
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > (long)Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}
当外部程序调用这个读取函数的时候,每次会读取大约 64kb 的数据,并存放在 buffer 数组中,缓存区的大小可通过 io.file.buffer.size 参数配置。读取每个 Block 的时候都会从下一个 Block 多读取一行的数据,也就是说 in.read(buffer); 操作会读取两个 block 的数据。
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【HDFS 块和 Input Splits 的区别与联系(源码版)】(https://www.iteblog.com/archives/2366.html)


