在map执行之前,需要将数据进行切片,每个切片对应一个map任务。而每个map任务并不是直接处理这些切片数据的,它是处理KV的。所以问题有两个:数据是如何切片的、切片是如何转为KV给map处理的。
这就涉及到两个抽象类,InputFormat以及 RecordReader。具体为什么是这两个抽象类,请看之前input的源码分析
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
我们看到,这个抽象类就两个方法
getSplits:看名字就知道是用来将数据处理成切片的了
createRecordReader:就是用来创建RecordReader对象的。
所以这就是一个InputFormat基本的功能
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public RecordReader() {
}
//初始化,一般就是读取切片的数据
public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
//检查是否还有下一对KV,并且如果有,实际上会将其处理成KV,并赋值给this.key和this.value
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
//返回一个key
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
//返回一个value
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//返回是否在处理
public abstract float getProgress() throws IOException, InterruptedException;
//关闭reader
public abstract void close() throws IOException;
}
这个抽象类就涉及到读取切片的数据,处理成KV结构。而在input源码分析中说到,mapper.run方法中通过 context.getCurrentKey() 类似的方法获取key其实就是调用这个RecordReader中的这些get方法而已。
从上面的源码可以看到。
InputFormat:负责规划切片信息,以及创建RecordReader对象
RecordReader:负责按照切片规划去读取当前mapper处理的切片数据,并将其处理成KV形式,然后通过context传递给mapper。
常用的有:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat(自定义有另外的文章讲)
这是默认的InputFormat,切片方式是按数据块的方式切割,默认大小block大小。一个文件至少是一个切片(无论多小)。因为这个类继承FileInputFormat,使用的是其父类定义的getsplit() 方法进行切片。
使用的RecordReader是LineRecordReader。处理切片成KV时,每条记录是一行输入。键K是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。
这个类也是使用父类FileInputFormat的getsplit() 方法进行切片,所以切片方式和上面一致。
使用的RecordReader是KeyValueLineRecordReader。每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");来设定分隔符。默认分隔符是tab(\t)。
这个类虽然继承了FileInputFormat,但是自己重写了getSplit方法,使用另外的方式来切片。是按指定的行数来切片,比如5行,那就5行作为一个切片,无论数据大小。通过mapreduce.input.lineinputformat.linespermap 这个参数设置切片行数。
使用的RecordReader是LineRecordReader。和上面类似,不重复说。
这个类继承于 CombineFileInputFormat,父类继承于FileInputFormat。在CombineFileInputFormat中重写了 getSplits方法。因为FileInputFormat默认无论多小的文件,一个文件至少是一个切片。如果遇到很多小文件,就会导致很多切片。而这里的切片方式就是严格按照大小来切片,会将小文件集合在一起,达到指定大小,才作为一个切片。
使用的RecordReader是CombineFileRecordReader。处理方式和 LineRecordReader类似,只不过切片可能是来自多个文件,读取方式上略显麻烦。
job.setInputFormatClass(xxxInputFormat.class);
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。