Software version
paoding-analysis 3.0
Copy the paoding-analysis jar and the Paoding dic dictionary directory into the project's classpath.
Edit the paoding-dic-home.properties file inside paoding-analysis.jar to set the dictionary path:
paoding.dic.home=classpath:dic
Tokenization demo program
import java.io.IOException;
import java.io.StringReader;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

public class TokenizeWithPaoding {
    public static void main(String[] args) {
        String line = "中华民族共和国";
        PaodingAnalyzer analyzer = new PaodingAnalyzer();
        StringReader sr = new StringReader(line);
        TokenStream ts = analyzer.tokenStream("", sr); // token stream; the first argument (field name) is irrelevant here
        // Iterate over the token stream
        try {
            ts.reset(); // the TokenStream contract requires reset() before the first incrementToken()
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                System.out.println(ta.toString());
            }
            ts.end();
            ts.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Source files for news text classification
http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz
Each folder represents a category, and each file under a category is one news article.
Chinese news classification requires word segmentation first.
For a large number of small files you can use CombineFileInputFormat, another abstract subclass of FileInputFormat, and implement its createRecordReader method.
CombineFileInputFormat overrides getSplits; the splits it returns are of type CombineFileSplit, a subclass of InputSplit that can contain multiple files.
How the RecordReader turns each file into a key-value pair is determined by its nextKeyValue method.
Custom CombineFileInputFormat subclass
package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom MyInputFormat class, used to pack multiple files into a single split.
 * @author BOB
 */
public class MyInputFormat extends CombineFileInputFormat<Text, Text> {

    // Disable file splitting so that each small file stays whole inside a CombineFileSplit
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context, MyRecordReader.class);
    }
}
Custom RecordReader class
package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom MyRecordReader class, used to read the contents of the splits produced by MyInputFormat.
 * @author BOB
 */
public class MyRecordReader extends RecordReader<Text, Text> {

    private CombineFileSplit combineFileSplit;  // the split currently being processed
    private Configuration conf;                 // job configuration
    private Text currentKey = new Text();       // key currently being read
    private Text currentValue = new Text();     // value currently being read
    private int totalLength;                    // number of files in the current split
    private int currentIndex;                   // index of the file being read within the current split
    private float currentProgress = 0F;         // current progress
    private boolean processed = false;          // whether the current file has already been processed

    // Constructor, invoked by CombineFileRecordReader once per file in the split
    public MyRecordReader(CombineFileSplit combineFileSplit,
            TaskAttemptContext context, Integer fileIndex) {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = fileIndex;
        this.conf = context.getConfiguration();
        this.totalLength = combineFileSplit.getPaths().length;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            return currentProgress = (float) currentIndex / totalLength;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // The key is built from the parent directory name, the file name, and directory separators
            Path file = combineFileSplit.getPath(currentIndex);
            StringBuilder sb = new StringBuilder();
            sb.append("/");
            sb.append(file.getParent().getName()).append("/");
            sb.append(file.getName());
            currentKey.set(sb.toString());
            // The value is the entire content of the file
            byte[] content = new byte[(int) combineFileSplit.getLength(currentIndex)];
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            in.readFully(content);
            currentValue.set(content);
            in.close();
            processed = true;
            return true;
        }
        return false;
    }
}
Tokenization driver class
package org.conan.myhadoop.fengci;

import java.io.IOException;
import java.io.StringReader;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
 * Tokenization driver class, used to segment the words of the input files.
 * @author BOB
 */
public class TokenizerDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TokenizerDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Cap the size of a combined split (in bytes)
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4000000);
        // Job name
        Job job = new Job(conf, "Tokenizer");
        job.setJarByClass(TokenizerDriver.class);
        job.setMapperClass(Map.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);

        // Add every category directory under the input path as an input path
        FileSystem fs = inpath.getFileSystem(conf);
        FileStatus[] status = fs.listStatus(inpath);
        Path[] paths = FileUtil.stat2Paths(status);
        for (Path path : paths) {
            FileInputFormat.addInputPath(job, path);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        // Delete the output directory if it already exists
        // (do not close the FileSystem here: the cached instance is still needed to submit the job)
        FileSystem hdfs = outpath.getFileSystem(conf);
        if (hdfs.exists(outpath)) {
            hdfs.delete(outpath, true);
        }

        // No reduce tasks
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Map class for the Hadoop framework, used to tokenize the texts in parallel.
     * @author BOB
     */
    static class Map extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Create the analyzer
            Analyzer analyzer = new PaodingAnalyzer();
            String line = value.toString();
            StringReader reader = new StringReader(line);
            // Obtain the token stream
            TokenStream ts = analyzer.tokenStream("", reader);
            StringBuilder sb = new StringBuilder();
            // Iterate over the tokens in the stream, joining them with spaces
            ts.reset();
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                if (sb.length() != 0) {
                    sb.append(" ").append(ta.toString());
                } else {
                    sb.append(ta.toString());
                }
            }
            ts.end();
            ts.close();
            value.set(sb.toString());
            context.write(key, value);
        }
    }
}
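To run the tokenization job, package these classes together with the paoding-analysis jar and the dic dictionary directory (or ship the dependencies with -libjars), then submit it in the usual way, for example: hadoop jar news-tokenizer.jar org.conan.myhadoop.fengci.TokenizerDriver /input/20news /output/tokenized. The jar name and both paths here are only placeholders: args[0] is the corpus root with one subdirectory per category, and args[1] is the output directory.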
After this tokenization preprocessing, all news articles are gathered into a single output: the key is the category path, each record is one news article, and the words are separated by spaces.
The processed data can then be used with Mahout to build a Naive Bayes classifier.
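Before handing the result to Mahout, it can help to spot-check a few records of the SequenceFile output. The sketch below is not from the original article; the class name is arbitrary, it assumes a typical map-only output file such as part-m-00000 under the output directory, and it uses Hadoop's plain SequenceFile.Reader API to print the first few category keys and their space-separated tokens.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class InspectTokenizedOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Path to one output file of the tokenizer job, e.g. /output/tokenized/part-m-00000 (placeholder)
        Path file = new Path(args[0]);
        FileSystem fs = file.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
        try {
            Text key = new Text();   // "/category/fileName"
            Text value = new Text(); // space-separated tokens of one article
            int shown = 0;
            // Print the first five records as a sanity check
            while (shown < 5 && reader.next(key, value)) {
                System.out.println(key + "\t" + value);
                shown++;
            }
        } finally {
            reader.close();
        }
    }
}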
References:
http://f.dataguru.cn/thread-244375-1-1.html
http://www.cnblogs.com/panweishadow/p/4320720.html