温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

java rocketmq中与消息发送紧密相关的几行代码以及用法

发布时间:2021-10-13 11:24:27 来源:亿速云 阅读:171 作者:柒染 栏目:编程语言

这期内容当中小编将会给大家带来有关java rocketmq中与消息发送紧密相关的几行代码以及用法,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

前言

与消息发送紧密相关的几行代码:

1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

2. producer.start();

3. Message msg = new Message(...)

4. SendResult sendResult = producer.send(msg);

5. producer.shutdown();

那这几行代码执行时,背后都做了什么?

一. 首先是DefaultMQProducer.start

@Overridepublic void start() throws MQClientException {this.defaultMQProducerImpl.start();}

调用了默认生成消息的实现类 -- DefaultMQProducerImpl

调用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()会初始化得到MQClientInstance实例对象,MQClientInstance实例对象调用它自己的start方法会 ,启动一些服务,如拉去消息服务PullMessageService.Start()、启动负载平衡服务RebalanceService.Start(),比如网络通信服务MQClientAPIImpl.Start()

另外,还会执行与生产消息相关的信息,如注册produceGroup、new一个TopicPublishInfo对象并以默认TopicKey为键值,构成键值对存入DefaultMQProducerImpl的topicPublishInfoTable中。

efaultMQProducerImpl.start()后,获取的MQClientInstance实例对象会调用sendHeartbeatToAllBroker()方法,不断向broker发送心跳包,yin'b可以使用下面一幅图大致描述DefaultMQProducerImpl.start()过程:

上图中的三个部分中涉及的内容:

1.1 初始化MQClientInstance

一个客户端只能产生一个MQClientInstance实例对象,产生方式使用了工厂模式与单例模式。MQClientInstance.start()方法启动一些服务,源码如下:

public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case RUNNING:break;case SHUTDOWN_ALREADY:break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}

1.2 注册producer

该过程会将这个当前producer对象注册到MQClientInstance实例对象的的producerTable中。一个jvm(一个客户端)中一个producerGroup只能有一个实例,MQClientInstance操作producerTable大概有如下几个方法:

-- selectProducer  -- updateTopicRouteInfoFromNameServer  -- prepareHeartbeatData  -- isNeedUpdateTopicRouteInfo  -- shutdown

注:

根据不同的clientId,MQClientManager将给出不同的MQClientInstance;

根据不同的group,MQClientInstance将给出不同的MQProducer和MQConsumer

1.3 向路由信息表中添加路由

topicPublishInfoTable定义:

public class DefaultMQProducerImpl implements MQProducerInner {private final Logger log = ClientLogger.getLog();private final Random random = new Random();private final DefaultMQProducer defaultMQProducer;private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();

它是一个以topic为key的Map型数据结构,DefaultMQProducerImpl.start()时会默认创建一个key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。

1.4 发送心跳包

MQClientInstance向broker发送心跳包时,调用sendHeartbeatToAllBroker( ),以及从MQClientInstance实例对象的brokerAddrTable中拿到所有broker地址,向这些broker发送心跳包。

sendHeartbeatToAllBroker会涉及到prepareHeartbeatData()方法,该方法会生成heartbeatData数据,发送心跳包时,heartbeatData作为心跳包的body。与producer相关的部分代码如下:

// Producerfor (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {MQProducerInner impl = entry.getValue();if (impl != null) {ProducerData producerData = new ProducerData();producerData.setGroupName(entry.getKey());heartbeatData.getProducerDataSet().add(producerData);}

二、. SendResult sendResult = producer.send(msg)

首先会调用DefaultMQProducer.send(msg) ,继而调用sendDefaultImpl:

public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}

sendDefaultImpl做了啥?

2.1. 获取topicPublishInfo

根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo,如果没有则更新路由信息,从nameserver端拉取最新路由信息。从nameserver端拉取最新路由信息大致为:

首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

2.2 选择消息发送的队列

普通消息:默认方式下,selectOneMessageQueue从topicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息,默认采用长轮询的方式选择队列 。

它的机制如下:正常情况下,顺序选择queue进行发送;如果某一个节点发生了超时,则下次选择queue时,跳过相同的broker。不同的队列选择策略形成了生产消息的几种模式,如顺序消息,事务消息。

顺序消息:将一组需要有序消费的消息发往同一个broker的同一个队列上即可实现顺序消息,假设相同订单号的支付,退款需要放到同一个队列,那么就可以在send的时候,自己实现MessageQueueSelector,根据参数arg字段来选择queue。

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}

事务消息:只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交,消息发送失败,直接发送回滚消息,进行回滚,具体如何实现后面会单独成文分析。

2.3 封装消息体通信包,发送数据包

首先,根据获取的MessageQueue中的getBrokerName,调用findBrokerAddressInPublish得到该消息存放对应的broker地址,如果没有找到则跟新路由信息,重新获取地址 :

brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)

可知获取的broker均为master(id=0)

然后, 将与该消息相关信息打包成RemotingCommand数据包,其RequestCode.SEND_MESSAGE

根据获取的broke地址,将数据包到对应的broker,默认是发送超时时间为3s。

封装消息请求包的包头:

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);

发送消息包(普通消息默认为同步方式):

SendResult sendResult = null;switch (communicationMode) {   case SYNC:  sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(  brokerAddr,  mq.getBrokerName(),   msg,  requestHeader,   timeout,  communicationMode,  context,  this);break;

处理来自broker端的响应数据包:

private SendResult sendMessageSync(final String addr,final String brokerName,final Message msg,final long timeoutMillis,final RemotingCommand request) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;return this.processSendResponse(brokerName, msg, response);}

broker端处理request数据包后会将消息存储到commitLog.

上述就是小编为大家分享的java rocketmq中与消息发送紧密相关的几行代码以及用法了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI