这篇文章主要为大家展示了“Hadoop中如何实现分组”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Hadoop中如何实现分组”这篇文章吧。
package grounp;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for a MapReduce job that demonstrates custom grouping.
 *
 * <p>Each input line holds two tab-separated numbers. The job groups records by
 * the first number and emits, for every group, the largest second number.
 *
 * <p>Sample input:
 * <pre>
 * 3 3
 * 3 2
 * 3 1
 * 2 2
 * 2 1
 * 1 1
 * </pre>
 *
 * <p>Resulting output:
 * <pre>
 * 1 1
 * 2 2
 * 3 3
 * </pre>
 *
 * @author Xr
 */
public class groupApp {
    public static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    public static final String OUTPUT_PATH = "hdfs://hadoop:9000/datas";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the output directory cannot be cleaned, the job cannot
     *         be submitted, or the job finishes unsuccessfully
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        existsFile(conf);

        Job job = new Job(conf, groupApp.class.getName());
        // Tell Hadoop which jar contains the job classes; without this the tasks
        // may be unable to load the mapper/reducer when run on a cluster.
        job.setJarByClass(groupApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);

        job.setMapperClass(MyMapper.class);
        // Custom composite key produced by the mapper.
        job.setMapOutputKeyClass(NewKey.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Custom grouping: keys sharing the same first field reach one reduce() call.
        job.setGroupingComparatorClass(NewGroupCompator.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        // Surface a failed job instead of silently returning as if it succeeded.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("MapReduce job did not complete successfully");
        }
    }

    /**
     * Recursively deletes {@link #OUTPUT_PATH} if it already exists, so the job
     * can write its output there.
     *
     * @param conf configuration used to obtain the file system
     * @throws IOException if the file system cannot be accessed
     * @throws URISyntaxException if {@link #OUTPUT_PATH} is not a valid URI
     */
    private static void existsFile(Configuration conf) throws IOException,
            URISyntaxException {
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);
        Path output = new Path(OUTPUT_PATH);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
    }
}
/**
 * Parses each input line of the form {@code first<TAB>second} and emits a
 * {@link NewKey} holding both numbers, with the second number as the value.
 */
class MyMapper extends Mapper<LongWritable, Text, NewKey, LongWritable> {
    // Reused for every record; context.write() is given the current contents each time.
    private final NewKey outKey = new NewKey();
    private final LongWritable outValue = new LongWritable();

    /**
     * Maps one line of text to a composite key and its second column.
     *
     * @param key input key supplied by the input format (only used in error messages)
     * @param value the raw line; blank lines are skipped
     * @param context sink for the emitted key/value pair
     * @throws IOException if the line has fewer than two tab-separated fields or
     *         writing the output fails
     * @throws NumberFormatException if either field is not a valid long
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Tolerate blank lines (e.g. a trailing newline at the end of the file).
        if (line.trim().isEmpty()) {
            return;
        }

        String[] fields = line.split("\t");
        if (fields.length < 2) {
            throw new IOException("Expected two tab-separated numbers for input key "
                    + key + " but got: " + line);
        }

        // trim() guards against stray whitespace such as a trailing '\r'.
        long first = Long.parseLong(fields[0].trim());
        long second = Long.parseLong(fields[1].trim());

        outKey.set(first, second);
        outValue.set(second);
        context.write(outKey, outValue);
    }
}
/**
 * Emits, for each group of values, the group's first field together with the
 * largest value seen in that group.
 */
class MyReducer extends Reducer<NewKey, LongWritable, LongWritable, LongWritable> {
    /**
     * Scans all values of the group and writes {@code (key2.first, maximum)}.
     *
     * @param key2 composite key of the group; only its {@code first} field is emitted
     * @param values the values belonging to this group
     * @param context sink for the single output record
     * @throws IOException if writing the output fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(NewKey key2, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long largest = Long.MIN_VALUE;
        for (LongWritable candidate : values) {
            largest = Math.max(largest, candidate.get());
        }
        LongWritable outKey = new LongWritable(key2.first);
        LongWritable outValue = new LongWritable(largest);
        context.write(outKey, outValue);
    }
}
/**
 * Composite key made of two longs, ordered by {@code first} and then by
 * {@code second}, both ascending.
 *
 * <p>Serialized form: {@code first} as 8 bytes followed by {@code second} as 8 bytes.
 */
class NewKey implements WritableComparable<NewKey> {
    long first;
    long second;

    /**
     * Writes {@code first} and then {@code second} as longs.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.first);
        out.writeLong(this.second);
    }

    /**
     * Assigns both components of the key.
     *
     * @param parseLong new value for {@code first}
     * @param parseLong2 new value for {@code second}
     */
    public void set(long parseLong, long parseLong2) {
        this.first = parseLong;
        this.second = parseLong2;
    }

    /**
     * Reads {@code first} and then {@code second}, mirroring {@link #write(DataOutput)}.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();
    }

    /**
     * Orders keys by {@code first}, breaking ties by {@code second}.
     *
     * @param o the key to compare against
     * @return a negative number, zero, or a positive number when this key is
     *         smaller than, equal to, or greater than {@code o}
     */
    @Override
    public int compareTo(NewKey o) {
        int byFirst = compareLongs(this.first, o.first);
        return byFirst != 0 ? byFirst : compareLongs(this.second, o.second);
    }

    /**
     * Hashes on {@code first} only.
     *
     * <p>Hash-based partitioning relies on this value, so keys that share the same
     * {@code first} must hash identically to land on the same reducer; otherwise
     * grouping by {@code first} breaks once more than one reducer is used. This
     * is still consistent with {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        return (int) (first ^ (first >>> 32));
    }

    /**
     * Two keys are equal when both components match, consistent with
     * {@link #compareTo(NewKey)} returning zero.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof NewKey)) {
            return false;
        }
        NewKey other = (NewKey) obj;
        return this.first == other.first && this.second == other.second;
    }

    /** Returns the key as {@code first<TAB>second}. */
    @Override
    public String toString() {
        return first + "\t" + second;
    }

    /** Overflow-safe three-way comparison of two longs. */
    private static int compareLongs(long a, long b) {
        if (a < b) {
            return -1;
        }
        return a == b ? 0 : 1;
    }
}
/**
 * Grouping comparator that treats two {@link NewKey}s as belonging to the same
 * group when their {@code first} fields are equal, ignoring {@code second}.
 */
class NewGroupCompator implements RawComparator<NewKey> {
    /** Number of serialized bytes occupied by the leading {@code first} field (one long). */
    private static final int FIRST_FIELD_BYTES = 8;

    /**
     * Compares two deserialized keys by their {@code first} field only, so that
     * it agrees with the raw-bytes overload on which keys are in the same group.
     *
     * @param o1 first key
     * @param o2 second key
     * @return a negative number, zero, or a positive number when {@code o1.first}
     *         is smaller than, equal to, or greater than {@code o2.first}
     */
    @Override
    public int compare(NewKey o1, NewKey o2) {
        if (o1.first < o2.first) {
            return -1;
        }
        return o1.first == o2.first ? 0 : 1;
    }

    /**
     * Compares the leading 8 bytes of two serialized keys. {@code NewKey.write}
     * emits {@code first} as those 8 bytes, so the result is zero exactly when
     * the two {@code first} values are equal.
     *
     * <p>The comparison is byte-wise, so for non-equal values the sign may not
     * match numeric ordering (e.g. for negative numbers).
     *
     * @param b1 byte array containing the first key
     * @param s1 start offset of the first key in {@code b1}
     * @param l1 length in bytes of the first key (unused; only the first field is compared)
     * @param b2 byte array containing the second key
     * @param s2 start offset of the second key in {@code b2}
     * @param l2 length in bytes of the second key (unused; only the first field is compared)
     * @return zero if the {@code first} fields are equal, non-zero otherwise
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, FIRST_FIELD_BYTES, b2, s2, FIRST_FIELD_BYTES);
    }
}
以上是“Hadoop中如何实现分组”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/Xiao629/blog/205181