这篇文章主要介绍“基Spring Integration如何实现MQTT客户端简单订阅发布功能”,在日常操作中,相信很多人在基Spring Integration如何实现MQTT客户端简单订阅发布功能问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基Spring Integration如何实现MQTT客户端简单订阅发布功能”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
Spring Integration
提供入站(inbound)和出站(outbound)通道适配器,以支持MQTT消息协议。使用这两适配器,需要加入依赖:
<!-- Maven -->
<dependency>
<groupid>org.springframework.integration</groupid>
<artifactid>spring-integration-mqtt</artifactid>
<version>5.2.1.RELEASE</version>
</dependency>
// Gradle
compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"
当前的MQTT Integration实现使用的是Eclipse Paho MQTT客户端库。两个适配器的配置都是使用DefaultMqttPahoClientFactory
实现的。有关配置选项的更多信息,请参阅Eclipse Paho MQTT
文档定义。
> 建议配置MqttConnectOptions
对象并将其注入工厂(factory),而不是在工厂本身里设置(不推荐使用)MQTT连接选项。
入站通道适配器由MqttPahoMessageDrivenChannelAdapter
实现。常用的配置项有:
客户端ID
MQTT Broker URL
待订阅的主题列表
带订阅的主题QoS值列表
MqttMessageConverter
(可选)。默认情况下,默认的DefaultPaHomeMessageConverter
生成一条带有字符串有效负载的消息,其头部内容如下:
mqtt_topic
: 接收消息的主题
mqtt_duplicate
: 如果消息是重复的,则为true
mqtt_qos
: 服务质量,你可以将DefaultPahoMessageConverter
声明为<bean />
并将payloadAsBytes
属性设置为true
,从而将DefaultPahoMessageConverter
返回有效负载中的原始byte[]
客户端工厂
发送超时。仅当通道可能阻塞(例如当前已满的有界队列通道)时才适用。
错误通道。下游异常将以错误消息的形式发送到此通道(如果提供)。有效负载是包含失败消息和原因的MessagingException
。
恢复间隔。它控制适配器在发生故障后尝试重新连接的时间间隔。默认为10000毫秒(10秒)。
> 从Spring 4.1版开始,可以省略URL
。相反,你可以在DefaultMqttPahoClientFactory
的server URIs
属性中提供服务器uri。例如,这样做允许连接到高可用(HA)集群。
从Spring 4.2.2
开始,当适配器成功订阅到主题了,MqttSubscribedEvent
事件就会被触发。当连接失败或者订阅失败,MqttConnectionFailedEvent
事件会被触发。这两个事件都能够被一个Bean通过实现ApplicationListener
而接收到。另外,名为recoveryInterval
的新属性控制适配器在失败后尝试重新连接的时间间隔。默认为10000毫秒(10秒)。
@Component
public class MQTTSubscribedListener implements ApplicationListener<mqttsubscribedevent> {
private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class);
@Override
public void onApplicationEvent(MqttSubscribedEvent event) {
LOGGER.debug("Subscribed Success: " + event.getMessage());
}
}
> 在版本Spring 4.2.3之前,当适配器停止时,客户端总是取消订阅。这是不正确的,因为如果客户端QOS大于0,我们需要保持订阅处于活动状态,以便在下次启动时传递适配器停止时到达的消息。这还需要将客户机工厂上的cleanSession
属性设置为false。默认为true。从4.2.3版开始,如果cleanSession属性为false,则适配器不会取消订阅(默认情况下),这个默认行为可以通过在工厂上设置consumerCloseAction
属性来重写此行为。它可以有以下值:UNSUBSCRIBE_ALWAYS
、UNSUBSCRIBE_NEVER
和UNSUBSCRIBE_CLEAN
,最后一项(默认设置)仅在cleanSession属性为true时取消订阅。若要还原到4.2.3之前的行为,请始终使用“取消订阅”设置项。
> 注意:从Spring 5.0开始,topic、qos和retained属性映射到.RECEIVED_…headers
(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以避免意外传播到(默认情况下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<!--?--> message) throws MessagingException {
LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload());
}
};
}
从Spring4.1
开始,你可以通过编程更改适配器订阅的主题。Spring Integration提供了addTopic()
和removeTopic()
方法。添加主题时,可以选择指定QoS
值(默认是1)。你还可以通过向具有适当有效负载的<control bus />
发送适当的消息来修改主题。示例:
myMqttAdapter.addTopic('foo', 1)
停止和启动适配器对主题列表(topics设置项)没有影响(它不会还原到配置中的原始设置)。这些更改不会保留到应用程序上下文的生命周期之外。新的应用程序上下文将还原为配置的设置。
在适配器停止(或与代理断开连接)时更改主题列表(topics)将在下次建立连接时生效。
以下Spring Boot
应用程序显示了如何使用Java配置配置入站(inbound)适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<!--?--> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
下面的Spring Boot
应用程序提供了使用Java DSL配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlows.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2");)
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
出站通道适配器由MqttPahoMessageHandler
实现,MqttPahoMessageHandler
包装在ConsumerEndpoint
中。为了方便起见,可以使用名称空间配置它。
从Spring 4.1开始,适配器支持异步发送操作,在确认交付之前避免阻塞。如果需要,可以发出应用程序事件以使应用程序确认传递。
以下列表显示出站通道适配器可用的属性:
<int-mqtt:outbound-channel-adapter id="withConverter" client-id="foo" url="tcp://localhost:1883" converter="myConverter" client-factory="clientFactory" default-qos="1" qos-expression="" default-retained="true" retained-expression="" default-topic="bar" topic-expression="" async="false" async-events="false" channel="target" />
MQTT Client ID
MQTT Broker URL
Converter(MqttMessageConver,可选的),默认的DefaultPaHomeMessageConverter
可识别以下标题:
mqtt_topic
: 消息将发送到的主题
mqtt_retained
: 如果要保留消息,则为true
mqtt_qos
:消息服务质量
客户端工厂
default-qos,默认的服务质量。如果找不到mqtt_qos头或qos表达式返回空值,则使用它。如果提供自定义转换器,则不使用它。
用于计算以确定qos的表达式。缺省值是headers[mqtt_qos]
。
保留标志的默认值。如果找不到mqtt_retained
头,则使用它。如果提供了自定义转换器,则不使用它。
要计算以确定保留布尔值的表达式。默认为headers[mqtt_retained]
消息发送到的默认主题(如果找不到mqtt_topic
头,则使用)
要计算以确定目标主题的表达式。默认为headers['mqtt_topic']
async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)
async-events,当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent
。它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认传递时,将发出MqttMessageDeliveredEvent
。它包含messageId、clientId和clientInstance,使传递与发送相关。任何ApplicationListener
或事件入站通道适配器都可以接收这些事件。请注意,MqttMessageDeliveredEvent
可能在MqttMessageSentEvent
之前收到。默认值为false
> 注意,同样地,从Spring 4.1开始,可以省略URL。相反,可以在DefaultMqttPahoClientFactor
的server URIs
属性中提供服务器uri。例如,这允许连接到高可用(HA)集群。
下面的Spring Boot应用程序展示了如何使用Java配置配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
下面的Spring Boot应用程序提供了使用Java DSL配置出站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
到此,关于“基Spring Integration如何实现MQTT客户端简单订阅发布功能”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/biedamingming/blog/3145882