这篇文章主要介绍“ReduceTask流程是怎样的”,在日常操作中,相信很多人在ReduceTask流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ReduceTask流程是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
1、最终的文件就是 file.out 和 file.out.index ,等待reduce的拷贝.
2、在LocalJobRunner$Job中的run方法中: //LocalJobRunner类中555行
if (numReduceTasks > 0) { //判断reduceTask的个数
//创建Runnable对象: LocalJobRunner$Job$ReduceTaskRunnable
List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
jobId, mapOutputFiles);
//创建线程池
ExecutorService reduceService = createReduceExecutor();
//将所有的LocalJobRunner$Job$ReduceTaskRunnable 提交到线程池执行.
runTasks(reduceRunnables, reduceService, "reduce");
}
3、进入runTasks(reduceRunnables, reduceService, "reduce");方法 //559行
for (Runnable r : runnables) { //循环每个Runnable,提交给线程池去执行.
service.submit(r);
}
4、线程执行的时候,要运行LocalJobRunner$Job$ReduceTaskRunnable 中run方法
5、创建ReduceTask对象 //LocalJobRunner类~332行
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId,mapIds.size(), 1);
6、执行ReduceTask中的run方法
//LocalJobRunner类 --> 347行reduce.run(localConf, Job.this); --> //进入run方法
7、调到ReduceTask的run方法内 //ReduceTask类~320行
initialize(job, getJobID(), reporter, useNewApi); //初始化~333行
sortPhase.complete(); //排序~382行
RawComparator comparator = job.getOutputValueGroupingComparator(); //387行 获取分组比较器
8、进入下列代码(390行)
runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
进入runNewReducer方法内 //ReduceTask~577行
--获取job的相关信息
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
--反射的操作创建reduce对象 ,例如: WordCountReducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
--创建RecordWriter对象
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
9、向下走,定位到reducer.run(reducerContext);方法 --> 然后进入(Reducer的run方法) //~628行
setup(context);
reduce(context.getCurrentKey(), context.getValues(), context);
//执行到WordCountReducer中的reduce方法,是一个循环调用过程.
context.write(key,outv); //数据写出源码流程如下:
①:reduceContext.write(key, value);
②:output.write(key, value);
//进入到ReduceTask的write方法 //557行
③:real.write(key,value); //real :TextOutputFormat$LineRecordWriter
进入到real.write()方法 //TextOutputFormat类~84行
writeObject(key); //写出key
writeObject(value); //写出value
写出key的源码~简单看下: //TextOutputFormat类~75行
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(StandardCharsets.UTF_8));
//调用对象的toString方法,将返回的字符串转换成字节,通过流写出
}
}
10、cleanup(context); //清除生相关的文件,生成分区文件
源码总结说明:
1. 看源码目的:
熟悉整个MR的流程,能够将我们讲解的知识点对应到源码中具体的位置.
为面试做准备.
2. 在整个MR中 ,会有N个MapTask(按照切片数量决定个数)和 N个ReduceTask(自行设置个数)
--在集群中的效果是多个MapTask并行运行, 并行数由集群的资源来决定.
--多个ReduceTask并行运行,并行数由集群的资源来决定. 一般来说,ReduceTask的数量比较少,基本上都
能够同时并行.
到此,关于“ReduceTask流程是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/luffycl/blog/4992859