This article shows how to write multiple output files with MultipleOutputs on cdh3u3 (Hadoop 0.20.2), using a small worked example.
1. Create a file named multest.txt with the following content (one record per line):
11111,username,password,22,河北师范大学,软件学院,2008
11112,username,password,22,河北师范大学,计算机学院,2008
11113,username,password,22,xx大学,软件学院,2008
11114,username,password,22,xxx大学,计算机学院,2008
11115,username,password,23,2008
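2. Create a directory on HDFS: hadoop dfs -mkdir multest
3. Upload the new text file to the multest directory: hadoop dfs -put /home/wjk/hadoop/multest.txt multest
4. Create a Map/Reduce project. Records that do not have exactly 7 comma-separated fields are saved to dirtydata; well-formed records for anything other than 河北师范大学 (Hebei Normal University), 软件学院 (School of Software) are saved to otherschool; records for 河北师范大学, 软件学院 are saved to the job's default output file.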
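// Package name taken from the run command in step 5.
package com.wjk.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Multest {

    public static class MultestMapper extends
            Mapper<Object, Text, Text, NullWritable> {
        private Text outkey = new Text("");
        private MultipleOutputs<Text, NullWritable> mos;

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String details[] = line.split(",");
            if (details.length != 7) {
                // Malformed record: goes straight to the "dirtydata" named output.
                outkey.set(line);
                mos.write("dirtydata", outkey, NullWritable.get());
            } else {
                String school = details[4];
                String college = details[5];
                if (school.equals("河北师范大学") && college.equals("软件学院")) {
                    // Target records go through the normal map output to the reducer.
                    outkey.set(line);
                    context.write(outkey, NullWritable.get());
                } else {
                    // Every other well-formed record goes to "otherschool".
                    outkey.set(line);
                    mos.write("otherschool", outkey, NullWritable.get());
                }
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            mos = new MultipleOutputs<Text, NullWritable>(context);
            super.setup(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Closing flushes the named outputs; without it they may be incomplete.
            mos.close();
            super.cleanup(context);
        }
    }

    public static class MultestReducer extends
            Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            // Identical lines share a key, so each distinct line is written once.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "multest");
        job.setJarByClass(Multest.class);
        job.setMapperClass(MultestMapper.class);
        job.setReducerClass(MultestReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Every name passed to mos.write must be registered here first.
        MultipleOutputs.addNamedOutput(job, "dirtydata",
                TextOutputFormat.class, Text.class, NullWritable.class);
        MultipleOutputs.addNamedOutput(job, "otherschool",
                TextOutputFormat.class, Text.class, NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}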
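The routing rule inside map can be checked without a cluster. The following is a minimal sketch in plain Java, not part of the original job: the class RecordRouter, the method route, and the label "default" (standing in for context.write) are hypothetical names introduced for illustration only. It mirrors the same split-and-compare branching as the mapper and applies it to three of the sample records from step 1.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original job: mirrors the branching in MultestMapper.map.
final class RecordRouter {
    static final String DIRTY = "dirtydata";     // named output registered in main
    static final String OTHER = "otherschool";   // named output registered in main
    static final String DEFAULT = "default";     // illustrative label for context.write

    // Returns the destination that the mapper's branching would pick for one input line.
    static String route(String line) {
        String[] details = line.split(",");
        if (details.length != 7) {
            return DIRTY;
        }
        boolean target = "河北师范大学".equals(details[4])
                && "软件学院".equals(details[5]);
        return target ? DEFAULT : OTHER;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "11111,username,password,22,河北师范大学,软件学院,2008",
                "11112,username,password,22,河北师范大学,计算机学院,2008",
                "11115,username,password,23,2008");
        for (String line : sample) {
            System.out.println(route(line) + " <- " + line);
        }
    }
}
```

One detail worth knowing: String.split(",") drops trailing empty fields, so a record whose last field is empty counts as 6 fields and is routed to dirtydata. If such records should count as well-formed, split(",", -1) keeps the empty fields.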
5. Compile, export the jar, and run it: hadoop jar ./../multest.jar com.wjk.test.Multest multest multestout
6. Screenshot of the run [image not available]
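=======Note==========================
Drawback: when the job runs on a cluster, each named output ends up as many scattered files.
Addendum: written as above, the job produces many files in a single directory, and merging them is hard. Instead, give each named output its own output directory; merging is then easy with getmerge, one directory at a time. The main change is in the mos.write call: use the overload that takes a baseOutputPath. For reference, here is the official code: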
public <K, V> void write(String namedOutput, K key, V value)
        throws IOException, InterruptedException {
    write(namedOutput, key, value, namedOutput);
}

public <K, V> void write(String namedOutput, K key, V value,
        String baseOutputPath) throws IOException, InterruptedException {
    checkNamedOutputName(this.context, namedOutput, false);
    checkBaseOutputPath(baseOutputPath);
    if (!(this.namedOutputs.contains(namedOutput))) {
        throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'");
    }
    TaskAttemptContext taskContext = getContext(namedOutput);
    getRecordWriter(taskContext, baseOutputPath).write(key, value);
}
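One way to apply the four-argument write is to pass a baseOutputPath that contains a directory component. The sketch below is an assumption-laden illustration rather than the author's exact change: OutputPaths and baseOutputPath are hypothetical names, and it assumes that a "/" in baseOutputPath is resolved relative to the job output directory, so that map-side files would land in paths such as multestout/dirtydata/part-m-00000. The Hadoop call itself is shown only as a comment so that the block compiles without Hadoop on the classpath.

```java
// Hypothetical helper: builds the baseOutputPath argument for the four-argument write.
final class OutputPaths {
    // Puts each named output in a subdirectory of its own, e.g. "dirtydata/part".
    // The result is never the bare string "part", which is assumed to be the
    // value that checkBaseOutputPath rejects.
    static String baseOutputPath(String namedOutput) {
        return namedOutput + "/part";
    }

    // In MultestMapper.map the calls would then look like this (needs Hadoop to compile):
    //   mos.write("dirtydata", outkey, NullWritable.get(),
    //           OutputPaths.baseOutputPath("dirtydata"));
    //   mos.write("otherschool", outkey, NullWritable.get(),
    //           OutputPaths.baseOutputPath("otherschool"));

    public static void main(String[] args) {
        for (String name : new String[] {"dirtydata", "otherschool"}) {
            System.out.println(name + " -> " + baseOutputPath(name));
        }
    }
}
```

With that layout, each directory can be merged into a single local file in one step, for example hadoop dfs -getmerge multestout/dirtydata dirtydata.txt, where dirtydata.txt is an arbitrary local file name chosen for illustration.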
Original link: https://my.oschina.net/wangjiankui/blog/49521