温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何使用RabbitMQ配置死信队列

发布时间:2022-09-27 16:15:57 来源:亿速云 阅读:310 作者:iii 栏目:开发技术

这篇文章主要介绍“如何使用RabbitMQ配置死信队列”,在日常操作中,相信很多人在如何使用RabbitMQ配置死信队列问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用RabbitMQ配置死信队列”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

当发生以下情况之一时,来自消息队列的可能是“死信”:

消息被拒绝并且重新排队设置为 false

消息的 TTL 过期

超出队列长度限制

为了通过示例进行演示,我选择了第一种情况,即消息被拒绝。生产者将PaymentOrders作为消息发送,这些消息将由消费者处理。当PaymentOrder付款人账户资金不足时,消息将被拒绝。

生产者

生产者是一个 Spring Boot 应用程序,它使用Spring AMQP库向PaymentOrderRabbitMQ 发送消息。

生产者的 API

生产者 API 的第一部分是定义交换器的名称、路由密钥、传入和死信队列。

public class Constants {    public static final String EXCHANGE_NAME = "payment-orders.exchange";    public static final String ROUTING_KEY_NAME = "payment-orders";    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";
}

第二部分是定义消息格式。我们在此示例中使用 JSON。以下 JSON 文档显示了我们如何建模PaymentOrder

{  "from":"SA54 22PS JCLV 7LWT 7LHY EBLO",  "to":"IT23 K545 5414 339G WLPI 2YF6 VBP",  "amount":54.75}

请注意,最好不要使用自定义序列化格式(如有效负载的 Java 序列化),因为这意味着您需要有一个基于 Java 的使用者。好的做法是将有效负载格式化为 JSON。每个平台和/或语言都可以解析 JSON。

生产者配置

我们需要配置 AMQP 基础设施。死信队列配置封装在传入队列声明中。

有一个死信交换direct(DLX) 的概念,它是类型topic或的正常交换fanout。如果在处理从队列中获取的消息期间发生故障,RabbitMQ 会检查是否为该队列配置了死信交换。如果通过x-dead-letter-exchange参数配置了一个,那么它将使用原始路由密钥将失败的消息路由到它。可以通过x-dead-letter-routing-key参数覆盖此路由键。

在此示例中,我们使用default exchange(no-name) 作为 the dead letter exchange,并使用死信队列名称作为新的路由键。这将起作用,因为任何队列都绑定到默认交换,绑定键等于队列名称。

@Configurationpublic class AmqpConfig {
    @Bean
    DirectExchange exchange() {        return new DirectExchange(Constants.EXCHANGE_NAME);
    }
    @Bean
    Queue incomingQueue() {        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)                .withArgument("x-dead-letter-exchange", "")                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)                .build();
    }
    @Bean
    Binding binding() {        return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME);
    }
    @Bean
    Queue deadLetterQueue() {        return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build();
    }
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {        return new Jackson2JsonMessageConverter();
    }
}

用于队列和交换的构建器 API 非常方便,并且从 Spring AMQP 库的 1.6 版本开始可用。

在 RabbitMQ 管理控制台中,DLX和DLK标签指示在传入队列上设置了dead letter exchange和dead letter routing key参数。

如何使用RabbitMQ配置死信队列

生产者逻辑

生产者每 5 秒生成一次随机PaymentOrder消息,这些消息被发送到 RabbitMQ 进行进一步处理。SpringAmqpTemplate是自动配置的,它可以连接到我们的组件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定义了它将自动关联到 auto-configured AmqpTemplate。

@Componentpublic class Producer {    private AmqpTemplate amqpTemplate;    public Producer(AmqpTemplate amqpTemplate) {        this.amqpTemplate = amqpTemplate;
    }    @Scheduled(fixedDelay = 1000L)    public void send() {
        PaymentOrder paymentOrder = new PaymentOrder(
                Iban.random().toFormattedString(),
                Iban.random().toFormattedString(),                new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR));
        amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder);
    }
}

消费者

对于这个简单的示例,消费者也是一个 Spring Boot 应用程序,但在实际应用程序中,消费者和生产者不必在同一平台/语言上。

消费者 API

消费者 API 的第一部分是指定它连接到哪个队列。

public class Constants {    public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue";    public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue";
}

第二部分是适应生产者定义的消息格式。请注意,在这种情况下,两个应用程序都是基于 Java 的,因此我可以创建一个包含PaymentOrder类文件的 jar 文件并与消费者和生产者共享它。然而,这是不好的做法,因为它引入了基于共享库的紧密耦合。更好的方法是使用一些代码重复(PaymentOrder在这种情况下为类)并通过同意消息格式来使用更松散的耦合方法。

public class PaymentOrder {    String from;    String to;
    BigDecimal amount;
    @JsonCreator
    public PaymentOrder(@JsonProperty("from") String from,
                        @JsonProperty("to") String to,
                        @JsonProperty("amount") BigDecimal amount) {        this.from = from;        this.to = to;        this.amount = amount;
    }    // getters and toString()}

消费者配置

消费者只关心从中获取消息的队列。传入队列必须存在,否则消费者将无法启动。请注意,dead letter queue消费者启动时不必存在 ,但在消息需要“死信”时它应该存在。如果它丢失,则消息将被静默丢弃。

@Configurationpublic class AmqpConfig {    @Bean
    Queue incomingQueue() {        return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();
    }    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {        return new Jackson2JsonMessageConverter();
    }
}

默认情况下启用重新排队。为了“死信”消息,您需要将以下属性设置为 false。

spring:
  rabbitmq:
    listener:      default-requeue-rejected: false

但是,如果您想在某些错误情况下启用重新排队,最好保持启用重新排队并利用AmqpRejectAndDontRequeueException将发送basic.reject带有 requeue=false 的选项。

消费逻辑

每当传入队列上有消息可用时,将使用反序列化的实例process调用该方法。在这里,我们通过抛出一个扩展异常PaymentOrder来模拟消息拒绝。InsufficientFundsExceptionAmqpRejectAndDontRequeueException

@Componentpublic class Consumer {    @RabbitListener(queues = Constants.INCOMING_QUEUE_NAME)    public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException {        if (new Random().nextBoolean()) {            throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom());
        }
    }
}

下图显示了一条消息的示例,该PaymentOrder消息被拒绝并最终进入dead letter queue

如何使用RabbitMQ配置死信队列

有时它有助于自动重试失败的操作,以防它可能在后续尝试中成功。RetryTemplateSpring AMQP 库在Spring Retry项目(从 Spring Batch 中提取)的帮助下提供了对此的支持。Spring Boot 使配置变得非常容易,RetryTemplate如下面的示例所示。

spring:
  rabbitmq:
    listener:
      retry:
        enabled: true
        initial-interval: 2000
        max-attempts: 2
        multiplier: 1.5
        max-interval: 5000

使用上述配置,重试功能已启用(默认情况下禁用),最多应有 2 次尝试传递消息,第一次和第二次尝试之间应为 2 秒,稍后与上一次重试间隔乘以 1.5 和最多 5 秒。运行您将在日志中看到的消费者

2016-09-07 21:56:53.396  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'2016-09-07 21:56:55.399  INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer            : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'2016-09-07 21:56:55.401  WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])

到此,关于“如何使用RabbitMQ配置死信队列”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI