本篇内容介绍了“storm实时排序TopN怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
阅读背景:1 您需要了解TOP 使用的场景
2 您需要了解当前的TOPN 处理,和定时区间处理的区别
看代码说话
package com.cc.storm; import com.cc.storm.bolt.MergeBolt; import com.cc.storm.bolt.RankBolt; import com.cc.storm.bolt.RollingAllCountBolt; import com.cc.storm.bolt.RollingCountBolt; import com.cc.storm.spout.RandomEmitSpout; import com.cc.storm.spout.RedisPubSubSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; /** * ToPN是一种常见模式,是对流式数据进行“Streaming topN”的计算: * 比如要计算的是最近一段时间内的热门话题,热门点击图片,热门商品浏览,热门商品购买 * * 既然敢要实时的处理,【】【】【】【】【】[] 【】 【】【】【】【】 [] 【】【】【】【】 【】 [] * * @author Yin Shuai */ public class TOP10 { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { final int TOP_N = 10; final int time = 1; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("$datasource$", new RandomEmitSpout(), 1); builder.setBolt("$count$", new RollingCountBolt(3, time), 1) .fieldsGrouping("$datasource$", new Fields("merchandiseIDS")); builder.setBolt("$rank$", new RankBolt(TOP_N), 2).fieldsGrouping( "$count$", new Fields("merchandiseID")); builder.setBolt("$merge$", new MergeBolt(TOP_N)).globalGrouping( "$rank$"); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(2); conf.setMaxSpoutPending(5000); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); Thread.sleep(5000); } }
整个处理的流程如图:
“storm实时排序TopN怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。