这篇文章主要介绍“Storm开发细节是什么”,在日常操作中,相信很多人在Storm开发细节是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm开发细节是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
package test; import java.io.IOException; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.copyFromClass.TestWordSpout; import com.esotericsoftware.minlog.Log; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; // 测试目的,在这里我们需要测试一下当前Spout 不断产生数据的过程 public class testWordSpoutTopology { public static class TestSimpleBolt extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println(input.toString()); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("Method declare"); } } public static void main(String[] args) throws IOException { // 首先,我们必须建立一个新的TopologyBuilder TopologyBuilder builder = new TopologyBuilder(); //其次,我们需要配置如下的组件: 1 Spout,2Bolt builder.setSpout("word-emit-byThread", new TestWordSpout()); //在这个Spout之中,我们约定将 【word-emit-byThread】Spout组件 发射的元祖进行 shuffleGrouping builder.setBolt("word-show", new TestSimpleBolt()).shuffleGrouping( "word-emit-byThread"); Config config = new Config(); config.setDebug(false); //最后进行本地提交 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, builder.createTopology()); } }
以上,
testWordSpoutTopology
是我们运行的主类
package storm.copyFromClass; import backtype.storm.Config; import backtype.storm.topology.OutputFieldsDeclarer; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.HashMap; import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public TestWordSpout() { this(true); } public TestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } // 发送 public void nextTuple() { Utils.sleep(100); final String[] words = new String[] { "张兵", "吴哥", "仝志维", "前辈", "禅师"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } //在这里,我们没有进行ACK public void ack(Object msgId) { } //在这里,我们没有进行fail public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }
结果:
请注意在这里,我们的Stream 默认的id为空
到此,关于“Storm开发细节是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。