这篇文章主要讲解了“Mapper输出缓冲区MapOutputBuffer怎么理解”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Mapper输出缓冲区MapOutputBuffer怎么理解”吧!
Mapper的输出缓冲区MapOutputBuffer
现在我们知道了Map的输入端,紧接着我们看map的输出,这里重点就是context.write这个语句的内涵。
搞清Mapper作为参数传给map的context,这里我们看Mapper的run被调用的时候作为了参数传递下来。调用Mapper.run的是MapTask. runNewMapper。到这里我们深究一下runNewMapper。我们看MapTask的run方法:我们重点看runNewMapper
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; if (isMapTask()) { // If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress. if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%). mapPhase = getProgress().addPhase("map", 0.667f); sortPhase = getProgress().addPhase("sort", 0.333f); } } TaskReporter reporter = startReporter(umbilical);获取视频中文档资料及完整视频的伙伴请加QQ群:947967114 boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter);
}
当我们点runNewMapper的时候就能进入真正实现:
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
//确定该用哪一种具体的Mapper,然后创建。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
//确定输入的文件格式
// rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());//确定这个Mapper所用的输入是哪一个split
LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext);
//创建和InputFormat相称的RecordReader
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null; // get an output object
if (job.getNumReduceTasks() == 0) {
//如果设置的reduce个数是0,就直接输出。
output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
接下来我们看一下NewOutputCollector源码 获取视频中文档资料及完整视频的伙伴请加QQ群:947967114
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException { collector = createSortingCollector(job, reporter);
//创建通向排序阶段的collecter
partitions = jobContext.getNumReduceTasks();
//通过获取Reduce数量来获得partitions数量。两个数量一一对应
if (partitions > 1) {
//获取的partitions 数量大于1
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
//ReflectionUtils.newInstance创建用户设置的Partitioner,里边的参数jobContext.getPartitionerClass()是对抽象类的某种扩充,表示自己可以书写一个Partitioner类,通过这个方法来获取,如果没有自己写,就是用默认的HashPartitioner
} else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; }//只有一个partition就动态扩充抽象类Partitioner类 }; }
}
回到runNewMapper源码:
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);
//创建一个用于Mapper的Context。
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
//把上边创建的mapContext通过getMapContext获取过来最终传递给mapperContext ,我们继续看getMapContext源码
public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
return new Context(mapContext);
}
//这里返回了Context对象,在查看Context对象。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114
public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) { this.mapContext = mapContext;
}
//我们看到获取了mapContext 的值。所以我们知道WrappedMapper-->Context-->mapContext是一个MapContextImpl。
try { input.initialize(split, mapperContext);
//初始化input,input是recordReader对象,split和mapperContext作为参数
mapper.run(mapperContext);
//我们知道这个run方法运行的是Mapper的run方法,所以看一下这个run
public void run(Context context) throws IOException, InterruptedException {
setup(context);
//获取context
try { while (context.nextKeyValue()) {
//通过nextKeyValue来控制运行
map(context.getCurrentKey(), context.getCurrentValue(), context);
//运行了map方法,给了recordReader提供过来的键值对。
} } finally { cleanup(context); }
}
回到MapTask源码
mapPhase.complete();
//上锁
setPhase(TaskStatus.Phase.SORT);
//所有的task结果进行排序
statusUpdate(umbilical);
//更新runNewMapper状态。
input.close();
//关闭输入流
input = null; output.close(mapperContext);
//关闭输出流
output = null; } finally { closeQuietly(input); closeQuietly(output, mapperContext); }
}
对于输入格式和分片以前已经详细说过了,需要注意NewTrackingRecordReader。我们知道有了InputFormat之后需要创建与他对应的RecordReader。但是在RecordReader上是用NewTrackingRecordReader。不同之处在于Tracking,是一个跟踪,对RecordReader的跟踪,他这里有一个参数reporter,就是用来上报跟踪结果的,RecordReader则没有这个功能。
和输出有关的是collecter,是输出数据的收集器,context.write最后就通过RecodWriter落实到collector.collect上。RecordWriter和RecordReader是同一个层次。RecodWriter是hadoop定义个一个抽象类,具体的RecodWriter就是对这个抽象类的扩充。用于maptask的就是NewDrictDoutputCollecter和NewOutputCollecter。
这两个类叫做OutputCollecter,实际上都是RecordWriter。Collecter只是一种语意的描述。从Mapper的角度看是Writer,是输出。从框架或下游的角度看是Collect,是收集。
如果reducer数量是0,就是没有reducer,Mapper的输出就是整个MR的输出,这个时候用RecordWriter的NewDrictDoutputCollecter,直接输出。相反至少有一个Reducer,那么使用的就是RecordWriter的NewOutputCollecter。这是我们注重的重点内容。我们看NewOutputCollecter源码。定义了几个内容:
collector = createSortingCollector(job, reporter);
//实现MapOutputCollector
partitions = jobContext.getNumReduceTasks();
//负责Mapper输出的分区
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
//分发目标的个数,也就是Reducer的个数。
@Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions));
}
//write只写不读。
@Override public void close(TaskAttemptContext context ) throws IOException,InterruptedException { try { collector.flush(); } catch (ClassNotFoundException cnf) { throw new IOException("can't find class ", cnf); } collector.close(); }
}
NewOutputCollector分成两部分,一个是collecter还有一个是partitioner。collecter负责实际收集Mapper输出并交付给Reducer的工作,partitioner负责决定把具体的输出交给哪一个Reducer。
有多个Reducer存在,MR框架需要把每个Mapper的每项输出,也就是收集到的所有的KV对。按照某种条件(就是Partioner的实现方式,默认就是HashPartitioner)输出到不同的Reducer。这样就把Mapper的输出划分成了多个分区(Partition),有几个Reducer就把每个Mapper还分成几个Partition,Partitioner就是起到划分的作用。hash的方式。。。。。。。。。。。。
所以在创建NewOutputCollector的构造函数中,就要把具体的collector和partitioner创建好。
hadoop的源码中定义了MapOutputCollector。凡是实现了这个类,除了init和close方法外,还必须提供collect和flush这两个函数,从NewOutputCollector知道这两个函数的调用者是collector,创建collector的方式是通过createSortingCollector来完成的。并且还实现了对KV对的排序。从属关系如下:
YarnChild.main->PrivilegeExceptionAction.run->Maptask.run-->RunNewMapper->NewOutputCollector->MapTask.createSortingCollector
那么我们来看一下createSortingCollector源码。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter) throws IOException, ClassNotFoundException { MapOutputCollector.Context context = new MapOutputCollector.Context(this, job, reporter); Class<?>[] collectorClasses = job.getClasses( JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
//如果没有添加设置就默认使用MapOutputBuffer.class
int remainingCollectors = collectorClasses.length;
for (Class clazz : collectorClasses) {
//逐一实验设置的collectorClasses
try { if (!MapOutputCollector.class.isAssignableFrom(clazz)) { throw new IOException("Invalid output collector class: " + clazz.getName() + " (does not implement MapOutputCollector)");
//这里告诉我们必须实现MapOutputCollector.class
} Class<? extends MapOutputCollector> subclazz = clazz.asSubclass(MapOutputCollector.class); LOG.debug("Trying map output collector class: " + subclazz.getName());
//获取日志
MapOutputCollector<KEY, VALUE> collector = ReflectionUtils.newInstance(subclazz, job);
//创建collector对象。
collector.init(context);
//初始化collector,实际上初始化的是MapOutputBuffer对象
LOG.info("Map output collector class = " + collector.getClass().getName()); return collector;
//没有异常就成功了。
} catch (Exception e) { String msg = "Unable to initialize MapOutputCollector " + clazz.getName(); if (--remainingCollectors > 0) { msg += " (" + remainingCollectors + " more collector(s) to try)"; } LOG.warn(msg, e); } } throw new IOException("Unable to initialize any output collector");
}
具体采用什么collector是可以在配置文件mapred-default.xml中设置的,这里的MAP_OUTPUT_COLLECTOR_CLASS_ATTR即mapreduce.job.output.collector.class.如果文件中没有设置就使用默认的MapOutputBuffer。所以实际创建的collcter就是Mapask的MapOutputBuffer。这个类是Maptask的内部类,实现了MapOutputCollector。
可想而知,如果我们另写一个实现了MapOutputCollectior的Collector,并修改配置文件mapred-default.xml中队配置项的设置。那么就可以创建不是MapTask.MapOutputBuffer。那样createSortingCollector创建的就是一个没有排序功能的collector。我们知道MapReduce框架之所以是工作流不是数据流的原因就是因为Mapper和Reducer之间的排序。因为Sort只有在所有数据到来之后才能完成。sort完之后所有数据才被Rducer拉取。那么没有了sort之后代表数据可以不断的流入而不是一次性的填充,MR给我们提供了这种可能性,就是通过写一个不排序的Collector来替代MapOutputBuffer。我们接下来还是把注意力放到runNewMapper上。
当创建了collector和partitioner之后就是Context,MapTask在调用mapper.run时作为参数的是mapperContext,这个对象的类型是WrappedMapper.Context,整个过程是MapContextImpl创建了mapContext对象,通过WrappedMapper对象(是对Mapper的扩充,根据名字就可以知道是对Mapper的包装区别就是在内部定义了Context类),把一个扩充的Mapper.Context包装在Mapper内部,这就是WrappedMapper.Context类对象。下面是部分代码;
public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
return new Context(mapContext);
}
@InterfaceStability.Evolving
public class Context
extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context { protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;
//MapContext类。被MapContextImpl实现
public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) { this.mapContext = mapContext; } /** * Get the input split for this map. */ public InputSplit getInputSplit() { return mapContext.getInputSplit(); } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return mapContext.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return mapContext.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return mapContext.nextKeyValue(); }
WrappedMapper.Context是对Mapper.Context的扩充。内部mapContext,它的构造函数Context中的this.mapContext就设置成这个MapContextImpl类对象mapContext。WrappedMapper.Context扩充了Mapper.Context的write、getCurrentKey、nextKeyValue等。
传给mapper.run的context就是WrappedMapper.Context对象。里面的mapContext是MapContextImpl对象。
我们继续看Mapper.map的context.write
关系是:MapTask.run->runNewMapper->Mapper.run->Mapper.map
按照这个关系找到了一个没有做任何事的方法。
public void write(KEYOUT key, VALUEOUT value)
throws IOException, InterruptedException;
我们需要找一个实现,这里找到的就是WrappedMapper.Context.write
就是这一段:
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException { mapContext.write(key, value);
}
这里的调用的其实是MapContextImpl.write。所以我们找到MapContextImpl。当我们看到MapContextImpl源码是看到继承了TaskInputOutputContextImpl我们找到了
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException { output.write(key, value);
}
找到这里我们还是没有找到真正的实现,这里的witer实际上调用的是,NewOutputCollector.writer。
public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions));
}
绕了一大圈之后我们发现最终回到了NewOutputCollector,这里的write和之前的有明显区别是collect实现的,里面有了分区。我们找的目的是一定要找到write中真正实现了分区写。
我们知道context是个WrappedMappe.Context对象,所以context.write其实就是就是Wrapped.Context.write,这个函数转而调用内部成分mapContext的write函数,而mapContext是个MapContextImpl对象,所以实际调用的是MoapCntextImpl.write。然而MapContextImpl中没有提供write函数,但是我们看到这个类继承了TaskInputOutputContextImpl。所以就继承他的write方法,然后这个write函数调用的是output的write,我们知道这个output参数类型是一个RecordReader,实际上这个output就是MapTask中定义的output,这个output是一个NewOutputCollector,也就是说是调用的NewOutputCollector的write方法,在这个write中我们看到调用了collector的collect,这个collecter就是Maptask.MapOutputBuffer。
在调用Maptask.MapOutputBuffer的collect时增加了一个参数partition,是指明KV去向的,这个值是有job.setPartitionerClass指定的,没有设置就使用了hashPartitioner。下面所有的工作就是由MapTask的MapOutputBuffer来完成了。
感谢各位的阅读,以上就是“Mapper输出缓冲区MapOutputBuffer怎么理解”的内容了,经过本文的学习后,相信大家对Mapper输出缓冲区MapOutputBuffer怎么理解这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。