这篇文章主要介绍“ReduceTask流程是怎样的”,在日常操作中,相信很多人在ReduceTask流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”ReduceTask流程是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
1、最终的文件就是 file.out 和 file.out.index ,等待reduce的拷贝.
2、在LocalJobRunner$Job中的run方法中: //LocalJobRunner类中555行 if (numReduceTasks > 0) { //判断reduceTask的个数 //创建Runnable对象: LocalJobRunner$Job$ReduceTaskRunnable List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables( jobId, mapOutputFiles); //创建线程池 ExecutorService reduceService = createReduceExecutor(); //将所有的LocalJobRunner$Job$ReduceTaskRunnable 提交到线程池执行. runTasks(reduceRunnables, reduceService, "reduce"); }
3、进入runTasks(reduceRunnables, reduceService, "reduce");方法 //559行 for (Runnable r : runnables) { //循环每个Runnable,提交给线程池去执行. service.submit(r); }
4、线程执行的时候,要运行LocalJobRunner$Job$ReduceTaskRunnable 中run方法
5、创建ReduceTask对象 //LocalJobRunner类~332行 ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId,mapIds.size(), 1); 6、执行ReduceTask中的run方法 //LocalJobRunner类 --> 347行reduce.run(localConf, Job.this); --> //进入run方法 7、调到ReduceTask的run方法内 //ReduceTask类~320行 initialize(job, getJobID(), reporter, useNewApi); //初始化~333行 sortPhase.complete(); //排序~382行 RawComparator comparator = job.getOutputValueGroupingComparator(); //387行 获取分组比较器
8、进入下列代码(390行) runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); 进入runNewReducer方法内 //ReduceTask~577行 --获取job的相关信息 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); --反射的操作创建reduce对象 ,例如: WordCountReducer org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getReducerClass(), job); --创建RecordWriter对象 org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
9、向下走,定位到reducer.run(reducerContext);方法 --> 然后进入(Reducer的run方法) //~628行 setup(context); reduce(context.getCurrentKey(), context.getValues(), context); //执行到WordCountReducer中的reduce方法,是一个循环调用过程. context.write(key,outv); //数据写出源码流程如下: ①:reduceContext.write(key, value); ②:output.write(key, value); //进入到ReduceTask的write方法 //557行 ③:real.write(key,value); //real :TextOutputFormat$LineRecordWriter 进入到real.write()方法 //TextOutputFormat类~84行 writeObject(key); //写出key writeObject(value); //写出value 写出key的源码~简单看下: //TextOutputFormat类~75行 private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o; out.write(to.getBytes(), 0, to.getLength()); } else { out.write(o.toString().getBytes(StandardCharsets.UTF_8)); //调用对象的toString方法,将返回的字符串转换成字节,通过流写出 } }
10、cleanup(context); //清除生相关的文件,生成分区文件
源码总结说明: 1. 看源码目的: 熟悉整个MR的流程,能够将我们讲解的知识点对应到源码中具体的位置. 为面试做准备. 2. 在整个MR中 ,会有N个MapTask(按照切片数量决定个数)和 N个ReduceTask(自行设置个数) --在集群中的效果是多个MapTask并行运行, 并行数由集群的资源来决定. --多个ReduceTask并行运行,并行数由集群的资源来决定. 一般来说,ReduceTask的数量比较少,基本上都 能够同时并行.
到此,关于“ReduceTask流程是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。