这篇文章主要讲解了“hadoop中的recordreader和split以及block的关系是怎样的”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“hadoop中的recordreader和split以及block的关系是怎样的”吧!
recordreader的作用不言而喻。
通常来讲,Inputformat会为没有一个split产生一个recordreader来提供给maptask使用,进而,MapTask能够读取属于自己管辖处理的那部分split。
这里面,我们以linerecordreader为例子进行讲解:
几个核心的方法
定义了linerecordreader基本的作用,即,是否有下一对kv,获得下一个key,获得下一个value。
而这个三个方法的使用地方如下。
暂时忽略...
因为文件在hdfs上分块存放的,那么split和block什么鬼?为啥不直接按照block去处理就行了呗。原因呢,是block中的数据可能不是连续的。可能某个重要的信息被两个block分隔了。因此,我们使用逻辑上的概念,即split来处理。
而split并不是真的将文件split了...而是逻辑上的标记下start,length,filepath等即可。
根据Path,可以过得到FileSystem
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
每个maptask呢,都会使用使用一个linerecordreader,处理对应的split,中间通过了
private FSDataInputStream fileIn;
来维护一个流。
切记:这里的流并不是只针对这个split的,我们之前说过,split只是标记而已,没有分隔。
因此,这个流fileIn其实是指向整个文件的。
并且呢,这个流呢,会实现jdk中标准的方法,啥read啊之类的。读取到缓冲区中,但是如果涉及到不同的block呢,这个流会自动帮我们去找对应的block的,这个太复杂。反正记住fileIn屏蔽了顶层的不同block之前的切换,对我们来讲就像处理一个大的文件一样。
既然是流,那么就能够定位了,因此,不同的maptask就可以根据自己的split中的start位置,通过fileIn流直接定位到要处理文件的那个地方。
fileIn.seek(start); in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn;
可以看到其中的in对象,是借助fileIn生成,相比,in内部一定借助了这个fileIn流来实现某个功能。
典型的,readLine,
in对象负责一行的读取逻辑,,而fileIn则负责从文件读取字符到byte缓冲区。
readline函数,最终会有一个这样的抵用,可以看到
bufferLength = fillBuffer(in, buffer, prevCharCR);
调用fillbuffer函数,从in.read()中读取东西到buffer中,
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { /* We're reading data from in, but the head of the stream may be * already buffered in buffer, so we have several cases: * 1. No newline characters are in the buffer, so we need to copy * everything and read another buffer from the stream. * 2. An unambiguously terminated line is in buffer, so we just * copy to str. * 3. Ambiguously terminated line is in buffer, i.e. buffer ends * in CR. In this case we copy everything up to CR to str, but * we also need to see what follows CR: if it's LF, then we * need consume LF as well, so next call to readLine will read * from after that. * We use a flag prevCharCR to signal if previous character was CR * and, if it happens to be at the end of the buffer, delay * consuming it until we have a chance to look at the char that * follows. */ str.clear(); int txtLength = 0; //tracks str.getLength(), as an optimization int newlineLength = 0; //length of terminating newline boolean prevCharCR = false; //true of prev char was CR long bytesConsumed = 0; do { int startPosn = bufferPosn; //starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; if (prevCharCR) { ++bytesConsumed; //account for CR from previous read } bufferLength = fillBuffer(in, buffer, prevCharCR); if (bufferLength <= 0) { break; // EOF } } for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline if (buffer[bufferPosn] == LF) { newlineLength = (prevCharCR) ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte break; } if (prevCharCR) { //CR + notLF, we are at notLF newlineLength = 1; break; } prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; if (prevCharCR && newlineLength == 0) { --readLength; //CR at the end of the buffer } bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); txtLength += appendLength; } } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); if (bytesConsumed > Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int)bytesConsumed; }
OK,那之后的linerecordreader三个主要的方法就简答了,读取就行了。略屌。
但是,有一个问题还没说。即一行信息如果被某个block分隔了咋办。
或者这个问题,这样说,我们知道Inputformat中的getSplit方法呢,就是根据文件的length等属性直接划分split的。
参照FileInputformat的getSplits方法
那么一行数据,可能在不同的splits中,也可能在不同的block中。
在不同的block中呢,这个有fileIn对象帮我们处理的了,主要是读取read到缓冲区,属于物理上的问题,不是考虑的地方。
处于不同的split呢?这个情况有些问题,因为不同的split就是不同的划分,并且由不同的map task执行。
那么我们recordreader如何解决这个问题呢?
解决办法便是,突破split的start和end限制。
linerecordreader的解决办法:
只不start指向的位置不是文件的第一行,则默认的过滤掉一行(start位置可能是一行中的某一个位置)。
initialize()方法
if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start;
在nextKeyvalue方法中,多读取一些数据,补充完整的一行。
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); }
OK,通过过滤掉一行,和多读取一行,就能保证被split分隔的一行,能够完成的读取,同时也不会重复处理一些数据。因为,所有的mapTask的linerecordreader都遵循这个方法。
感谢各位的阅读,以上就是“hadoop中的recordreader和split以及block的关系是怎样的”的内容了,经过本文的学习后,相信大家对hadoop中的recordreader和split以及block的关系是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。