本篇内容主要讲解“rabbitmq多个消费者同时接收_RabbitMQ实现公平派遣任务的方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“rabbitmq多个消费者同时接收_RabbitMQ实现公平派遣任务的方法”吧!
先来看一下 RabbitMQ 工作队列和竞争消费者简化模型:
P:(producer/ publisher):生产者,一个发送消息的用户应用程序。
C1:消费者1,一个主要用来等待接收消息的用户应用程序1。
C2:消费者2,一个主要用来等待接收消息的用户应用程序2。
(C1 和 C2 指代多个消费者(工作人员))
工作队列(任务队列):主要用于在多个工作人员(消费者)之间分配耗时的任务,主要思想是避免必须等待某些立即执行的资源密集型任务完成的尴尬局面发生。将任务封装为消息并将其发送到队列,这样,安排的任务可以在以后完成。当有很多消费者时,任务将在他们之间共享,一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求窗口内处理复杂的任务。
生产者发送新任务消息:
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//循环发送 50 条信息
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
for(int i = 0; i < 50; i++) {
String message = "task..." + i;
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
System.out.println(" [x]Sent:" + message);
try {
Thread.sleep(i * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
}
消费者1(消费者1设置消费耗时):
假设消耗 1 秒来处理消费过程
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 设置每个消费者同时只能处理一条消息
//channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消费者1] Received '" + msg + "'");
try{
Thread.sleep(1000); // 模拟消费者耗时
}catch (InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
消费者2(消费者2不设置消费耗时):
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 设置每个消费者同时只能处理一条消息
//channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[消费者2] Received '" + msg + "'");
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
首先启动消费者1(Worker)和消费者2(Worker2)应用程序,再启动生产者(NewTask)。
运行结果如下:
消费者1:
消费者2:
由结果可见,两个消费者各自消费了 25 条信息,消息各不相同,从而实现了任务的分发。
公平派遣任务
由结果,我们注意到,工作者1(消费者1)的处理任务比较忙碌(消费耗时),而消费者2却有大量时间处于空闲状态不做任何任务。而 RabbitMQ 对此一无所知,进行平均分配消息,这是因为 RabbitMQ 在消息进入队列才进行信息调度,不会进行未确认消息数,只是盲目地将消息发送给消费者。
为了解决这一点,可以使用 basicQos 方法并将入参参数设置为 1,这样做的意义是:RabbitMQ 一次不要给消费者一个以上的消息。换句话即是在消费者处理并确认上一条信息之前,不要向其发送新的消息,而是分派给不繁忙的消费者进行消费。
在消费者1和消费者2加入一段代码:
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
再次运行结果:
消费者1:
消费者2:
因此,通过设置 basicQos ,可以让 RabbitMQ 实现分轻重地对任务进行分派。
到此,相信大家对“rabbitmq多个消费者同时接收_RabbitMQ实现公平派遣任务的方法”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。