这篇文章将为大家详细讲解有关Storm如何和Kafka进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
对于Storm 如何和Kafka进行整合
package com.mixbox.storm.kafka; import backtype.storm.Config; import backtype.storm.metric.api.IMetric; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import kafka.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId; import java.util.*; /** * @author Yin Shuai */ public class KafkaSpout extends BaseRichSpout { public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); /** * 内部类,Message和Offset的偏移量对象 * * @author Yin Shuai */ public static class MessageAndRealOffset { public Message msg; public long offset; public MessageAndRealOffset(Message msg, long offset) { this.msg = msg; this.offset = offset; } } /** * 发射的枚举类 * @author Yin Shuai */ static enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED } String _uuid = UUID.randomUUID().toString(); SpoutConfig _spoutConfig; SpoutOutputCollector _collector; // 分区的协调器,getMyManagedPartitions 拿到我所管理的分区 PartitionCoordinator _coordinator; // 动态的分区链接:保存到kafka各个节点的连接,以及负责的topic的partition号码 DynamicPartitionConnections _connections; // 提供了从zookeeper读写kafka 消费者信息的功能 ZkState _state; // 上次更新的毫秒数 long _lastUpdateMs = 0; // 当前的分区 int _currPartitionIndex = 0; public KafkaSpout(SpoutConfig spoutConf) { _spoutConfig = spoutConf; } @SuppressWarnings("unchecked") @Override public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { _collector = collector; List<String> zkServers = _spoutConfig.zkServers; // 初始化的时候如果zkServers 为空,那么初始化 默认的配置Zookeeper if (zkServers == null) { zkServers = new ArrayList<String>() { { add("192.168.50.144"); add("192.168.50.169"); add("192.168.50.207"); } }; // zkServers = // (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS); System.out.println(" 使用的是Storm默认配置的Zookeeper List : " + zkServers); } Integer zkPort = 
_spoutConfig.zkPort; // 在这里我们也同时 来检查zookeeper的端口是否为空 if (zkPort == null) { zkPort = 2181; // zkPort = ((Number) // conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); } Map stateConf = new HashMap(conf); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); // 通过保存的配置文件,我们持有了一个zookeeper的state,支持节点内容的创建和删除 _state = new ZkState(stateConf); // 对于连接的维护 _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is a hack // 拿到总共的任务次数 int totalTasks = context .getComponentTasks(context.getThisComponentId()).size(); // 判断当前的主机是否是静态的statichost if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); // 当你拿到的spoutConfig是zkhost的时候 } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } context.registerMetric("kafkaOffset", new IMetric() { KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric( _spoutConfig.topic, _connections); @Override public Object getValueAndReset() { List<PartitionManager> pms = _coordinator .getMyManagedPartitions(); Set<Partition> latestPartitions = new HashSet(); for (PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); } _kafkaOffsetMetric.refreshPartitions(latestPartitions); for (PartitionManager pm : pms) { _kafkaOffsetMetric.setLatestEmittedOffset( pm.getPartition(), pm.lastCompletedOffset()); } return _kafkaOffsetMetric.getValueAndReset(); } }, _spoutConfig.metricsTimeBucketSizeInSecs); context.registerMetric("kafkaPartition", new IMetric() { @Override public Object getValueAndReset() { List<PartitionManager> pms = _coordinator .getMyManagedPartitions(); Map 
concatMetricsDataMaps = new HashMap(); for (PartitionManager pm : pms) { concatMetricsDataMaps.putAll(pm.getMetricsDataMap()); } return concatMetricsDataMaps; } }, _spoutConfig.metricsTimeBucketSizeInSecs); } @Override public void close() { _state.close(); } @Override public void nextTuple() { // Storm-spout 是从kafka 消费数据,把 kafka 的 consumer // 当成是一个spout,并且向其他的bolt的发送数据 // 拿到当前我管理的这些PartitionsManager List<PartitionManager> managers = _coordinator.getMyManagedPartitions(); for (int i = 0; i < managers.size(); i++) { // 对于每一个分区的 PartitionManager // in case the number of managers decreased // 当前的分区 _currPartitionIndex = _currPartitionIndex % managers.size(); // 拿到当前的分区,并且发送,这里把SpoutOutputCollector传递进去了,由他发射元祖 EmitState state = managers.get(_currPartitionIndex) .next(_collector); // 如果发送状态为:发送-还有剩余 if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } // 如果发送的状态为: 发送-没有剩余 if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { commit(); } } @Override public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); } } @Override public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } @Override public void deactivate() { // 停止工作 commit(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println(_spoutConfig.scheme.getOutputFields()); declarer.declare(_spoutConfig.scheme.getOutputFields()); } private void commit() { _lastUpdateMs = System.currentTimeMillis(); for (PartitionManager manager : _coordinator.getMyManagedPartitions()) { manager.commit(); } } }
在粗浅的代码阅读之后,在这里进行详细的分析:
1 KafkaSpout之中定义了一个 MessageAndRealOffset 静态内部类,用来保存一条消息(Message)以及它对应的偏移量(offset)
/** Pairs a Kafka message with the offset recorded for it. */
public static class MessageAndRealOffset {

    /** The Kafka message. */
    public Message msg;

    /** The offset stored together with {@link #msg}. */
    public long offset;

    /**
     * @param msg    the Kafka message
     * @param offset the offset to store with the message
     */
    public MessageAndRealOffset(Message msg, long offset) {
        this.offset = offset;
        this.msg = msg;
    }
}
2 在Spout之中我们还持有了一个PartitionCoordinator类型的分区协调器:当_spoutConfig.hosts是StaticHosts时,实例化的对象是StaticCoordinator;否则实例化的对象是ZkCoordinator
关于Storm如何和Kafka进行整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。