Yesterday a friend told me about a problem he'd been given (I've forgotten the exact wording). The data looks like this:
<1,0> <2,8> <1,9> <2,7> <1,0> <3,15> <5,20> <3,25> <4,20> <3,50>
and the expected result is:
1 2
2 2
3 3
4 1
5 1
In other words: count the left-hand values while de-duplicating on the pairs. If both the left and right values match an earlier pair, record it only once; if the left matches but the right differs, the left value's count goes up; if neither side matches anything seen before, the left value is counted as a new occurrence.
Once you see through the wording, this is just a small exercise in de-duplicated counting: first remove duplicate pairs, then count the left-hand keys. I'll skip a formal write-up of the idea and let the code speak for itself; note that the code only handles the input shown above:
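Before the MapReduce version, here is a minimal in-memory sketch of the same dedup-then-count logic in plain Java, just to pin down the expected result (the class and method names are my own, not from the original):

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class DedupCount {
    // De-duplicate the <left,right> pairs, then count occurrences of each left value.
    static Map<String, Integer> count(String[] pairs) {
        Set<String> seen = new LinkedHashSet<>();       // drops exact duplicates, e.g. the second <1,0>
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String p : pairs) {
            if (seen.add(p)) {                          // true only the first time a pair appears
                String left = p.split(",")[0].replace("<", "");
                counts.merge(left, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] data = { "<1,0>", "<2,8>", "<1,9>", "<2,7>", "<1,0>",
                          "<3,15>", "<5,20>", "<3,25>", "<4,20>", "<3,50>" };
        System.out.println(count(data)); // {1=2, 2=2, 3=3, 5=1, 4=1}
    }
}
```

The two MapReduce jobs below split exactly these two steps apart: job 1 plays the role of the `Set`, job 2 the role of the counting `Map`.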
package com.amir.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MapReducer_MulTask {

    // Job 1: de-duplication. The original class names came through garbled
    // ("Ma***emovingMap"/"Ma***emovingReduce"), so "DupRemoving" is used
    // here as a stand-in.
    public static class DupRemovingMap extends MapReduceBase implements
            Mapper<Object, Text, Text, Text> {

        // Emit each whole input line as the key; identical lines then meet
        // at the same reducer and collapse into one.
        public void map(Object key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(value, new Text(""));
        }
    }

    public static class DupRemovingReduce extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> { // value type fixed: the mapper emits Text, not IntWritable

        // Write each distinct key exactly once, ignoring the duplicate values.
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(key, new Text(""));
        }
    }

    // Job 2: word count on the left-hand value of each de-duplicated pair.
    public static class StatisticsMap extends MapReduceBase implements
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                // "<1,0>" -> split on "," -> ["<1", "0>"] -> strip "<" -> "1"
                String[] temp = itr.nextToken().split(",");
                String akey = temp[0].replace("<", "");
                word.set(akey);
                output.collect(word, one);
            }
        }
    }

    public static class StatisticsReduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        // Sum the 1s emitted for each key.
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void taskDupRemoving() throws IOException {
        String[] param = { "/test/testw/ss", "/test/testw/woutput" };
        Configuration conf = new Configuration();
        JobConf jobconf = new JobConf(conf, MapReducer_MulTask.class);
        jobconf.setJobName("TaskDupRemoving");
        jobconf.setJarByClass(MapReducer_MulTask.class);
        jobconf.setMapperClass(DupRemovingMap.class);
        jobconf.setCombinerClass(DupRemovingReduce.class);
        jobconf.setReducerClass(DupRemovingReduce.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(jobconf, new Path(param[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(param[1]));
        // runJob blocks until the job finishes, so the second job below
        // only starts once de-duplication is done.
        JobClient.runJob(jobconf);
    }

    public static void taskStatistics() throws IOException {
        // Input is the output file written by job 1.
        String[] param = { "/test/testw/woutput/part-00000", "/test/testw/woutput/wordcount" };
        Configuration conf = new Configuration();
        JobConf jobconf = new JobConf(conf, MapReducer_MulTask.class);
        jobconf.setJobName("TaskStatistics");
        jobconf.setJarByClass(MapReducer_MulTask.class);
        jobconf.setMapperClass(StatisticsMap.class);
        jobconf.setCombinerClass(StatisticsReduce.class);
        jobconf.setReducerClass(StatisticsReduce.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(jobconf, new Path(param[0]));
        FileOutputFormat.setOutputPath(jobconf, new Path(param[1]));
        JobClient.runJob(jobconf);
    }

    public static void main(String[] args) {
        try {
            MapReducer_MulTask.taskDupRemoving(); // job 1: remove duplicate pairs
            MapReducer_MulTask.taskStatistics();  // job 2: count the left-hand keys
            System.out.println("OK!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
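One detail worth isolating is the key extraction in StatisticsMap: splitting a token like `<1,0>` on `,` leaves `<1` in the first slot, so only the leading `<` needs stripping (the `>` ends up in the second slot and is discarded). A minimal check of just that step (class name is mine, for illustration only):

```java
public class KeyExtractDemo {
    // Extract the left-hand value from a token like "<3,15>".
    static String leftKey(String token) {
        // "<3,15>" -> ["<3", "15>"] -> "<3" -> "3"
        return token.split(",")[0].replace("<", "");
    }

    public static void main(String[] args) {
        System.out.println(leftKey("<3,15>")); // 3
        System.out.println(leftKey("<1,0>"));  // 1
    }
}
```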
This was mainly a test of basic MapReduce usage: two chained jobs, de-duplication followed by counting.