温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

ReduceTask流程是怎样的

发布时间:2021-12-23 16:07:19 来源:亿速云 阅读:170 作者:iii 栏目:大数据

这篇文章主要介绍“ReduceTask流程是怎样的”,在日常操作中,相信很多人在ReduceTask流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ReduceTask流程是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

ReduceTask流程源码解读

1、最终的文件就是 file.out 和 file.out.index ,等待reduce的拷贝.

2、在LocalJobRunner$Job中的run方法中: 		//LocalJobRunner类中555行
if (numReduceTasks > 0) {  			//判断reduceTask的个数
 	//创建Runnable对象: LocalJobRunner$Job$ReduceTaskRunnable
       	List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
            jobId, mapOutputFiles);
        //创建线程池
	ExecutorService reduceService = createReduceExecutor();
	//将所有的LocalJobRunner$Job$ReduceTaskRunnable 提交到线程池执行.
        runTasks(reduceRunnables, reduceService, "reduce");
       }
3、进入runTasks(reduceRunnables, reduceService, "reduce");方法	//559行
for (Runnable r : runnables) {		 //循环每个Runnable,提交给线程池去执行.
    service.submit(r);
}

4、线程执行的时候,要运行LocalJobRunner$Job$ReduceTaskRunnable 中run方法

ReduceTask流程是怎样的

5、创建ReduceTask对象		//LocalJobRunner类~332行
ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId,mapIds.size(), 1);

6、执行ReduceTask中的run方法
//LocalJobRunner类 --> 347行reduce.run(localConf, Job.this); --> //进入run方法

7、调到ReduceTask的run方法内				//ReduceTask类~320行	
initialize(job, getJobID(), reporter, useNewApi);	//初始化~333行
sortPhase.complete(); 					//排序~382行
RawComparator comparator = job.getOutputValueGroupingComparator();	//387行 获取分组比较器
8、进入下列代码(390行)
runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
进入runNewReducer方法内		//ReduceTask~577行

--获取job的相关信息
 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
 
--反射的操作创建reduce对象 ,例如: WordCountReducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
        
--创建RecordWriter对象
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);

ReduceTask流程是怎样的

9、向下走,定位到reducer.run(reducerContext);方法 --> 然后进入(Reducer的run方法)	//~628行
setup(context);
reduce(context.getCurrentKey(), context.getValues(), context); 
//执行到WordCountReducer中的reduce方法,是一个循环调用过程.
context.write(key,outv);		//数据写出源码流程如下:
①:reduceContext.write(key, value);
②:output.write(key, value);

//进入到ReduceTask的write方法		//557行
③:real.write(key,value); 		//real :TextOutputFormat$LineRecordWriter
进入到real.write()方法			//TextOutputFormat类~84行
writeObject(key);   //写出key
writeObject(value); //写出value

写出key的源码~简单看下:			//TextOutputFormat类~75行
private void writeObject(Object o) throws IOException {
	if (o instanceof Text) {
		Text to = (Text) o;
		out.write(to.getBytes(), 0, to.getLength());
	} else {
		out.write(o.toString().getBytes(StandardCharsets.UTF_8));
		//调用对象的toString方法,将返回的字符串转换成字节,通过流写出
	}
}
10、cleanup(context);   		//清除生相关的文件,生成分区文件

整体MR工作机制源码解读总结

源码总结说明:
1. 看源码目的:
       熟悉整个MR的流程,能够将我们讲解的知识点对应到源码中具体的位置.
       为面试做准备.

2. 在整个MR中 ,会有N个MapTask(按照切片数量决定个数)和 N个ReduceTask(自行设置个数)
	--在集群中的效果是多个MapTask并行运行, 并行数由集群的资源来决定. 
	--多个ReduceTask并行运行,并行数由集群的资源来决定. 一般来说,ReduceTask的数量比较少,基本上都
能够同时并行.

到此,关于“ReduceTask流程是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI