这篇文章主要讲解了“MapTask流程是怎样的”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MapTask流程是怎样的”吧!
1、从job提交流程的24步,开始mapTask的流程分析,进入submitJob --LocalJobRunner.java中的788行
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir); //创建一个可以真正执行的Job
该Job: LocalJobRunner$Job , 且是一个线程 $表示内部类
2、因为当前的Job对象是一个线程,所有执行线程要执行run方法,因此直接找到 LocalJobRunner的run方法进行查看
--定位到537行
TaskSplitMetaInfo[] taskSplitMetaInfos =
SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
//读取切片的metainfo信息,即提交job过程中在临时目录中生成的job.splitmetainfo文件
3、向下走断点,定位到下方代码 --547行
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
taskSplitMetaInfos, jobId, mapOutputFiles);
//根据切片的metainfo信息,可以得出有多少个切片,再生成对应个数的Runnable对象.
每个Runnable对象对应一个线程,每一个MapTask运行在一个线程中(基于本地模式的分析)
Runnable : LocalJobRunnber$Job$MapTaskRunnable ---联想到线程
4、ExecutorService mapService = createMapExecutor(); //创建线程池对象
runTasks(mapRunnables, mapService, "map");// 将所有的LocalJobRunnber$Job$MapTaskRunnable对象提交给
线程池执行,进入到runTasks方法内部。 --LocalJobRunner中的466行
5、//每个线程负责一个Runnable执行,定位到每个Runnable内部的run方法,查看具体执行(以内部类的方式嵌套)
for (Runnable r : runnables) {
service.submit(r);
}
LocalJobRunnber$Job$MapTaskRunnable交给每个线程执行时,会执行到
LocalJobRunnber$Job$MapTaskRunnable的run方法,因此接下来看
LocalJobRunnber$Job$MapTaskRunnable的run方法 --LocalJobRunner中的248行
6、进入到run方法内部,定位到254行
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
info.getSplitIndex(), 1);
//创建MapTask对象 --在每一个线程中都会执行,会创建一个mapTask对象
7、进入map.run(localConf, Job.this); --271行 //执行MapTask的run方法,关联到MapTask方法中的run
进入到MapTask的run方法内
首先进行分区设置
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
8、定位到MapTask中run方法的347行,并进入runNewMapper()方法,提前判断下是否使用新的api
进入runNewMapper()方法,定位到MapTask的745行开始读源码
9、--反射的方式创建Mapper对象. 例如: WordCountMapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
--反射的方式创建Inputformat对象, 例如: TextInputFormat(默认)
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
--获取当前MapTask所负责的切片信息
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
--获取RecordReader对象
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
10、向下读取,定位到MapTask的782行 output = new NewOutputCollector(taskContext, job, umbilical, reporter);方法进入
11、定位到MapTask的710行
collector = createSortingCollector(job, reporter); //收集器对象,可以理解为缓冲区对象
12、进入到createSortingCollector方法, --MapTask中的388行
13、collector.init(context); --初始化缓冲区对象 collector: MapTask$MapOutputBuffer
14、进入到init方法中 --MapTask的968行
15、
①:定位到init方法的980行
--//获取溢写百分比 80%,通过mapreduce.map.sort.spill.percent参数来配置
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
--//获取缓冲区大小 100M, 通过 mapreduce.task.io.sort.mb 参数来配置
final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
MRJobConfig.DEFAULT_IO_SORT_MB);
--//获取排序对象 QuickSort.class, 只排索引
sorter = ReflectionUtils.newInstance(job.getClass(
MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
IndexedSorter.class), job);
--//获取key的比较器对象
comparator = job.getOutputKeyComparator();
--//获取key的序列化对象 k/v serialization 获取kv的序列化对象
--//获取计数器对象 output counters
--//compression 获取编解码器,进行压缩操作
--//combiner 获取Combiner对象,在溢写及归并可以使用combiner
--//spillThread.start(); 启动溢写线程 ,只有达到溢写百分比才会发生溢写操作
16、mapper.run(mapperContext);执行到Mapper对象中的run方法,例如WordCountMapper中的run方法
进入到mapper.run()方法内
执行 setup(context); --143行
执行 map(context.getCurrentKey(), context.getCurrentValue(), context); --146行,
进入到wordCount中的map()方法,是一个循环执行的过程
context.wirte(outK,outV);将map方法中处理好的kv写出
执行cleanup(context);
感谢各位的阅读,以上就是“MapTask流程是怎样的”的内容了,经过本文的学习后,相信大家对MapTask流程是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/luffycl/blog/4992705