这篇文章主要讲解了“MapTask流程是怎样的”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“MapTask流程是怎样的”吧!
1、从job提交流程的24步,开始mapTask的流程分析,进入submitJob --LocalJobRunner.java中的788行 Job job = new Job(JobID.downgrade(jobid), jobSubmitDir); //创建一个可以真正执行的Job 该Job: LocalJobRunner$Job , 且是一个线程 $表示内部类
2、因为当前的Job对象是一个线程,所有执行线程要执行run方法,因此直接找到 LocalJobRunner的run方法进行查看 --定位到537行 TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); //读取切片的metainfo信息,即提交job过程中在临时目录中生成的job.splitmetainfo文件
3、向下走断点,定位到下方代码 --547行 List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables( taskSplitMetaInfos, jobId, mapOutputFiles); //根据切片的metainfo信息,可以得出有多少个切片,再生成对应个数的Runnable对象. 每个Runnable对象对应一个线程,每一个MapTask运行在一个线程中(基于本地模式的分析) Runnable : LocalJobRunnber$Job$MapTaskRunnable ---联想到线程
4、ExecutorService mapService = createMapExecutor(); //创建线程池对象 runTasks(mapRunnables, mapService, "map");// 将所有的LocalJobRunnber$Job$MapTaskRunnable对象提交给 线程池执行,进入到runTasks方法内部。 --LocalJobRunner中的466行
5、//每个线程负责一个Runnable执行,定位到每个Runnable内部的run方法,查看具体执行(以内部类的方式嵌套) for (Runnable r : runnables) { service.submit(r); } LocalJobRunnber$Job$MapTaskRunnable交给每个线程执行时,会执行到 LocalJobRunnber$Job$MapTaskRunnable的run方法,因此接下来看 LocalJobRunnber$Job$MapTaskRunnable的run方法 --LocalJobRunner中的248行
6、进入到run方法内部,定位到254行 MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId, info.getSplitIndex(), 1); //创建MapTask对象 --在每一个线程中都会执行,会创建一个mapTask对象
7、进入map.run(localConf, Job.this); --271行 //执行MapTask的run方法,关联到MapTask方法中的run
进入到MapTask的run方法内 首先进行分区设置 partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } 8、定位到MapTask中run方法的347行,并进入runNewMapper()方法,提前判断下是否使用新的api 进入runNewMapper()方法,定位到MapTask的745行开始读源码
9、--反射的方式创建Mapper对象. 例如: WordCountMapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); --反射的方式创建Inputformat对象, 例如: TextInputFormat(默认) org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); --获取当前MapTask所负责的切片信息 org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); --获取RecordReader对象 org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext);
10、向下读取,定位到MapTask的782行 output = new NewOutputCollector(taskContext, job, umbilical, reporter);方法进入
11、定位到MapTask的710行 collector = createSortingCollector(job, reporter); //收集器对象,可以理解为缓冲区对象 12、进入到createSortingCollector方法, --MapTask中的388行 13、collector.init(context); --初始化缓冲区对象 collector: MapTask$MapOutputBuffer 14、进入到init方法中 --MapTask的968行
15、 ①:定位到init方法的980行 --//获取溢写百分比 80%,通过mapreduce.map.sort.spill.percent参数来配置 final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); --//获取缓冲区大小 100M, 通过 mapreduce.task.io.sort.mb 参数来配置 final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB, MRJobConfig.DEFAULT_IO_SORT_MB); --//获取排序对象 QuickSort.class, 只排索引 sorter = ReflectionUtils.newInstance(job.getClass( MRJobConfig.MAP_SORT_CLASS, QuickSort.class, IndexedSorter.class), job); --//获取key的比较器对象 comparator = job.getOutputKeyComparator(); --//获取key的序列化对象 k/v serialization 获取kv的序列化对象 --//获取计数器对象 output counters --//compression 获取编解码器,进行压缩操作 --//combiner 获取Combiner对象,在溢写及归并可以使用combiner --//spillThread.start(); 启动溢写线程 ,只有达到溢写百分比才会发生溢写操作
16、mapper.run(mapperContext);执行到Mapper对象中的run方法,例如WordCountMapper中的run方法 进入到mapper.run()方法内 执行 setup(context); --143行 执行 map(context.getCurrentKey(), context.getCurrentValue(), context); --146行, 进入到wordCount中的map()方法,是一个循环执行的过程 context.wirte(outK,outV);将map方法中处理好的kv写出 执行cleanup(context);
感谢各位的阅读,以上就是“MapTask流程是怎样的”的内容了,经过本文的学习后,相信大家对MapTask流程是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。