这篇文章主要介绍nutch中如何实现索引去重,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
一、主程序调用 SolrDeleteDuplicates dedup = new SolrDeleteDuplicates(); dedup.setConf(getConf()); dedup.dedup(solrUrl); 二、job任务配置 JobConf job = new NutchJob(getConf()); job.setInputFormat(SolrInputFormat.class); job.setMapperClass(IdentityMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SolrRecord.class); job.setReducerClass(SolrDeleteDuplicates.class); job.setOutputFormat(NullOutputFormat.class); JobClient.runJob(job); 三、Map、reduce任务的输入和输出 Map任务输入、输出 public void map( K key, V val, OutputCollector<K, V> output reduce任务输入、输出 输入:Text/Iterator<SolrRecord> 输出:Text/SolrRecord public void reduce( Text key, Iterator<SolrRecord> values, OutputCollector<Text, SolrRecord> output 四、job任务输入类SolrInputFormat getSplits方法将所有的文档按照数量平均分片 getRecordReader方法中利用solrserver查询了当前分片包含的所有doc记录,solrrecord返回了的当前的RecordReader<Text, SolrRecord>记录(RecordReader是一个全局的变量),并且有获取下一个方法。 (1)、SolrInputFormat的getSplits方法 1、根据job对象的参数,获取solrserver对象。 2、构建并执行查询(查询参数:[*:*、id、setRow(1)] ),获取响应对象 3、根据响应对象获取索引总数,除以分片数,得到每一片分配多少个索引 4、根据分片数创建 SolrInputSplit数组对象, 5、根据solr输入分片的开始和结束位置,实例化SolrInputSplit对象 public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job); final SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD); solrQuery.setRows(1); QueryResponse response; try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); } int numResults = (int)response.getResults().getNumFound(); int numDocsPerSplit = (numResults / numSplits); int currentDoc = 0; SolrInputSplit[] splits = new SolrInputSplit[numSplits]; for (int i = 0; i < numSplits - 1; i++) { splits[i] = new SolrInputSplit(currentDoc, numDocsPerSplit); currentDoc += numDocsPerSplit; } splits[splits.length - 1] = new SolrInputSplit(currentDoc, numResults - currentDoc); return splits; } (2)、SolrInputFormat的getRecordReader()方法 1、获取solrserver对象 2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数 3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest, SolrInputSplit中的开始位置,文档总数 ])。 4、根据响应对象,获取结果集 5、对匿名内部内RecordReader做了实现,并且返回 public RecordReader<Text, SolrRecord> getRecordReader(final InputSplit split, final JobConf job, Reporter reporter) throws IOException { //1、获取solrserver对象 SolrServer solr = SolrUtils.getCommonsHttpSolrServer(job); //2、将传入的split参数,强转成SolrInputSplit对象,并获取这个分片的文档总数 SolrInputSplit solrSplit = (SolrInputSplit) split; final int numDocs = solrSplit.getNumDocs();
//3、构建查询对象,执行查询(参数[*:*,id,boost,tstamp,digest, SolrInputSplit中的开始位置,文档总数 ]) SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD, SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD); solrQuery.setStart(solrSplit.getDocBegin()); solrQuery.setRows(numDocs); QueryResponse response;
try { response = solr.query(solrQuery); } catch (final SolrServerException e) { throw new IOException(e); } //4、根据响应对象,获取结果集 final SolrDocumentList solrDocs = response.getResults(); return new RecordReader<Text, SolrRecord>() { //当前的文档 private int currentDoc = 0; public void close() throws IOException { } public Text createKey() { return new Text(); } public SolrRecord createValue() { return new SolrRecord(); } //获取当前的指针 public long getPos() throws IOException { return currentDoc; } //获取进度 public float getProgress() throws IOException { return currentDoc / (float) numDocs; } //获取下一个 public boolean next(Text key, SolrRecord value) throws IOException { if (currentDoc >= numDocs) { return false; } // SolrDocument doc = solrDocs.get(currentDoc); //获取摘要 String digest = (String) doc.getFieldValue(SolrConstants.DIGEST_FIELD); //把摘要作为key key.set(digest); //value(SolrRecord) //赋值:通过doc给solrrecord的id,tstamp,boost 3个字段赋值 value.readSolrDocument(doc); //指针加自增1 currentDoc++; return true; } }; }
五、map()方法和reduce()方法中的实现 (1)、map任务 (2)、reduce任务 去重逻辑: reduce任务会遍历每一个record,并执行reduce()方法中的代码 reduce()方法中,会遍历处于当前文档之后的所有文档,如果分值和时间都比当前的小,会调用solrj删除这个文档,如果比当前的大,会删除当前的,并把当前的替换成这个大的。 public void reduce(Text key, Iterator<SolrRecord> values, OutputCollector<Text, SolrRecord> output, Reporter reporter) throws IOException { //1、下一个SolrRecord对象 SolrRecord recordToKeep = new SolrRecord(values.next()); //2、遍历了SolrRecord while (values.hasNext()) { // SolrRecord solrRecord = values.next(); //boost、tstamp参与比较 //如果当前的分值, 比保持的分支高,并且时间比保持的新,就根据id删除这条索引, if (solrRecord.getBoost() > recordToKeep.getBoost() || (solrRecord.getBoost() == recordToKeep.getBoost() && solrRecord.getTstamp() > recordToKeep.getTstamp())) { updateRequest.deleteById(recordToKeep.id); recordToKeep = new SolrRecord(solrRecord); } else { updateRequest.deleteById(solrRecord.id); } numDeletes++; reporter.incrCounter("SolrDedupStatus", "Deleted documents", 1); if (numDeletes >= NUM_MAX_DELETE_REQUEST) { try { LOG.info("SolrDeleteDuplicates: deleting " + numDeletes + " duplicates"); updateRequest.process(solr); } catch (SolrServerException e) { throw new IOException(e); } updateRequest = new UpdateRequest(); numDeletes = 0; } } } 六、关于digest doc中的digest字段,是在IndexerMapReduce类中的reduce方法中加入的 // add digest, used by dedup doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY)); Metadata中包含了一个HashMap final Metadata metadata = parseData.getContentMeta(); |
以上是“nutch中如何实现索引去重”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。