这篇文章主要介绍“如何使用RabbitMQ配置死信队列”,在日常操作中,相信很多人在如何使用RabbitMQ配置死信队列问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何使用RabbitMQ配置死信队列”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
当发生以下情况之一时,来自消息队列的可能是“死信”:
消息被拒绝并且重新排队设置为 false
消息的 TTL 过期
超出队列长度限制
为了通过示例进行演示,我选择了第一种情况,即消息被拒绝。生产者将PaymentOrders作为消息发送,这些消息将由消费者处理。当PaymentOrder付款人账户资金不足时,消息将被拒绝。
生产者是一个 Spring Boot 应用程序,它使用Spring AMQP库向PaymentOrderRabbitMQ 发送消息。
生产者的 API
生产者 API 的第一部分是定义交换器的名称、路由密钥、传入和死信队列。
public class Constants { public static final String EXCHANGE_NAME = "payment-orders.exchange"; public static final String ROUTING_KEY_NAME = "payment-orders"; public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue"; public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue"; }
第二部分是定义消息格式。我们在此示例中使用 JSON。以下 JSON 文档显示了我们如何建模PaymentOrder
{ "from":"SA54 22PS JCLV 7LWT 7LHY EBLO", "to":"IT23 K545 5414 339G WLPI 2YF6 VBP", "amount":54.75}
请注意,最好不要使用自定义序列化格式(如有效负载的 Java 序列化),因为这意味着您需要有一个基于 Java 的使用者。好的做法是将有效负载格式化为 JSON。每个平台和/或语言都可以解析 JSON。
生产者配置
我们需要配置 AMQP 基础设施。死信队列配置封装在传入队列声明中。
有一个死信交换direct(DLX) 的概念,它是类型topic或的正常交换fanout。如果在处理从队列中获取的消息期间发生故障,RabbitMQ 会检查是否为该队列配置了死信交换。如果通过x-dead-letter-exchange参数配置了一个,那么它将使用原始路由密钥将失败的消息路由到它。可以通过x-dead-letter-routing-key参数覆盖此路由键。
在此示例中,我们使用default exchange(no-name) 作为 the dead letter exchange,并使用死信队列名称作为新的路由键。这将起作用,因为任何队列都绑定到默认交换,绑定键等于队列名称。
@Configurationpublic class AmqpConfig { @Bean DirectExchange exchange() { return new DirectExchange(Constants.EXCHANGE_NAME); } @Bean Queue incomingQueue() { return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME) .withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME) .build(); } @Bean Binding binding() { return BindingBuilder.bind(incomingQueue()).to(exchange()).with(Constants.ROUTING_KEY_NAME); } @Bean Queue deadLetterQueue() { return QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE_NAME).build(); } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
用于队列和交换的构建器 API 非常方便,并且从 Spring AMQP 库的 1.6 版本开始可用。
在 RabbitMQ 管理控制台中,DLX和DLK标签指示在传入队列上设置了dead letter exchange和dead letter routing key参数。
生产者逻辑
生产者每 5 秒生成一次随机PaymentOrder消息,这些消息被发送到 RabbitMQ 进行进一步处理。SpringAmqpTemplate是自动配置的,它可以连接到我们的组件中。由于消息格式是 JSON ,Jackson2JsonMessageConverter因此定义了它将自动关联到 auto-configured AmqpTemplate。
@Componentpublic class Producer { private AmqpTemplate amqpTemplate; public Producer(AmqpTemplate amqpTemplate) { this.amqpTemplate = amqpTemplate; } @Scheduled(fixedDelay = 1000L) public void send() { PaymentOrder paymentOrder = new PaymentOrder( Iban.random().toFormattedString(), Iban.random().toFormattedString(), new BigDecimal(1D + new Random().nextDouble() * 100D).setScale(2, BigDecimal.ROUND_FLOOR)); amqpTemplate.convertAndSend(Constants.EXCHANGE_NAME, Constants.ROUTING_KEY_NAME, paymentOrder); } }
对于这个简单的示例,消费者也是一个 Spring Boot 应用程序,但在实际应用程序中,消费者和生产者不必在同一平台/语言上。
消费者 API
消费者 API 的第一部分是指定它连接到哪个队列。
public class Constants { public static final String DEAD_LETTER_QUEUE_NAME = "payment-orders.dead-letter.queue"; public static final String INCOMING_QUEUE_NAME = "payment-orders.incoming.queue"; }
第二部分是适应生产者定义的消息格式。请注意,在这种情况下,两个应用程序都是基于 Java 的,因此我可以创建一个包含PaymentOrder类文件的 jar 文件并与消费者和生产者共享它。然而,这是不好的做法,因为它引入了基于共享库的紧密耦合。更好的方法是使用一些代码重复(PaymentOrder在这种情况下为类)并通过同意消息格式来使用更松散的耦合方法。
public class PaymentOrder { String from; String to; BigDecimal amount; @JsonCreator public PaymentOrder(@JsonProperty("from") String from, @JsonProperty("to") String to, @JsonProperty("amount") BigDecimal amount) { this.from = from; this.to = to; this.amount = amount; } // getters and toString()}
消费者配置
消费者只关心从中获取消息的队列。传入队列必须存在,否则消费者将无法启动。请注意,dead letter queue消费者启动时不必存在 ,但在消息需要“死信”时它应该存在。如果它丢失,则消息将被静默丢弃。
@Configurationpublic class AmqpConfig { @Bean Queue incomingQueue() { return QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME) .withArgument("x-dead-letter-exchange", "") .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME) .build(); } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
默认情况下启用重新排队。为了“死信”消息,您需要将以下属性设置为 false。
spring: rabbitmq: listener: default-requeue-rejected: false
但是,如果您想在某些错误情况下启用重新排队,最好保持启用重新排队并利用AmqpRejectAndDontRequeueException将发送basic.reject带有 requeue=false 的选项。
消费逻辑
每当传入队列上有消息可用时,将使用反序列化的实例process调用该方法。在这里,我们通过抛出一个扩展异常PaymentOrder来模拟消息拒绝。InsufficientFundsExceptionAmqpRejectAndDontRequeueException
@Componentpublic class Consumer { @RabbitListener(queues = Constants.INCOMING_QUEUE_NAME) public void process(@Payload PaymentOrder paymentOrder) throws InsufficientFundsException { if (new Random().nextBoolean()) { throw new InsufficientFundsException("insufficient funds on account " + paymentOrder.getFrom()); } } }
下图显示了一条消息的示例,该PaymentOrder消息被拒绝并最终进入dead letter queue
有时它有助于自动重试失败的操作,以防它可能在后续尝试中成功。RetryTemplateSpring AMQP 库在Spring Retry项目(从 Spring Batch 中提取)的帮助下提供了对此的支持。Spring Boot 使配置变得非常容易,RetryTemplate如下面的示例所示。
spring: rabbitmq: listener: retry: enabled: true initial-interval: 2000 max-attempts: 2 multiplier: 1.5 max-interval: 5000
使用上述配置,重试功能已启用(默认情况下禁用),最多应有 2 次尝试传递消息,第一次和第二次尝试之间应为 2 秒,稍后与上一次重试间隔乘以 1.5 和最多 5 秒。运行您将在日志中看到的消费者
2016-09-07 21:56:53.396 INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer : Processing at 'Wed Sep 07 21:56:53 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'2016-09-07 21:56:55.399 INFO 11995 --- [cTaskExecutor-1] com.example.consumer.Consumer : Processing at 'Wed Sep 07 21:56:55 CEST 2016' payload 'PaymentOrder{from='RS32 5346 0536 6006 4886 88', to='FI61 8364 3364 9834 16', amount=45.57}'2016-09-07 21:56:55.401 WARN 11995 --- [cTaskExecutor-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"from":"RS32 5346 0536 6006 4886 88","to":"FI61 8364 3364 9834 16","amount":45.57}' MessageProperties [headers={__TypeId__=com.example.producer.api.PaymentOrder}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=payment-orders.exchange, receivedRoutingKey=payment-orders, receivedDelay=null, deliveryTag=31, messageCount=0, consumerTag=amq.ctag-vd18OXS9PSOeJmBQLY4o-w, consumerQueue=payment-orders.incoming.queue])
到此,关于“如何使用RabbitMQ配置死信队列”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。