小编给大家分享一下 Hadoop 中 MapReduce 如何实现串联执行。相信大部分人都还不怎么了解，因此分享这篇文章给大家参考，希望大家阅读完这篇文章后大有收获。下面让我们一起去了解一下吧！
import java.io.IOException; import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PickMain { private static final Log LOG = LogFactory.getLog(PickMain.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { /* * Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); boolean flag1 = job1.waitForCompletion(true); //下面这种方法也可以实现串联执行job if(flag1) { Job job2 = Job.getInstance(conf); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); boolean flag2 = job2.waitForCompletion(true); System.out.println(flag2?0:1); if(flag2) { LOG.info("The job is done!"); System.exit(0); }else { LOG.info("The Second job is wrong!"); System.exit(1); } }else { LOG.info("The firt job is Running Wrong job break!"); System.exit(1); 
} */ //下面通过使用ContolledJob和JobControl来实现提交多个作业 Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); Configuration conf2 = new Configuration(); Job job2 = Job.getInstance(conf2); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); //创建ControlledJob对job进行包装 ControlledJob cjob1 = new ControlledJob(conf); ControlledJob cjob2 = new ControlledJob(conf2); cjob1.setJob(job1); cjob2.setJob(job2); //设置依赖关系,这个时候只有等到job1执行完成后job2才会执行 cjob2.addDependingJob(cjob1); //JobControl该类相当于一个job控制器,它是一个线程,需要通过线程启动 JobControl jc = new JobControl("my_jobcontrol"); jc.addJob(cjob1); jc.addJob(cjob2); Thread th = new Thread(jc); th.start(); //等到所有的job都执行完成后在退出 while(!jc.allFinished()) { Thread.sleep(5000); } System.exit(0); } } class FindMapper extends Mapper<LongWritable, Text, Text, Text>{ Text m1 = new Text(); Text m2 = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tmp1 = line.split(":"); String outval = tmp1[0]; String[] outkeys = tmp1[1].split(","); for(int i = 0 ; i<outkeys.length;i++) { m1.set(outkeys[i]);m2.set(outval); context.write(m1,m2); } } } class FindReducer extends Reducer<Text, Text, Text, NullWritable>{ StringBuilder sb = new StringBuilder(); NullWritable nul = NullWritable.get(); Text outval = new Text(); String spector = ":"; 
@Override protected void reduce(Text txt, Iterable<Text> txtiter, Reducer<Text, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); sb.append(txt.toString()); Iterator<Text> it = txtiter.iterator(); while(it.hasNext()) { sb.append(spector+it.next().toString()); } outval.set(sb.toString()); context.write(outval, nul); } } class SecondFindMapper extends Mapper<LongWritable, Text, Text, Text>{ Text keyout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { String[] fs = value.toString().split(":"); valueout.set(fs[0]); if(fs.length>0) { for(int i = 1;i<fs.length-1;i++) { for(int j = i+1;j<fs.length;j++) { if((int)fs[i].toCharArray()[0]>(int)fs[j].toCharArray()[0]) { keyout.set(fs[j]+"-"+fs[i]); }else { keyout.set(fs[i]+"-"+fs[j]); } context.write(keyout, valueout); } } } } } class SecondFindReducer extends Reducer<Text, Text, Text, Text>{ StringBuilder sb = new StringBuilder(); Text outvalue = new Text(); @Override protected void reduce(Text key, Iterable<Text> iter, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); Iterator<Text> it = iter.iterator(); if(it.hasNext()) { sb.append(it.next().toString()); } while(it.hasNext()) { sb.append(","+it.next().toString()); } outvalue.set(sb.toString()); context.write(key, outvalue); } }
以上是“Hadoop 中 MapReduce 如何实现串联执行”这篇文章的所有内容，感谢各位的阅读！相信大家都有了一定的了解，希望分享的内容对大家有所帮助。如果还想学习更多知识，欢迎关注亿速云行业资讯频道！
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。