这篇文章将为大家详细讲解有关MQTT大消息失败原因排查的过程,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
小组内使用 MQTT 协议搭建了一个聊天服务器,前天在测大消息(超过5000汉字)时,连接直接变得不可用,后续发送的消息全部都收不到回复。
服务器环境:
Netty :4.1.32.Final
使用的是 Netty 包中自带的 MqttDecoder
客户端: Android
由于所有的消息都打印了日志,因此先搜了一下服务器日志,发现日志中并没有发送的消息内容。
难道是客户端在超长消息时没有发送?使用 tcpdump
抓了包,发现客户端正常发送,并且所有的包服务端都已经 ack,但是后续服务端没有发回响应,猜测是服务端在大消息的情况下处理失败了。
tcpdump
使用 -nn
打印出ip和端口,-X
打印网络包的内容,也可以使用-w
选项保存到文件里,然后使用 tcpdump
或 wireshark
来分析
于是查了一下 MQTT 支持的最大 payload,MQTT 官方文档 中说明是 256M,这个大小肯定不会超过。
在服务端抓了下包,确认消息已经收到,但是无确认消息返回
开启线上debug,发现收到了一个 PUBLISH
类型的消息,但是消息的 class 不为 MqttPublishMessage
, 且 payload 中无数据,但在 Message 中有一个报错消息 too large message: 56234 bytes
Google 一下,有网友遇到了同样的问题, 虽然这个问题里 MQTT 是 C 语言的。
查看 MqttDecoder
, 发现 decoder 有最长 payload 限制(以下为部分代码),启动代码里调用的是默认构造函数,因此默认最长数据为 8092
字节。
public final class MqttDecoder extends ReplayingDecoder<DecoderState> { private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092; public MqttDecoder() { this(DEFAULT_MAX_BYTES_IN_MESSAGE); } public MqttDecoder(int maxBytesInMessage) { super(DecoderState.READ_FIXED_HEADER); this.maxBytesInMessage = maxBytesInMessage; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { switch (state()) { case READ_FIXED_HEADER: try { mqttFixedHeader = decodeFixedHeader(buffer); bytesRemainingInVariablePart = mqttFixedHeader.remainingLength(); checkpoint(DecoderState.READ_VARIABLE_HEADER); // fall through } catch (Exception cause) { out.add(invalidMessage(cause)); return; } case READ_VARIABLE_HEADER: try { final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader); variableHeader = decodedVariableHeader.value; if (bytesRemainingInVariablePart > maxBytesInMessage) { throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes"); } bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; checkpoint(DecoderState.READ_PAYLOAD); // fall through } catch (Exception cause) { out.add(invalidMessage(cause)); return; } case READ_PAYLOAD: try { final Result<?> decodedPayload = decodePayload( buffer, mqttFixedHeader.messageType(), bytesRemainingInVariablePart, variableHeader); bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed; if (bytesRemainingInVariablePart != 0) { throw new DecoderException( "non-zero remaining payload bytes: " + bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')'); } checkpoint(DecoderState.READ_FIXED_HEADER); MqttMessage message = MqttMessageFactory.newMessage( mqttFixedHeader, variableHeader, decodedPayload.value); mqttFixedHeader = null; variableHeader = null; out.add(message); break; } catch (Exception cause) { out.add(invalidMessage(cause)); return; } case BAD_MESSAGE: // Keep discarding until disconnection. buffer.skipBytes(actualReadableBytes()); break; default: // Shouldn't reach here. throw new Error(); } } private MqttMessage invalidMessage(Throwable cause) { checkpoint(DecoderState.BAD_MESSAGE); return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause); } }
长消息的原因找到了,还剩一个问题,为什么后续的消息包括 ping 消息就再也发不出去了?经过查看代码,这与 MqttDecoder
的父类 ReplayingDecoder
有关系,查看源码有详尽的类说明, 在读取可变长度头部时,如果payload 超过了最大限制,那么直接抛出异常。摘出代码如下:
case READ_VARIABLE_HEADER: try { final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader); variableHeader = decodedVariableHeader.value; if (bytesRemainingInVariablePart > maxBytesInMessage) { throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes"); } bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; checkpoint(DecoderState.READ_PAYLOAD); // fall through } catch (Exception cause) { out.add(invalidMessage(cause)); return; }
在异常处理中,调用了 invalidMessage
方法,这个方法将 状态设为 DecoderState.BAD_MESSAGE
, 在这个状态下,所有的字节都直接被丢弃。
case BAD_MESSAGE: // Keep discarding until disconnection. buffer.skipBytes(actualReadableBytes()); break;
也就是说此后的消息都不会进入到业务处理逻辑,这条长连接废掉了。
客户端对长消息做字数限制和拆分,保证单条消息不超过最大限制
服务端增大最大载荷长度,MqttDecoder
提供了构造函数(不建议使用,这样会增大服务器处理时间和内存负担)
关于MQTT大消息失败原因排查的过程就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。