This article shares how to customize formatted output in Hadoop by writing your own OutputFormat. In MapReduce, the OutputFormat's getRecordWriter method supplies the RecordWriter that writes the job's final key/value pairs, so overriding it lets you control exactly where and how results are written. It is a practical technique, shared here for reference; follow along with the code below.
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CustomizeOutputFormat {
    static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CustomizeOutputFormat.class);
        job.setMapperClass(CustMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final output types; with no reducer set, the identity reducer
        // passes the map output straight through.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Plug in the custom OutputFormat here.
        job.setOutputFormatClass(CustOutputFormat.class);

        String jobName = "Customize outputformat test!";
        job.setJobName(jobName);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        if (b) {
            LOG.info("Job " + jobName + " is done.");
        } else {
            LOG.info("Job " + jobName + " went wrong, exiting now.");
            System.exit(1);
        }
    }
}
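One thing worth noting about the driver: FileOutputFormat (and therefore the CustOutputFormat subclass below, which does not override checkOutputSpecs) requires that the directory passed to setOutputPath does not already exist, otherwise job submission fails.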
class CustMapper extends Mapper<LongWritable, Text, Text, Text> {
    String[] textIn = null;
    Text outkey = new Text();
    Text outvalue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        /*
         * Suppose the input file contains lines such as:
         * boys	girls
         * firends	goodbye
         * down	up
         * fly	to
         * neibors	that
         */
        textIn = value.toString().split("\t");
        // Guard against malformed lines without a tab-separated pair.
        if (textIn.length < 2) {
            return;
        }
        outkey.set(textIn[0]);
        outvalue.set(textIn[1]);
        context.write(outkey, outvalue);
    }
}
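Given the sample lines in the comment above, each map() call splits its input line on the tab character and emits the two fields as a key/value pair; a quick trace of the first few records:

boys	girls    ->  (boys, girls)
firends	goodbye  ->  (firends, goodbye)
down	up       ->  (down, up)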
// Custom OutputFormat
class CustOutputFormat extends FileOutputFormat<Text, Text> {
    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // Get the Configuration
        Configuration conf = context.getConfiguration();
        // Get the FileSystem
        FileSystem fs = FileSystem.newInstance(conf);
        // Get the output path
        Path path = CustOutputFormat.getOutputPath(context);
        URI uri = path.toUri();
        // Create two files and obtain their output streams.
        // Note: this writes fixed file names directly into the final output
        // directory, so it assumes the job runs with a single reduce task.
        FSDataOutputStream foa = fs.create(new Path(uri.toString() + "/out.a"));
        FSDataOutputStream fob = fs.create(new Path(uri.toString() + "/out.b"));
        // Construct the custom RecordWriter, passing in the two streams.
        CustRecordWriter rw = new CustRecordWriter(foa, fob);
        return rw;
    }

    class CustRecordWriter extends RecordWriter<Text, Text> {
        FSDataOutputStream foa = null;
        FSDataOutputStream fob = null;

        CustRecordWriter(FSDataOutputStream foa, FSDataOutputStream fob) {
            this.foa = foa;
            this.fob = fob;
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            String mText = key.toString();
            // Route each record to a different file depending on the key length.
            // Plain UTF-8 bytes are written instead of writeUTF(), which would
            // prepend a two-byte length to every record.
            String line = mText + "\t" + value.toString() + "\n";
            if (mText.length() >= 5) {
                foa.write(line.getBytes(StandardCharsets.UTF_8));
            } else {
                fob.write(line.getBytes(StandardCharsets.UTF_8));
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Finally, close both output streams.
            if (foa != null) {
                foa.close();
            }
            if (fob != null) {
                fob.close();
            }
        }
    }
}
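To see the two output files end to end, the job can be packaged and submitted in the usual way. The jar name and HDFS paths below are illustrative placeholders, not taken from the original article:

hadoop jar customize-outputformat.jar CustomizeOutputFormat /input/words /output/words

With the sample data from CustMapper and the key-length rule in CustRecordWriter, keys of five or more characters (firends, neibors) land in /output/words/out.a, while shorter keys (boys, down, fly) land in /output/words/out.b.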
// Using MultipleInputs to process input files from multiple sources
package hgs.multipuleinput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import hgs.custsort.SortBean;
import hgs.custsort.SortDriver;
import hgs.custsort.SortMapper;
import hgs.custsort.SortReducer;
public class MultipuleInputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Register each input path together with its InputFormat and Mapper class.
        MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class, SortMapper.class);
        MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class, SortMapper.class);
        //FileInputFormat.setInputPaths(job, new Path("/sort"));
        FileOutputFormat.setOutputPath(job, new Path("/sortresult"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
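The driver above registers the same SortMapper for both paths, but the real strength of MultipleInputs is pairing each source with its own mapper. A minimal sketch, assuming two hypothetical mapper classes CsvMapper and TsvMapper (not part of the original article) that emit the same map output types for the shared reducer:

// Hypothetical mappers for two differently formatted sources; both must
// produce identical map output key/value types.
MultipleInputs.addInputPath(job, new Path("/data/csv"), TextInputFormat.class, CsvMapper.class);
MultipleInputs.addInputPath(job, new Path("/data/tsv"), TextInputFormat.class, TsvMapper.class);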
Thank you for reading! That wraps up this article on customizing formatted output in Hadoop. Hopefully the content above is helpful and teaches you something new; if you found the article worthwhile, share it so more people can see it!
Original article: http://blog.itpub.net/31506529/viewspace-2213389/