[TOC]
编写MapReduce的程序有至少三个必不可少的部分:mapper,reducer,driver。可选的有 partitioner,combiner
而且mapper的输入输出、reducer的输入输出都是key value型的,所以要求我们在编写mapper和reducer时,必须实现明确这4个键值对中的8种数据类型,而且必须还是hadoop的可序列化类型。同时还需要注意的是,map的输出其实就是reduce的输入,所以包括的数据类型是一样的。
编写基本流程
1)自定义map类,需要继承 Mapper这个类
2)继承Mapper 的时候,需要指定输入和输出的键值对中的类型
3)必须重写继承自父类的map() 方法
4)上面重写的map() 方法是每个map task对每一个输入到mapper中的键值对都会调用处理一次。
基本编写实例如下:
/*
指定Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 这4个类型分别为:
LongWritable, Text, Text, IntWritable,相当于普通类型:
long,string,string,int
*/
public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
这里是map方法 处理逻辑
}
}
基本编写流程
1)自定义reduce类,需要继承 Reducer这个类
2)继承Reducer的时候,需要指定输入和输出的键值对中的类型
3)必须重写继承自父类的reduce() 方法
4)上面重写的reduce() 方法是每个reduer task对每一个输入到reducer中的键值对都会调用处理一次。
基本编写实例如下:
/*
指定Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 这4个类型分别为:
Text, IntWritable, Text, IntWritable,相当于普通类型:
string,int,string,int
*/
public class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text key,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
这里是reduce方法 处理逻辑
}
}
这个部分是用于配置job对象的各种必须配置信息,配置完成后,将job提交给yarn执行
具体配置啥下面直接上例子看好了。主要起到调度map和reduce任务执行的作用
这个阶段主要是对map阶段的输出进行分区,而map的分区数直接决定reduce task的数量(一般来说是一对一),编写流程如下:
1)自定义分区类,继承 Partitioner<key, value>
2)继承Partitioner的时候,处理的输入的键值对类型
3)必须重写继承自父类的getPartition() 方法
4)上面重写的getPartition() () 方法是每个maptask对每一个输入的键值对都会调用处理一次。
5)根据分区规则,返回0~n,表示分区格式为0~n
编写案例如下:
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
判断条件1:
return 0;
判断条件2:
return 1;
.......
return n;
}
}
combiner不是一个独立的阶段,它其实是包含在map阶段中的。map本身输出的键值对中,每个键值对的value都是1,就算是一样的key,也是独立一个键值对。如果重复的键值对越多,那么将map输出传递到reduce的过程中,就会占用很多带宽资源。优化的方法就是每个map输出时,先在当前map task下进行局部合并汇总,减少重复可以的出现。即
<king,1> <>king,1> 这种一样的key的,就会合并成 <king,2>
这样就会减少传输的数据量
所以其实由此可以知道,其实combiner的操作和reduce的操作是一样的,只不过一个是局部,一个是全局。简单的做法就是,直接将reducer作为combiner类传入job,如:
job.setCombinerClass(WordCountReducer.class);
我们可以看看这个方法的源码:
public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {
this.ensureState(Job.JobState.DEFINE);
//看到没,那个 Reducer.class
this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class);
}
可以清楚看到设置combine class时,可以看到多态的类型设置就是 Reducer 类型的,从这里也可以更加确定 combiner 的操作和 reducer的就是一样的。
下面开始用wordcount作为例子编写一个完整的MapReduce程序
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
//setup 和 clean 方法不是必须的
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//最先执行
//System.out.println("this is setup");
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
//执行完map之后执行
//System.out.println("this is cleanup");
}
//这里创建一个临时对象,用于保存中间值
Text k = new Text();
IntWritable v = new IntWritable();
/**
*
*
* @param key
* @param value
* @param context 用于连接map和reduce上下文,通过这个对象传递map的结果给reduce
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//System.out.println("开始map=====================");
//1.value是读取到的一行字符串,要将其转换为java中的string进行处理,即反序列化
String line = value.toString();
//2.切分数据
String[] words = line.split(" ");
//3.输出map结构, <单词,个数>的形式,写入的时候需将普通类型转为序列化类型
/**
* 两种写法:
* 1) context.write(new Text(word), new IntWritable(1));
* 缺点:每次都会创建两个对象,最后会造成创建了很多临时对象
*
* 2)Text k = new Text();
* IntWritable v = new IntWritable();
*
* for {
* k.set(word);
* v.set(1);
* context.write(k, v);
* }
*
* 这种方法好处就是,对象只创建了一次,后续只是通过修改对象内部的值的方式传递,无需重复创建多个对象
*/
for (String word:words) {
//转换普通类型为可序列化类型
k.set(word);
v.set(1);
//写入到上下文对象中
context.write(k, v);
}
}
}
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* 这里的 Iterable<IntWritable> values 之所以是一个可迭代的对象,
* 是因为从map传递过来的数据经过合并了,如:
* (HDFS,1),(HDFS,1)合并成 (HDFS,[1,1]) 这样的形式,所以value可以通过迭代方式获取其中的值
*
*/
IntWritable counts = new IntWritable();
@Override
protected void reduce(Text key,
Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//1.初始化次数
int count = 0;
//2.汇总同一个key中的个数
for (IntWritable value: values) {
count += value.get();
}
//3.输出reduce
counts.set(count);
context.write(key, counts);
}
}
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//这里只是方便在ide下直接运行,如果是在命令行下直接输入输入和输出文件路径即可
args = new String[]{"G:\\test2\\", "G:\\testmap6\\"};
//1.获取配置对象
Configuration conf = new Configuration();
//2.获取job对象
Job job = Job.getInstance(conf);
//3.分别给job指定driver,map,reducer的类
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.分别指定map和reduce阶段输出的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//这里可以设置分区类,需要额外编写分区实现类
// job.setPartitionerClass(WordCountPartitioner.class);
// job.setNumReduceTasks(2);
//设置预合并类
//job.setCombinerClass(WordCountReducer.class);
//设置inputFormat类,大量小文件优化,不设置默认使用 TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024);
CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);
//5.数据输入来源以及结果的输出位置
// 输入的时候会根据数据源的情况自动map切片,形成切片信息(或者叫切片方案)
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//以上就是将一个job的配置信息配置完成后,下面就提交job,hadoop将跟就job的配置执行job
//6.提交job任务,这个方法相当于 job.submit()之后,然后等待执行完成
//任务配置信息是提交至yarn的 MRappmanager
job.waitForCompletion(true);
}
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。