我们都知道,RocketMQ在代码级别对连接服务器进行了限制,基本上可以理解为一个JVM进程中只能连接一个NameServer,但实际应用场景中,我们可能会在架构设计层面上对RocketMQ进行了职能上的划分,规定了A服务处理A类消息,而B服务处理B类消息,这时我们应该如何解决这个问题呢?
我们从代码层级来分析到底为什么会产生“一个JVM实例只能连接一个NameServer”。
RocketMQ Client有一个核心类MQClientManager
,在我们需要使用MQ Client实例的时候,实际上都是通过它的getAndCreateMQClientInstance
方法进行创建的;名称比较拗口,同时是Get和Create,这不太符合我们所说的设计单一性原则,但这不是我们讨论的重点,我们看一看这个方法的实现
public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
代码不复杂,我们可以看到它利用客户的配置信息生成一个固定的clientId,以此去缓存factoryTable中查找,不存在才会创建全新一个实例。
那么,可以理解一个clientID仅能存在一个连接实例了,可这个clientId是怎么产生的呢?继续跟踪看看这段代码
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
代码层面上对clientId进行了约定,格式为“ClientIp@InstanceName”格式,当unitName不为空的时候还会在后面加上“@unitName”。
从代码分析上我们可以知道,为了创建多实例,我们可以
instanceName从哪来的?
instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
从系统属性中读取出来的,也就是一般在JVM启动时设定的。。。
可以变吗?当然,你可以通过代码去做到,但这么做的话,你会失去让人理解你代码的能力的,哈哈
这就是为什么多少RocketMQ Client都只能连接一个服务器的原因了,它根本不考虑服务器是谁,仅关心自己,自私的家伙!
除此之外还有其它解决方案吗?我仔细从网络上翻了一轮,没看到什么好方法,是大家都没这个场景还是有其它好办法解决了呢?欢迎大家讨论~
在上一篇博文来自平行世界的救赎里面,我做了个工具sandbox,我提供的方法3就是依托于这个工具。
sandbox通过代码隔离的方式,将另一份类定义放入沙箱中运行,从而实现多个实例完全隔离的效果。 MQClientManager
通过缓存方式,以clientId作为key值存储到自身实例当中,为了实现多个Client,那么前两种方法的逻辑是修改clientId实现多个实例,而方法3的逻辑则是“既然你的缓存已经有这个key,我就换个缓存”,本质就是“你这个锅不装我,我就换个锅”。
这里我使用一个springboot项目作为演示案例。
通过springboot的Configuration
将多个RocketMQ Client进行注册,再定义一个Controller接收不同请求去发送MQ消息,最后加上启动类。
我们先从pom文件中引入包(我没有推上maven仓库,各位可以从github/gitee上下载),代码如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>me.van</groupId>
<artifactId>rocket-mq-multi-client-test</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>测试多个rocketmq client共存</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<start-class>me.van.App</start-class>
<java.version>1.8</java.version>
<lombok.version>1.14.8</lombok.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
<dependency>
<groupId>me.van</groupId>
<artifactId>sandbox</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
此处引入了apache的rocketmq-client组件作为mq客户端,也就是存在前面所说的问题的组件。
package me.van;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
非常的简单,没什么好介绍的。
package me.van;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AppConfig {
@Bean(autowire = Autowire.BY_NAME, value = "producer")
MQProducer producer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer();
initProducer(producer, "a.io:9876;b.io:9876");
return producer;
}
@Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox1")
MQProducer producerSandbox1() throws MQClientException, SandboxCannotCreateObjectException {
DefaultMQProducer producer = createProducerInSandbox();
initProducer(producer, "x.io:9876;y.io:9876");
return producer;
}
@Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox2")
MQProducer producerSandbox2() throws MQClientException, SandboxCannotCreateObjectException {
DefaultMQProducer producer = createProducerInSandbox();
initProducer(producer, "1.io:9876;2.io:9876");
return producer;
}
private DefaultMQProducer createProducerInSandbox() throws SandboxCannotCreateObjectException {
Sandbox sandbox = new Sandbox("org.apache.rocketmq.client");
return sandbox.createObject(DefaultMQProducer.class);
}
private void initProducer(DefaultMQProducer producer, String namesrvAddr) throws MQClientException {
producer.setNamesrvAddr(namesrvAddr);
producer.setProducerGroup("test-group");
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.start();
}
}
这里可以看到,producer
对象是直接new 出来的DefaultMQProducer
,而producer_sandbox1
和producer_sandbox2
是通过不同的沙箱创建出来的;三个client分别连接到不同的NameServer中,同时其它属性保持一致。
package me.van;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
MQProducer producer;
@Autowired
MQProducer producer_sandbox1;
@Autowired
MQProducer producer_sandbox2;
@GetMapping("/")
public String hello(){
return "hello world";
}
@GetMapping("/send")
public String send(String msg){
if(null == msg) return "msg is null";
String returnMsg = "";
Message message = new Message("topic-test-multi-mq-client", msg.getBytes());
try {
producer.send(message);
returnMsg += "原生producer发送完成<br/>";
producer_sandbox1.send(message);
returnMsg += "第一个沙箱内producer发送完成<br/>";
producer_sandbox2.send(message);
returnMsg += "第二个沙箱内producer发送完成<br/>";
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
returnMsg += "发送过程出现异常:" + e.getMessage();
}
return returnMsg;
}
}
通过send
方法同时向三个producer
发送消息。
运行App
,等几秒钟启动完毕,访问http://localhost:8080/send,返回
msg is null
访问,http://localhost:8080/send?msg=test
github: https://github.com/vancoo/multi-mq-demo
gitee: https://gitee.com/vancoo/multi-mq-demo
参考文档: 来自平行世界的救赎
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。