这期内容当中小编将会给大家带来有关Java搭建RabbitMq消息中间件过程是怎么样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
前言
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。
名词
exchange: 交换机 routingkey: 路由key queue:队列
控制台端口:15672
exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。
使用场景
1.技能订单3分钟自动取消,改变状态
2.直播开始前15分钟提醒
3.直播状态自动结束
流程
生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列
—> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者
第一步:在pom文件中添加
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
第二步:在application.properties文件中添加
spring.rabbitmq.host=172.xx.xx.xxxspring.rabbitmq.port=5672spring.rabbitmq.username=rabbitspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000spring.rabbitmq.publisher-confirms=truespring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=true
第三步:配置 OrderQueueConfig
package com.tuohang.platform.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列) * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Configurationpublic class OrderQueueConfig { /** * 订单缓冲交换机名称 */ public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange"; /** * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】 */ public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue"; /** * 订单的交换机DLX 名字 */ final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange"; /** * 订单message时间过期后进入的队列,也就是订单实际的消费队列 */ public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue"; /** * 订单在缓冲队列过期时间(毫秒)30分钟 */ public final static int ORDER_QUEUE_EXPIRATION = 1800000; /** * 订单缓冲交换机 * * @return */ @Bean public DirectExchange preOrderExange() { return new DirectExchange(ORDER_PRE_EXCHANGE_NAME); } /** * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列 * * @return */ @Bean public Queue delayQueuePerOrderTTLQueue() { return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME) .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间 .build(); } /** * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列 * * @param delayQueuePerOrderTTLQueue * @param preOrderExange * @return */ @Bean public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) { return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME); } /** * 创建订单的DLX exchange * * @return */ @Bean public DirectExchange delayOrderExchange() { return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME); } /** * 创建order_delay_process_queue队列,也就是订单实际消费队列 * * @return */ @Bean public Queue delayProcessOrderQueue() { return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build(); } /** * 将DLX绑定到实际消费队列 * * @param delayProcessOrderQueue * @param delayExchange * @return */ @Bean public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) { return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME); } /** * 监听订单实际消费者队列order_delay_process_queue * * @param connectionFactory * @param processReceiver * @return */ @Bean public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory, OrderProcessReceiver processReceiver) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container; }}
消费者 OrderProcessReceiver :
package com.tuohang.platform.config;import java.util.Objects;import org.apache.tools.ant.types.resources.selectors.Date;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;/** * 订单延迟处理消费者 * * * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Componentpublic class OrderProcessReceiver implements ChannelAwareMessageListener { private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @Override public void onMessage(Message message, Channel channel) throws Exception { try { processMessage(message); } catch (Exception e) { // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做 channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes()); } } /** * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received <" + realMessage + ">"); // 取消订单 if(!Objects.equals(realMessage, msg)) {// SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage)); System.out.println("测试111111-----------"+new Date()); System.out.println(message); } }}
或者
/** * 测试 rabbit 消费者 * * * @author Administrator * @version 1.0 * @Date 2018年9月25日 */@Component@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)public class TestProcessReceiver { private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class); String msg = "The failed message will auto retry after a certain delay"; @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { try { processMessage(message); //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做 channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null, msg.getBytes()); } } /** * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常) * * @param message * @throws Exception */ public void processMessage(Message message) throws Exception { String realMessage = new String(message.getBody()); logger.info("Received < " + realMessage + " >"); // 取消订单 if(!Objects.equals(realMessage, msg)) { System.out.println("测试111111-----------"+new Date()); }else { System.out.println("rabbit else..."); } }}
生产者
/** * 测试rabbitmq * * @return */ @RequestMapping(value = "/testrab") public String testraa() { GenericResult gr = null; try { String name = "test_pre_ttl_delay_queue"; long expiration = 10000;//10s 过期时间 rabbitTemplate.convertAndSend(name,String.valueOf(123456)); // 在单个消息上设置过期时间 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration)); } catch (ServiceException e) { e.printStackTrace(); gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage()); } return getWrite(gr); }
上述就是小编为大家分享的Java搭建RabbitMq消息中间件过程是怎么样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。