本篇内容介绍了“Combiner怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1、combiner的工作位置: kv从缓冲区中溢写到磁盘时可以使用combiner(只要设置,无条件使用) 每个MapTask的所有数据都从缓冲区写到磁盘后,在进行归并时可以使用combiner(满足条件使用,溢写次数>=3) 2、Combiner: 合并 目的就是在每个MapTask中将输出的kv提前进行局部合并。 能够降低map到reduce传输的kv对数量及 reduce最终处理的数据量. 3、Combiner使用限制: 在不改变业务逻辑的情况下才能使用combiner. --例如:求平均值时,就不宜使用 4、Combiner组件父类就是Reducer Combiner是在每一个MapTask所在的节点运行; Reducer是接收全局所有Mappei的输出结果;
1、自定义Compiner类 /** * combiner作用: * 在mapTask进行溢写时,对每一个mapTask输出的数据提前进行局部汇总,减少写进reduceTask的整体数据量 * 注意:自定义Combiner类,属于MapTask阶段(虽然它继承Reducer) */ public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { int count = 0; @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws Exception{ for (IntWritable value : values) { count+=value.get(); } context.write(key,new IntWritable(count)); } }
2、WordCountMapper public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { private Text outk = new Text(); private IntWritable outv = new IntWritable(1); @Override protected void map(LongWritable key,Text value,Context context) throws Exception { // 获取输入到的一行数据 String lineData = value.toString(); // 提前分析知道,按照空格进行切割,得到每个单词 String[] splitData = lineData.split(" "); // 遍历数据,将切割得到的数据写出 for (String str : splitData) { // 注意,这里得到的数据类型是String,需要转为Text outk.set(str); context.write(outk,outv); } } }
3、WordCountReduce public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable outv = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws Exception { // 定义一个变量,用来接收遍历中次数汇总 int count = 0; // 直接读取values,获取到迭代器对象中记录的每个单词出现次数 for (IntWritable value : values) { // 因为得到的value对象是IntWritable对象,不可以直接进行加操作,所以要转换为int count += value.get(); //get()方法转为int } // 写出计算之后的数据,对count类型进行转换 outv.set(count); context.write(key,outv); } }
4、WordCountDriver public class WordCountDriver { public static void main(String[] args) throws Exception { // 1、获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2、关联jar,配置执行程序时,使用的具体驱动类 job.setJarByClass(WordCountDriver.class); // 3、关联mapper 和 reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReduce.class); // 4、设置mapper的输出的key和value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5、设置程序最终输出的key和value类型,如果有reducer // 就写reducer输出的kv类型,如果没有reducer,就写mapper输出的kv类型. job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置自定义Combiner类 job.setCombinerClass(WordCountCombiner.class); // job.setCombinerClass(WordCountReduce.class);也能这样用 job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job,4194304); // 6、设置文件的输入和输出路径 FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\combineinput")); //要求该路径不能存在,交给mr程序创建 FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\Combineroutput2")); // 7、提交job job.waitForCompletion(true); } }
①:Outputformat是一个接口,其内部定义两个抽象方法 --RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name,Progressable progress): 该方法用来获取RecordWriter对象,主负责数据的写出操作. --void checkOutputSpecs(FileSystem ignored, JobConf job): 该方法用来检测输出路径,当driver中的输出路径存在时,会由该方法的实现类抛出异常 //131行抛出异常("Output directory " + outDir + " already exists") ②:通过ctrl+h 查看当前接口的实现类如下图 --TextOutputFormat(hadoop默认使用的写出方式),按行写出,内部重写了getRecordWriter()方法 --SequenceFileOutputFormat(最终写出的文件是二进制格式) --MultipleOutputFormat(子抽象类,其下还有具体实现方法)
![OutputFormat实现类](https://oscimg.oschina.net/oscnet/up- 777fe19a5bf6864396beac3aa83d8350e9e.png "OutputFormat实现类")
//1、LogMapper public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws Exception { context.write(value,NullWritable.get()); } }
//2、LogReducer public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws Exception{ for (NullWritable value : values) { context.write(key,NullWritable.get()); } } }
//3、MyOutPutFormat public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> { /** * 重写getRecordWriter()方法,在内部自定义一个写出类 * @param job * @return * @throws IOException * @throws InterruptedException */ public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws Exception { LogRecordWriter rw = new LogRecordWriter(job.getConfiguration()); return rw; } }
//4、LogRecordWriter /** * 自定义LogRecordWriter对象需要继承RecordWriter类 * * 需求: * 将包含"luck"的日志数据写到 D:/bigtools/luck.log * 将不包含"luck"的日志数据写到 D:/bigtools/other.log */ public class LogRecordWriter extends RecordWriter { // 文件输出路径 private String luckPath = "D:/bigtools/luck.log"; private String otherPath = "D:/bigtools/other.log"; private FSDataOutputStream atguiguOut; private FSDataOutputStream otherOut; private FileSystem fs; /** * 初始化 * @param conf */ public LogRecordWriter(Configuration conf){ try { fs = FileSystem.get(conf); luckOut = fs.create(new Path(luckPath)); otherOut = fs.create(new Path(otherPath)); } catch (IOException e) { e.printStackTrace(); } } /** * 重写write方法 * @param key * @param value * @throws IOException * @throws InterruptedException */ public void write(Object key, Object value) throws Exception { String log = key.toString(); if(log.contains("luck")){ luckOut.writeBytes(log + "\n"); }else{ otherOut.writeBytes(log + "\n"); } } /** * 关闭流 * @param context * @throws IOException * @throws InterruptedException */ public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(luckOut); IOUtils.closeStream(otherOut); } }
//5、LogDriver public class LogDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置自定义输出 job.setOutputFormatClass(MyOutPutFormat.class); FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\loginput")); FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\logoutput")); job.waitForCompletion(true); } }
“Combiner怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。