本篇内容介绍了“Combiner怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1、combiner的工作位置:
kv从缓冲区中溢写到磁盘时可以使用combiner(只要设置,无条件使用)
每个MapTask的所有数据都从缓冲区写到磁盘后,在进行归并时可以使用combiner(满足条件使用,溢写次数>=3)
2、Combiner: 合并
目的就是在每个MapTask中将输出的kv提前进行局部合并。
能够降低map到reduce传输的kv对数量及 reduce最终处理的数据量.
3、Combiner使用限制:
在不改变业务逻辑的情况下才能使用combiner.
--例如:求平均值时,就不宜使用
4、Combiner组件父类就是Reducer
Combiner是在每一个MapTask所在的节点运行;
Reducer是接收全局所有Mappei的输出结果;
1、自定义Compiner类
/**
* combiner作用:
* 在mapTask进行溢写时,对每一个mapTask输出的数据提前进行局部汇总,减少写进reduceTask的整体数据量
* 注意:自定义Combiner类,属于MapTask阶段(虽然它继承Reducer)
*/
public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
int count = 0;
@Override
protected void reduce(Text key,
Iterable<IntWritable> values, Context context) throws Exception{
for (IntWritable value : values) {
count+=value.get();
}
context.write(key,new IntWritable(count));
}
}
2、WordCountMapper
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
private Text outk = new Text();
private IntWritable outv = new IntWritable(1);
@Override
protected void map(LongWritable key,Text value,Context context) throws Exception {
// 获取输入到的一行数据
String lineData = value.toString();
// 提前分析知道,按照空格进行切割,得到每个单词
String[] splitData = lineData.split(" ");
// 遍历数据,将切割得到的数据写出
for (String str : splitData) {
// 注意,这里得到的数据类型是String,需要转为Text
outk.set(str);
context.write(outk,outv);
}
}
}
3、WordCountReduce
public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable outv = new IntWritable();
@Override
protected void reduce(Text key,
Iterable<IntWritable> values, Context context) throws Exception {
// 定义一个变量,用来接收遍历中次数汇总
int count = 0;
// 直接读取values,获取到迭代器对象中记录的每个单词出现次数
for (IntWritable value : values) {
// 因为得到的value对象是IntWritable对象,不可以直接进行加操作,所以要转换为int
count += value.get(); //get()方法转为int
}
// 写出计算之后的数据,对count类型进行转换
outv.set(count);
context.write(key,outv);
}
}
4、WordCountDriver
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 1、获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2、关联jar,配置执行程序时,使用的具体驱动类
job.setJarByClass(WordCountDriver.class);
// 3、关联mapper 和 reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
// 4、设置mapper的输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5、设置程序最终输出的key和value类型,如果有reducer
// 就写reducer输出的kv类型,如果没有reducer,就写mapper输出的kv类型.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置自定义Combiner类
job.setCombinerClass(WordCountCombiner.class);
// job.setCombinerClass(WordCountReduce.class);也能这样用
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);
// 6、设置文件的输入和输出路径
FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\combineinput"));
//要求该路径不能存在,交给mr程序创建
FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\Combineroutput2"));
// 7、提交job
job.waitForCompletion(true);
}
}
①:Outputformat是一个接口,其内部定义两个抽象方法
--RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,String name,Progressable progress):
该方法用来获取RecordWriter对象,主负责数据的写出操作.
--void checkOutputSpecs(FileSystem ignored, JobConf job):
该方法用来检测输出路径,当driver中的输出路径存在时,会由该方法的实现类抛出异常
//131行抛出异常("Output directory " + outDir + " already exists")
②:通过ctrl+h 查看当前接口的实现类如下图
--TextOutputFormat(hadoop默认使用的写出方式),按行写出,内部重写了getRecordWriter()方法
--SequenceFileOutputFormat(最终写出的文件是二进制格式)
--MultipleOutputFormat(子抽象类,其下还有具体实现方法)

//1、LogMapper
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws Exception {
context.write(value,NullWritable.get());
}
}
//2、LogReducer
public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key,
Iterable<NullWritable> values, Context context) throws Exception{
for (NullWritable value : values) {
context.write(key,NullWritable.get());
}
}
}
//3、MyOutPutFormat
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
/**
* 重写getRecordWriter()方法,在内部自定义一个写出类
* @param job
* @return
* @throws IOException
* @throws InterruptedException
*/
public RecordWriter<Text, NullWritable>
getRecordWriter(TaskAttemptContext job) throws Exception {
LogRecordWriter rw = new LogRecordWriter(job.getConfiguration());
return rw;
}
}
//4、LogRecordWriter
/**
* 自定义LogRecordWriter对象需要继承RecordWriter类
*
* 需求:
* 将包含"luck"的日志数据写到 D:/bigtools/luck.log
* 将不包含"luck"的日志数据写到 D:/bigtools/other.log
*/
public class LogRecordWriter extends RecordWriter {
// 文件输出路径
private String luckPath = "D:/bigtools/luck.log";
private String otherPath = "D:/bigtools/other.log";
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
private FileSystem fs;
/**
* 初始化
* @param conf
*/
public LogRecordWriter(Configuration conf){
try {
fs = FileSystem.get(conf);
luckOut = fs.create(new Path(luckPath));
otherOut = fs.create(new Path(otherPath));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 重写write方法
* @param key
* @param value
* @throws IOException
* @throws InterruptedException
*/
public void write(Object key, Object value) throws Exception {
String log = key.toString();
if(log.contains("luck")){
luckOut.writeBytes(log + "\n");
}else{
otherOut.writeBytes(log + "\n");
}
}
/**
* 关闭流
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(luckOut);
IOUtils.closeStream(otherOut);
}
}
//5、LogDriver
public class LogDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置自定义输出
job.setOutputFormatClass(MyOutPutFormat.class);
FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\loginput"));
FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\logoutput"));
job.waitForCompletion(true);
}
}
“Combiner怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/luffycl/blog/4996062