p>首先编写WordCountDriver:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 这个程序相当于一个yarn集群的客户端,
* 需要在此封装我们的mr程序的相关运行参数,指定jar包,
* 最后提交给yarn
* */
public class WordcountDriver
{
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
Configuration conf=new Configuration();
/*其实如果在本地运行MR程序其实不用配置下面的代码程序,在MR默认下就是本地运行*/
/**下面这段代码配置的是在本地模式下运行MR程序*/
/**是否运行为本地模式,就是看这个参数值是否为local,默认就是local;*/
//conf.set("mapreduce.framework.name", "local"); //在本地运行MR程序
//本地模式运行MR程序时,输入输出的数据可以在本地,也可以在hdfs上
//到底在哪里,就看以下两行配置用哪一行了,默认是“file:///”
/**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系统
//conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盘
/**运行集群模式,就是把程序提交到yarn中去运行
* 要想运行为集群模式,以下3个参数要指定为集群上的值
* */
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop1");
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
Job job = Job.getInstance(conf);
/**要想在Windows的Eclipse上运行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/
/**job.setJar("c:/wc.jar");*/
//job.setJar("/home/hadoop/wc.jar"); //这种是将程序打包成jar包,放到指定的位置,缺乏灵活性,不建议使用;
//指定本程序的jar包所在的本地路径
job.setJarByClass(WordcountDriver.class);
//指定本业务job要使用的mapper/reducer业务类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducerr.class);
//指定mapper输出数据的kv类型;
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定需要使用的combiner,以及用哪一个类作为combiner的逻辑
/*job.setCombinerClass(WordcountCombiner.class);*/
job.setCombinerClass(WordcountReducerr.class);
/**因为combiner的工作原理通reducecer的作用是一样的,所以直接反射调用reducerr类其实作用是一样的*/
/**此处为之后为测试添加的*/
//如果不设置InputFormat,它默认使用的是TextInputFormat.class
/**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
*/
//指定job的输入原始文件所在目录
//FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此处添加的路径为HDFS文件系统的路径;
FileInputFormat.setInputPaths(job, new Path(args[0])); //传一个路径参数
//指定job的输出结果所在目录
FileOutputFormat.setOutputPath(job, new Path(args[1])); //传一个参数进来作为输出的路径参数
//将job中配置的相关参数,以及job所用的Java类所在的jar包,提交给yarn去运行;
/*job.submit(); */
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
其次编写WordCountMapper:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//这是一个简单的MapReduce例子,进行单词数量的统计操作;
import org.apache.hadoop.mapreduce.Mapper;
/**
* KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long类型,但是在Hadoop中有更精简的序列化接口,因此采用LongWritable类型;
* VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String类型的,同上用Text(org.apache.hadoop.io.Text)类型;
* KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,为String类型,同上用Text类型;
* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词数量,为Integer类型,同上用IntWritable类型;
* */
public class WordcountMapper extends Mapper
{
/**
* map阶段的业务逻辑就写在自定义的map()方法中,
* maptask会对每一行输入数据调用一次我们自定义的map()方法;
* */
@Override //覆写Mapper中的方法;
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
//将maptask传给我们的文本内容先转换成String类型
String line = value.toString();
//根据空格将这一行切分成单词;
String[] words = line.split(" ");
//将单词输出为<单词,1>
for(String word:words)
{
//将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会分到相同的reduce task中;
context.write(new Text(word),new IntWritable(1)); //进行类型转换一下;
}无锡×××医院 https://yyk.familydoctor.com.cn/20612/
}
最后编写WordCountReduceer:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* KEYIN,VALUEIN应该对应mapper中的输出的KEYOUT,VALUEOUT类型;
* KEYOUT是单词
* VALUEOUT是总次数*/
public class WordcountReducerr extends Reducer
{
/**
* 例如:
*
* 输入参数key,是一组相同单词kv对的key
* */
@Override
protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException
{
int count= 0;
/* //采用迭代器的方式进行统计单词的数量;
Iterator iterator = values.iterator();
while(iterator.hasNext())
{
count+=iterator.next().get(); //获取key对应的value值
}
*/
//下面的for循环和上面注释中的效果是一样的;
for(IntWritable value:values)
{
count+=value.get();
}
//输出统计结果
context.write(key, new IntWritable(count));
/**
* 默认情况下reduce task会将输出结果放到一个文件中(最好是HDFS文件系统上的一个文件)
* */
}
}
然而还可以编写一个Combiner类:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 此处的这个combiner其实不用自己编写,因为combiner的工作原理同reducer的原理是一样
* 的,故可以直接反射调用WordcountReducer类即可
* */
public class WordcountCombiner extends Reducer
{
@Override
protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException
{
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。