This article walks through how to use the classic WordCount example in MapReduce. Working through this example in practice often raises a few common questions, so the full code is given below, followed by a note on how to run it.
package org.myorg;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount extends Configured implements Tool {

    public static class Map extends MapReduceBase implements Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {

        // Counter used to report the total number of words emitted by this mapper.
        static enum Counters { INPUT_WORDS }

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        private boolean caseSensitive = true;
        private Set&lt;String&gt; patternsToSkip = new HashSet&lt;String&gt;();

        private long numRecords = 0;
        private String inputFile;
        // Read per-job settings: case sensitivity, the current input file name,
        // and (optionally) the pattern files placed in the DistributedCache.
        public void configure(JobConf job) {
            caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
            inputFile = job.get("map.input.file");

            if (job.getBoolean("wordcount.skip.patterns", false)) {
                Path[] patternsFiles = new Path[0];
                try {
                    patternsFiles = DistributedCache.getLocalCacheFiles(job);
                } catch (IOException ioe) {
                    System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
                }
                for (Path patternsFile : patternsFiles) {
                    parseSkipFile(patternsFile);
                }
            }
        }
        // Load one pattern (a regular expression) per line from a cached patterns file.
        private void parseSkipFile(Path patternsFile) {
            try {
                BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
                String pattern = null;
                while ((pattern = fis.readLine()) != null) {
                    patternsToSkip.add(pattern);
                }
            } catch (IOException ioe) {
                System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
            }
        }
        // Strip any skip patterns from the line, tokenize it, and emit (word, 1) pairs.
        public void map(LongWritable key, Text value, OutputCollector&lt;Text, IntWritable&gt; output, Reporter reporter) throws IOException {
            String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();

            for (String pattern : patternsToSkip) {
                line = line.replaceAll(pattern, "");
            }

            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
                reporter.incrCounter(Counters.INPUT_WORDS, 1);
            }

            if ((++numRecords % 100) == 0) {
                reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
            }
        }
    }
    // Sum the counts for each word; the same class is also used as the combiner.
    public static class Reduce extends MapReduceBase implements Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {
        public void reduce(Text key, Iterator&lt;IntWritable&gt; values, OutputCollector&lt;Text, IntWritable&gt; output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
    // Configure and submit the job; handles the optional "-skip <patterns file>" argument.
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        List&lt;String&gt; other_args = new ArrayList&lt;String&gt;();
        for (int i = 0; i &lt; args.length; ++i) {
            if ("-skip".equals(args[i])) {
                DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
                conf.setBoolean("wordcount.skip.patterns", true);
            } else {
                other_args.add(args[i]);
            }
        }

        FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(res);
    }
}
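To give a sense of how the job above is actually used, here is a minimal run sketch, assuming the hadoop command is on the PATH. The jar name wordcount.jar, the wordcount_classes build directory, and the HDFS paths are placeholders for illustration only; the -Dwordcount.case.sensitive property, the -skip flag, and the two positional input/output arguments are what the code itself understands (the -D generic option is parsed by ToolRunner, the rest by the run method).

javac -classpath $(hadoop classpath) -d wordcount_classes WordCount.java
jar -cvf wordcount.jar -C wordcount_classes/ .
hadoop jar wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

The patterns file passed to -skip contains one regular expression per line; each expression is removed from every input line with replaceAll before tokenizing, so literal characters such as "." or "," need to be escaped (e.g. \. and \,). Note that the listing uses the older org.apache.hadoop.mapred API together with DistributedCache, in the style of the classic Hadoop WordCount v2.0 tutorial.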
That concludes this introduction to using WordCount with MapReduce; thanks for reading. For more practical articles on related topics, you can follow the Yisu Cloud (亿速云) site.
Original article: https://my.oschina.net/u/220934/blog/201646