MapReduce概述:
MapReduce采用分而治之的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单来说,MapReduce就是“任务的分解和结果的汇总”。
在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker。JobTracker用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。
在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
需要注意的是,用MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
程序使用的测试文本数据:
Dear River
Dear River Bear Spark
Car Dear Car Bear Car
Dear Car River Car
Spark Spark Dear Spark
首先是自定义的Maper类代码
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//fields:代表着文本一行的的数据: dear bear river
String[] words = value.toString().split("\t");
for (String word : words) {
// 每个单词出现1次,作为中间结果输出
context.write(new Text(word), new IntWritable(1));
}
}
}
这个Map类是一个泛型类型,它有四个形参类型,分别指定map()函数的输入键、输入值、输出键和输出值的类型。LongWritable
:输入键类型,Text
:输入值类型,Text
:输出键类型,IntWritable
:输出值类型.
String[] words = value.toString().split("\t");
,words
的值为Dear River Bear River
输入键key是一个长整数偏移量,用来寻找第一行的数据和下一行的数据,输入值是一行文本Dear River Bear River
,输出键是单词Bear
,输出值是整数1
。
Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io
包中。这里使用LongWritable
类型(相当于Java的Long
类型)、Text
类型(相当于Java中的String类型)和IntWritable
类型(相当于Java的Integer
类型)。
map()
方法的参数是输入键和输入值。以本程序为例,输入键LongWritable key
是一个偏移量,输入值Text value
是Dear Car Bear Car
,我们首先将包含有一行输入的Text
值转换成Java的String
类型,之后使用substring()
方法提取我们感兴趣的列。map()
方法还提供了Context
实例用于输出内容的写入。
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
/*
(River, 1)
(River, 1)
(River, 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
key: River
value: List(1, 1, 1)
key: Spark
value: List(1, 1, 1,1)
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
context.write(key, new IntWritable(sum));// 输出最终结果
};
}
Reduce任务最初按照分区号从Map端抓取数据为:
(River, 1)
(River, 1)
(River, 1)
(spark, 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
经过处理后得到的结果为:
key: hello value: List(1, 1, 1)
key: spark value: List(1, 1, 1,1)
所以reduce()函数的形参 Iterable<IntWritable> values
接收到的值为List(1, 1, 1)
和List(1, 1, 1,1)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountMain {
//若在IDEA中本地执行MR程序,需要将mapred-site.xml中的mapreduce.framework.name值修改成local
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
if (args.length != 2 || args == null) {
System.out.println("please input Path!");
System.exit(0);
}
//System.setProperty("HADOOP_USER_NAME","hadoop2.7");
Configuration configuration = new Configuration();
//configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
//调用getInstance方法,生成job实例
Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
// 打jar包
job.setJarByClass(WordCountMain.class);
// 通过job设置输入/输出格式
// MR的默认输入格式是TextInputFormat,所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);
// 设置输入/输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置处理Map/Reduce阶段的类
job.setMapperClass(WordCountMap.class);
//map combine减少网路传出量
job.setCombinerClass(WordCountReduce.class);
job.setReducerClass(WordCountReduce.class);
//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的 输出的kv类型
//job.setMapOutputKeyClass(.class)
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);
// 设置reduce task最终输出key/value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 提交作业
job.waitForCompletion(true);
}
}
首先更改mapred-site.xml文件配置
将mapreduce.framework.name的值设置为local
然后本地运行:
查看结果:
首先打包
更改配置文件,改成yarn模式
添加本地jar包位置:
Configuration configuration = new Configuration();
configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target");
设置允许跨平台远程调用:
configuration.set("mapreduce.app-submission.cross-platform","true");
修改输入参数:
运行结果:
将maven项目打包,在服务器端用命令运行mr程序
hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar
com.kaikeba.hadoop.wordcount.WordCountMain /tttt.txt /wordcount11
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。