温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

MapReduce编写实现wordcount词频统计

发布时间:2020-07-29 12:58:10 来源:网络 阅读:723 作者:nineteens 栏目:编程语言

  p>首先编写WordCountDriver:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;

  import org.apache.hadoop.fs.Path;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Job;

  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  /**

  * 这个程序相当于一个yarn集群的客户端,

  * 需要在此封装我们的mr程序的相关运行参数,指定jar包,

  * 最后提交给yarn

  * */

  public class WordcountDriver

  {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException

  {

  Configuration conf=new Configuration();

  /*其实如果在本地运行MR程序其实不用配置下面的代码程序,在MR默认下就是本地运行*/

  /**下面这段代码配置的是在本地模式下运行MR程序*/

  /**是否运行为本地模式,就是看这个参数值是否为local,默认就是local;*/

  //conf.set("mapreduce.framework.name", "local"); //在本地运行MR程序

  //本地模式运行MR程序时,输入输出的数据可以在本地,也可以在hdfs上

  //到底在哪里,就看以下两行配置用哪一行了,默认是“file:///”

  /**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系统

  //conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盘

  /**运行集群模式,就是把程序提交到yarn中去运行

  * 要想运行为集群模式,以下3个参数要指定为集群上的值

  * */

  conf.set("mapreduce.framework.name", "yarn");

  conf.set("yarn.resourcemanager.hostname", "hadoop1");

  conf.set("fs.defaultFS", "hdfs://hadoop1:9000");

  Job job = Job.getInstance(conf);

  /**要想在Windows的Eclipse上运行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/

  /**job.setJar("c:/wc.jar");*/

  //job.setJar("/home/hadoop/wc.jar"); //这种是将程序打包成jar包,放到指定的位置,缺乏灵活性,不建议使用;

  //指定本程序的jar包所在的本地路径

  job.setJarByClass(WordcountDriver.class);

  //指定本业务job要使用的mapper/reducer业务类

  job.setMapperClass(WordcountMapper.class);

  job.setReducerClass(WordcountReducerr.class);

  //指定mapper输出数据的kv类型;

  job.setMapOutputKeyClass(Text.class);

  job.setMapOutputValueClass(IntWritable.class);

  //指定最终输出的数据的kv类型

  job.setOutputKeyClass(Text.class);

  job.setOutputValueClass(IntWritable.class);

  //指定需要使用的combiner,以及用哪一个类作为combiner的逻辑

  /*job.setCombinerClass(WordcountCombiner.class);*/

  job.setCombinerClass(WordcountReducerr.class);

  /**因为combiner的工作原理通reducecer的作用是一样的,所以直接反射调用reducerr类其实作用是一样的*/

  /**此处为之后为测试添加的*/

  //如果不设置InputFormat,它默认使用的是TextInputFormat.class

  /**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);

  CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

  CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

  */

  //指定job的输入原始文件所在目录

  //FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此处添加的路径为HDFS文件系统的路径;

  FileInputFormat.setInputPaths(job, new Path(args[0])); //传一个路径参数

  //指定job的输出结果所在目录

  FileOutputFormat.setOutputPath(job, new Path(args[1])); //传一个参数进来作为输出的路径参数

  //将job中配置的相关参数,以及job所用的Java类所在的jar包,提交给yarn去运行;

  /*job.submit(); */

  boolean res = job.waitForCompletion(true);

  System.exit(res?0:1);

  }

  }

  其次编写WordCountMapper:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.LongWritable;

  import org.apache.hadoop.io.Text;

  //这是一个简单的MapReduce例子,进行单词数量的统计操作;

  import org.apache.hadoop.mapreduce.Mapper;

  /**

  * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long类型,但是在Hadoop中有更精简的序列化接口,因此采用LongWritable类型;

  * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String类型的,同上用Text(org.apache.hadoop.io.Text)类型;

  * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,为String类型,同上用Text类型;

  * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词数量,为Integer类型,同上用IntWritable类型;

  * */

  public class WordcountMapper extends Mapper

  {

  /**

  * map阶段的业务逻辑就写在自定义的map()方法中,

  * maptask会对每一行输入数据调用一次我们自定义的map()方法;

  * */

  @Override //覆写Mapper中的方法;

  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

  {

  //将maptask传给我们的文本内容先转换成String类型

  String line = value.toString();

  //根据空格将这一行切分成单词;

  String[] words = line.split(" ");

  //将单词输出为<单词,1>

  for(String word:words)

  {

  //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会分到相同的reduce task中;

  context.write(new Text(word),new IntWritable(1)); //进行类型转换一下;

  }无锡×××医院 https://yyk.familydoctor.com.cn/20612/

  }

  最后编写WordCountReduceer:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import java.util.Iterator;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Reducer;

  /**

  * KEYIN,VALUEIN应该对应mapper中的输出的KEYOUT,VALUEOUT类型;

  * KEYOUT是单词

  * VALUEOUT是总次数*/

  public class WordcountReducerr extends Reducer

  {

  /**

  * 例如:

  *

  * 输入参数key,是一组相同单词kv对的key

  * */

  @Override

  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

  {

  int count= 0;

  /* //采用迭代器的方式进行统计单词的数量;

  Iterator iterator = values.iterator();

  while(iterator.hasNext())

  {

  count+=iterator.next().get(); //获取key对应的value值

  }

  */

  //下面的for循环和上面注释中的效果是一样的;

  for(IntWritable value:values)

  {

  count+=value.get();

  }

  //输出统计结果

  context.write(key, new IntWritable(count));

  /**

  * 默认情况下reduce task会将输出结果放到一个文件中(最好是HDFS文件系统上的一个文件)

  * */

  }

  }

  然而还可以编写一个Combiner类:

  package com.jym.hadoop.mr.demo;

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;

  import org.apache.hadoop.io.Text;

  import org.apache.hadoop.mapreduce.Reducer;

  /*

  * 此处的这个combiner其实不用自己编写,因为combiner的工作原理同reducer的原理是一样

  * 的,故可以直接反射调用WordcountReducer类即可

  * */

  public class WordcountCombiner extends Reducer

  {

  @Override

  protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException

  {

  }


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI