本篇文章给大家分享的是有关MapReduce中怎样实现二次排序,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
MR的二次排序的需求说明: 在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。
原始数据 无二次排序 有二次排序
a 12 a 12 a 12
b 34 b 34 b 13
c 90 b 23 b 23
b 23 b 13 b 34
b 13 c 90 c 90
根据案例分析,我们要将下面数据key按照abc,value按照大小排序,这也就是一个典型的MR的二次排序的案例,准备原始数据:
a 20
b 20
a 5
c 10
c 8
b 15
a 10
b 18
c 29
b 52
我们想要得到的结果:
a 5
a 10
a 20
b 15
b 18
b 20
b 52
c 8
c 10
c 29
先看方案一的实现思路:
input -> map -><a,20> -> shuffle -> <a,list(10, 5, 20)> -> reduce -> <a,5>
<b,20> <b,list(52, 18, 15, 20)> <a,10>
<a,5> <c,list(29, 8, 10)> <a,20>
<c,10> <b,15>
... <b,18>
<b,20>
...
直接在reduce端对分组后的values进行排序 示例代码:
package com.kfk.hadoop.mr.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/10/9
* @time : 7:07 下午
*/
public class SortMR extends Configured implements Tool {
/**
* map
* TODO
*/
public static class TemplateMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
// 创建map输出的对象
private static final Text mapOutKey = new Text();
private static final IntWritable mapOutValue = new IntWritable();
@Override
public void setup(Context context) throws IOException, InterruptedException {
// TODO
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将每一行数据按空格拆开
String[] values = value.toString().split(" ");
// 数据预处理,将数组超过2的过滤掉
if (values.length != 2){
return;
}
mapOutKey.set(values[0]);
mapOutValue.set(Integer.valueOf(values[1]));
context.write(mapOutKey,mapOutValue);
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
// TODO
}
}
/**
* reduce
* TODO
*/
public static class TemplateReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
// 创建reduceout端的对象
private static final IntWritable outputValue = new IntWritable();
@Override
public void setup(Context context) throws IOException, InterruptedException {
// TODO
}
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
List<Integer> valueList = new ArrayList<Integer>();
// 取出value
for (IntWritable value:values){
valueList.add(value.get());
}
// 打印出reduce输入的key和valueList
System.out.println("Reduce in == KeyIn: "+key+" ValueIn: "+valueList);
// 进行排序
Collections.sort(valueList);
/*
valueList:表示上面已经排序好的列表,即需要遍历列表中的值作为reduce的输出
key不需要改变,即可作为reduce的输出
*/
for (Integer value : valueList){
outputValue.set(value);
context.write(key,outputValue);
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
// TODO
}
}
/**
* run
* @param args
* @return
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1) get conf
Configuration configuration = this.getConf();
// 2) create job
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
// 3.1) input,指定job的输入
Path path = new Path(args[0]);
FileInputFormat.addInputPath(job,path);
// 3.2) map,指定job的mapper和输出的类型
job.setMapperClass(TemplateMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 1.分区
// job.setPartitionerClass();
// 2.排序
// job.setSortComparatorClass();
// 3.combiner -可选项
// job.setCombinerClass(WordCountCombiner.class);
// 4.compress -可配置
// configuration.set("mapreduce.map.output.compress","true");
// 使用的SnappyCodec压缩算法
// configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
// 5.分组
// job.setGroupingComparatorClass();
// 6.设置reduce的数量
// job.setNumReduceTasks(2);
// 3.3) reduce,指定job的reducer和输出类型
job.setReducerClass(TemplateReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 3.4) output,指定job的输出
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job,outpath);
// 4) commit,执行job
boolean isSuccess = job.waitForCompletion(true);
// 如果正常执行返回0,否则返回1
return (isSuccess) ? 0 : 1;
}
public static void main(String[] args) {
// 添加输入,输入参数
args = new String[]{
"hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
"hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
};
// WordCountUpMR wordCountUpMR = new WordCountUpMR();
Configuration configuration = new Configuration();
try {
// 判断输出的文件存不存在,如果存在就将它删除
Path fileOutPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(fileOutPath)){
fileSystem.delete(fileOutPath,true);
}
// 调用run方法
int status = ToolRunner.run(configuration,new SortMR(),args);
// 退出程序
System.exit(status);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行结果:
a 5
a 10
a 20
b 15
b 18
b 20
b 52
c 8
c 10
c 29
很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。
注意的地方(容易被“坑”) 在reduce端对values进行迭代的时候,不要直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。
方案二的解决思路:
原始数据 自定义newkey 在shuffle中排序 reduce输入 reduce输出
a 12 a#12,12 a#12,12
b 34 b#34,34 b#13,13
c 90 -> map -> c#90,90 b#23,23 b#,List(13,23,34)-> reduce -> b,13 b,23 b,34
b 23 b#23,23 b#34,34
b 13 b#13,13 c#90,90
我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。
需要自定义的地方 1.自定义数据类型实现组合key 实现方式:继承WritableComparable 2.自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。 实现方式:继承Partitioner 3.自定义分组,保持分组规则任然按照key进行。不打乱原来的分组 实现方式:继承RawComparator
自定义数据类型代码:
package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/10/15
* @time : 6:16 下午
*/
public class PairWritable implements WritableComparable<PairWritable> {
// 组合key:a#12,12
private String first;
private int second;
public PairWritable() {
}
public PairWritable(String first, int second) {
this.set(first,second);
}
/**
* 方便设置字段
*/
public void set(String first, int second){
this.first = first;
this.second = second;
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
/**
* 重写比较器
*/
public int compareTo(PairWritable o) {
int comp = this.getFirst().compareTo(o.getFirst());
if (0 == comp){
// 若第一个字段相等,则比较第二个字段
return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());
}
return comp;
}
/**
* 序列化
*/
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}
/**
* 反序列化
*/
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}
@Override
public String toString() {
return "PairWritable{" +
"first='" + first + '\'' +
", second=" + second +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PairWritable that = (PairWritable) o;
if (second != that.second) return false;
return first != null ? first.equals(that.first) : that.first == null;
}
@Override
public int hashCode() {
int result = first != null ? first.hashCode() : 0;
result = 31 * result + second;
return result;
}
}
自定义分区代码:
package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/10/15
* @time : 7:09 下午
*/
public class FristPartitioner extends Partitioner<PairWritable, IntWritable> {
public int getPartition(PairWritable key, IntWritable intWritable, int numPartitions) {
/*
* 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
* 让key中first字段作为分区依据
*/
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
自定义分组比较器代码:
package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/10/15
* @time : 6:59 下午
*/
public class FristGrouping implements RawComparator<PairWritable> {
/*
* 字节比较
* bytes1,bytes2为要比较的两个字节数组
* i,i1表示第一个字节数组要进行比较的收尾位置,i2,i3表示第二个
* 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
*/
public int compare(byte[] bytes1, int i, int i1, byte[] bytes2, int i2, int i3) {
return WritableComparator.compareBytes(bytes1,0,i1-4,bytes2,0,i3-4);
}
/*
* 对象比较
*/
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
}
二次排序实现代码:
package com.kfk.hadoop.mr.secondsort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @author : 蔡政洁
* @email :caizhengjie888@icloud.com
* @date : 2020/10/9
* @time : 7:07 下午
*/
public class SecondSortMR extends Configured implements Tool {
/**
* map
* TODO
*/
public static class TemplateMapper extends Mapper<LongWritable, Text,PairWritable, IntWritable>{
// 创建map输出的对象
private static final PairWritable mapOutKey = new PairWritable();
private static final IntWritable mapOutValue = new IntWritable();
@Override
public void setup(Context context) {
// TODO
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将每一行数据按空格拆开
String[] values = value.toString().split(" ");
// 数据预处理,将数组超过2的过滤掉
if (values.length != 2){
return;
}
mapOutKey.set(values[0],Integer.parseInt(values[1]));
mapOutValue.set(Integer.parseInt(values[1]));
context.write(mapOutKey,mapOutValue);
System.out.println("Map out == KeyOut: "+mapOutKey+" ValueOut: "+mapOutValue);
}
@Override
public void cleanup(Context context) {
// TODO
}
}
/**
* reduce
* TODO
*/
public static class TemplateReducer extends Reducer<PairWritable,IntWritable,Text,IntWritable>{
// 创建reduce output端的对象
private static final IntWritable outputValue = new IntWritable();
private static final Text outputKey = new Text();
@Override
public void setup(Context context) throws IOException, InterruptedException {
// TODO
}
@Override
public void reduce(PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
/*
values表示reduce端输入已经排序好的列表,即需要遍历values每一个值作为reduce输出即可
key表示为自定义的key(newkey),即需要取出newkey的第一部分,也就是原始的key,作为reduce的输出
*/
for (IntWritable value:values){
outputKey.set(key.getFirst());
context.write(outputKey,value);
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
// TODO
}
}
/**
* run
* @param args
* @return
* @throws IOException
* @throws ClassNotFoundException
* @throws InterruptedException
*/
public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1) get conf
Configuration configuration = this.getConf();
// 2) create job
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
// 3.1) input,指定job的输入
Path path = new Path(args[0]);
FileInputFormat.addInputPath(job,path);
// 3.2) map,指定job的mapper和输出的类型
job.setMapperClass(TemplateMapper.class);
job.setMapOutputKeyClass(PairWritable.class);
job.setMapOutputValueClass(IntWritable.class);
// 1.分区
job.setPartitionerClass(FristPartitioner.class);
// 2.排序
// job.setSortComparatorClass();
// 3.combiner -可选项
// job.setCombinerClass(WordCountCombiner.class);
// 4.compress -可配置
// configuration.set("mapreduce.map.output.compress","true");
// 使用的SnappyCodec压缩算法
// configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");
// 5.分组
job.setGroupingComparatorClass(FristGrouping.class);
// 6.设置reduce的数量
// job.setNumReduceTasks(2);
// 3.3) reduce,指定job的reducer和输出类型
job.setReducerClass(TemplateReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 3.4) output,指定job的输出
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job,outpath);
// 4) commit,执行job
boolean isSuccess = job.waitForCompletion(true);
// 如果正常执行返回0,否则返回1
return (isSuccess) ? 0 : 1;
}
public static void main(String[] args) {
// 添加输入,输入参数
args = new String[]{
"hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",
"hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"
};
// WordCountUpMR wordCountUpMR = new WordCountUpMR();
Configuration configuration = new Configuration();
try {
// 判断输出的文件存不存在,如果存在就将它删除
Path fileOutPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(fileOutPath)){
fileSystem.delete(fileOutPath,true);
}
// 调用run方法
int status = ToolRunner.run(configuration,new SecondSortMR(),args);
// 退出程序
System.exit(status);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行结果:
a 5
a 10
a 20
b 15
b 18
b 20
b 52
c 8
c 10
c 29
以上就是MapReduce中怎样实现二次排序,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4451162/blog/4681913