storm是grovvy写的
kafka是scala写的
storm-kafka storm连接kafka consumer的插件
下载地址:
https://github.com/wurstmeister/storm-kafka-0.8-plus
除了需要storm和kafka相关jar包还需要google-collections-1.0.jar
以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar
以前由com.netflix.curator组织开发现在归到org.apache.curator下面
1.Kafka Consumer即Storm Spout代码
package demo;
import java.util.ArrayList;
import java.util.List;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class MyKafkaSpout {
public static void main(String[] args) {
String topic ="track";
ZkHosts zkhosts = new ZkHosts("192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181");
SpoutConfig spoutConfig = new SpoutConfig(zkhosts, topic,
"/MyKafka", //偏移量offset的根目录
"MyTrack");//子目录对应一个应用
List<String> zkServers=new ArrayList<String>();
//zkServers.add("192.168.1.107");
//zkServers.add("192.168.1.108");
for(String host:zkhosts.brokerZkStr.split(","))
{
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers=zkServers;
spoutConfig.zkPort=2181;
spoutConfig.forceFromStart=true;//从头开始消费,实际上是要改成false的
spoutConfig.socketTimeoutMs=60;
spoutConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//定义输出为string类型
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(spoutConfig),1);//引用spout,并发度设为1
builder.setBolt("bolt1", new MyKafkaBolt(),1).shuffleGrouping("spout");
Config config =new Config();
config.setDebug(true);//上线之前都要改成false否则日志会非常多
if(args.length>0){
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}else{
LocalCluster localCluster=new LocalCluster();
localCluster.submitTopology("mytopology", config, builder.createTopology());
//本地模式在一个进程里面模拟一个storm集群的所有功能
}
}
}
2.Bolt代码只是简单打印输出,覆写execute方法即可
package demo;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
public class MyKafkaBolt implements IBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void execute(Tuple input, BasicOutputCollector arg1) {
String kafkaMsg =input.getString(0);
System.err.println("bolt"+kafkaMsg);
}
@Override
public void prepare(Map arg0, TopologyContext arg1) {
// TODO Auto-generated method stub
}
}
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。