本例子采用hadoop1.1.2版本,附件中有例子的数据文件
采用气象数据作为处理数据
1、MultipleOutputs例子,具体解释在代码中有注释
package StationPatitioner;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* hadoop Version 1.1.2
* MultipleOutputs例子
* @author 巧克力黑
*
*/
public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool {
enum Counter
{
LINESKIP, //出错的行
}
static class StationMapper extends MapReduceBase implements Mapper<LongWritable , Text, Text , Text>{
private NcdcRecordParser parser = new NcdcRecordParser();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
try {
parser.parse(value);
output.collect(new Text(parser.getStationid()), value);
} catch (Exception e) {
reporter.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
}
}
}
static class MultipleOutputReducer extends MapReduceBase implements Reducer<Text, Text, NullWritable, Text>{
private MultipleOutputs multipleOutputs;
@Override
public void configure(JobConf jobconf) {
multipleOutputs = new MultipleOutputs(jobconf);//初始化一个MultipleOutputs
}
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<NullWritable, Text> output, Reporter reporter)
throws IOException {
//得到OutputCollector
OutputCollector collector = multipleOutputs.getCollector("station", key.toString().replace("-", ""), reporter);
while(values.hasNext()){
collector.collect(NullWritable.get(), values.next());//MultipleOutputs用OutputCollector输出数据
}
}
@Override
public void close() throws IOException {
multipleOutputs.close();
}
}
@Override
public int run(String[] as) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");//windows下用户与linux用户不一直,采用此方法避免报Permission相关错误
JobConf conf = new JobConf();
conf.setMapperClass(StationMapper.class);
conf.setReducerClass(MultipleOutputReducer.class);
conf.setMapOutputKeyClass(Text.class);
conf.setOutputKeyClass(NullWritable.class);
conf.setOutputFormat(NullOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));//input路径
FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));//output路径
MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, NullWritable.class, Text.class);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception{
int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args);
System.exit(exitCode);
}
}
2、解析气象数据的类
package StationPatitioner;
import org.apache.hadoop.io.Text;
public class NcdcRecordParser {
private static final int MISSING_TEMPERATURE = 9999;
private String year;
private int airTemperature;
private String quality;
private String stationid;
public void parse(String record) {
stationid = record.substring(0, 5);
year = record.substring(15, 19);
String airTemperatureString;
// Remove leading plus sign as parseInt doesn't like them
if (record.charAt(87) == '+') {
airTemperatureString = record.substring(88, 92);
} else {
airTemperatureString = record.substring(87, 92);
}
airTemperature = Integer.parseInt(airTemperatureString);
quality = record.substring(92, 93);
}
public String getStationid(){
return stationid;
}
public void parse(Text record) {
parse(record.toString());
}
public boolean isValidTemperature() {
return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
}
public String getYear() {
return year;
}
public int getAirTemperature() {
return airTemperature;
}
}
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。