本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理
先说UNSUBSCRIBE,代码比较简单
public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
List<String> topics = msg.payload().topics();
String clientID = NettyUtils.clientID(channel);
LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);
ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
for (String t : topics) {
Topic topic = new Topic(t);
boolean validTopic = topic.isValid();
if (!validTopic) {
// close the connection, not valid topicFilter is a protocol violation
channel.close();
LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
return;
}
if(LOG.isDebugEnabled()){
LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
}
subscriptions.removeSubscription(topic, clientID);
clientSession.unsubscribeFrom(topic);
String username = NettyUtils.userName(channel);
m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
}
// ack the client
int messageID = msg.variableHeader().messageId();
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0);
MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));
LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
channel.writeAndFlush(ackMessage);
}
主要分为以下几步
1.从目录树下,移除该client的订阅,这个移除过程有点复杂,后面单独一篇专门讲解topic树
2.清除ClientSession里面的订阅,包括Set<Subscription> subscriptions,同时还得移除ISubscriptionsStore里面的Map<Topic, Subscription> subscriptions
3.唤醒拦截器
4.返回UNSUBACK ,这里注意UNSUBACK 是没有payload的。
再说DISCONNECT的处理
public void processDisconnect(Channel channel) throws InterruptedException {
final String clientID = NettyUtils.clientID(channel);
LOG.info("Processing DISCONNECT message. CId={}", clientID);
channel.flush();
final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
if (existingDescriptor == null) {
// another client with same ID removed the descriptor, we must exit
channel.close();
return;
}
if (existingDescriptor.doesNotUseChannel(channel)) {
// another client saved it's descriptor, exit
LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
existingDescriptor.abort();
return;
}
if (!removeSubscriptions(existingDescriptor, clientID)) {
LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
existingDescriptor.abort();
return;
}
if (!dropStoredMessages(existingDescriptor, clientID)) {
LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
existingDescriptor.abort();
return;
}
if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
existingDescriptor.abort();
return;
}
if (!existingDescriptor.close()) {
LOG.info("The connection has been closed. CId={}", clientID);
return;
}
boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
if (!stillPresent) {
// another descriptor was inserted
LOG.warn("Another descriptor has been inserted. CId={}", clientID);
return;
}
LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
}
1.检查连接描述符是否还存在,如果不存在,说明之前已经有客户端删除了它,直接关闭通道
2.判断这个client的连接描述符是不是,是不是还是当前使用这个通道的client?作者要先防止这种情况呢?先卖个关子,后面的第6条会说明
3.清除订阅请求,这里面好像只清楚了不要求保存会话信息的clientsession里面的ISessionsStore里面的Map<Topic, Subscription> subscriptions,而并没有清除ClientSession里面的Set<Subscription> subscriptions和topic树里面的订阅,这能够解释https://blog.51cto.com/13579730/2073914 这篇文章结尾讨论的问题了,只有Map<Topic, Subscription> subscriptions的订阅才是最准确的。
4.丢弃存储的消息,这里面也只是会丢弃不要去保存会话信息的消息
5.清除遗愿消息,对于遗愿消息,这里稍微啰嗦一点,遗愿消息是在初次连接的存储到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore这里面的,那么什么时候发送给订阅者呢?看下面
io.moquette.server.netty.NettyMQTTHandler#channelInactive
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
String clientID = NettyUtils.clientID(ctx.channel());
if (clientID != null && !clientID.isEmpty()) {
LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
m_processor.processConnectionLost(clientID, ctx.channel());
}
ctx.close();
}
说明是当netty检测到通道不活跃的时候通知ProtocolProcessor处理ConnectionLost事件的。
public void processConnectionLost(String clientID, Channel channel) {
LOG.info("Processing connection lost event. CId={}", clientID);
ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
connectionDescriptors.removeConnection(oldConnDescr);//移除连接描述符
// publish the Will message (if any) for the clientID
if (m_willStore.containsKey(clientID)) {
WillMessage will = m_willStore.get(clientID);
forwardPublishWill(will, clientID);//发布遗愿消息
m_willStore.remove(clientID);//移除遗愿消息存储
}
String username = NettyUtils.userName(channel);
m_interceptor.notifyClientConnectionLost(clientID, username);//唤醒拦截器
}
在以下这种情况下会发布遗愿消息
遗嘱消息发布的条件,包括但不限于:
服务端检测到了一个I/O错误或者网络故障。
客户端在保持连接(Keep Alive)的时间内未能通讯。
客户端没有先发送DISCONNECT报文直接关闭了网络连接。
由于协议错误服务端关闭了网络连接。
另外说明一下,遗愿消息是可以设置消息等级的,而且可以被设置成retain消息
6.连接描述符集合里面清除该通道对应的连接描述符,这里有一点很容易误解,强调一下
boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
if (!stillPresent) {
// another descriptor was inserted
LOG.warn("Another descriptor has been inserted. CId={}", clientID);
return;
}
作者调用的是ConcurrentMap里面的boolean remove(Object key, Object value);这个方法要求key存在,且value 与预期的一样才会删除,也就说,是有可能存在的,key一样而value不一样的情况的,什么时候会出现?答案是client在两个设备上先后登陆,这个时候由于是存在一个map里面的所以后面的登陆所创建的连接描述符会覆盖前面的一个。当然这里面,也可以在覆盖之前强制断开之前那个连接,但是moquette并没有这么做,具体看源码io.moquette.server.ConnectionDescriptorStore#addConnection
也就说说moquette是允许存在一个账号多设备登陆的。将入client先后在A,B两个设备上建立连接,B连接会覆盖A连接,这个时候A连接虽然还在,但其实是永远也收不到消息的,因为发送消息的时候,会以ConnectionDescriptorStore里面存储的为准,具体看源码
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是说A连接会无谓的占用broker的资源,个人觉得这样并不好,也非常没有必要,大家可以自行改进。
现在大家就能够理解上面的第2步了,因为这个就是为双登陆的情况下,被覆盖的那个连接准备的。
moquette-broker还要处理以下的报文,包括
1.PINGREQ,心跳报文
2.PUBACK,当broker向client发送qos1消息的时候,client需要回复PUBACK消息,消息存储在
io.moquette.spi.ClientSession.OutboundFlightZone outboundFlightZone里面(底层使用map存储的),
消息是io.moquette.spi.impl.MessagesPublisher#publish3Subscribers(io.moquette.spi.IMessagesStore.StoredMessage, io.moquette.spi.impl.subscriptions.Topic)
这里被存储进去的,这是一个临时的存储,存储完之后消息会被删除掉
3.PUBREC 这个是当broker向client发送qos2消息之后,client需要向broker作的第一个返回报文,
这里面有个动作是将消息从inboundFlightMessages转移到secondPhaseStore和outboundFlightMessages,具体看这
里io.moquette.persistence.memory.MemorySessionStore#moveInFlightToSecondPhaseAckWaiting
4.PUBCOMP 当broker收到这个报文的时候会负责从内存里面删除飞行窗口的消息,具体怎么删除的详见下篇,moquette拦截器
5.PUBREL。当client向broker发送qos2消息的时候,broker会回复PUBREC,告诉client已经记录下来了,
client收到PUBREC之后会发送PUBREL,告诉broker,我知道你已经记录了消息,既然你记录了,那这边
就释放消息了(确保只要broker才会该消息,避免client重发),当broker收到PUBREL报文的时候,就知道
client那边已经把该消息释放了,然后消息的主导权到了他这边,他开始发送消息。当消息发送完成了,
会向client发送PUBCOMP报文。
关于qos2消息的介绍可以看一下这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md 的4.3条
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。