温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

RabbitMQ用多路由,多队列来破除流控

发布时间:2021-06-22 14:34:35 来源:亿速云 阅读:171 作者:chen 栏目:大数据

本篇内容主要讲解“RabbitMQ用多路由,多队列来破除流控”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“RabbitMQ用多路由,多队列来破除流控”吧!

流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。

现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。

我们先给Order接口添加一个发送消息的方法。

public interface Order {public void makeOrder(Order order);    public OrderSuccessResult getResult(Order order);    public void postOrder(Order order);}

实现类实现该方法

@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id;    @NonNull    private String code;    @NonNull    private Store store;    @NonNull    private ProviderService service;    @NonNull    private Car car;    @NonNull    private Date serviceDate;    @NonNull    private String contact;    @NonNull    private String contactTel;    private AppUser user;    @NonNull    private String content;    private int status;    private Date createDate;    @Override    public void makeOrder(Order order) {
        ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);        IdService idService = SpringBootUtil.getBean(IdService.class);        ((ServiceOrder)order).setId(idService.genId());        ((ServiceOrder)order).setCode(getCodeInfo(idService));        AppUser loginAppUser = AppUserUtil.getLoginAppUser();        AppUser user = new AppUser();        user.setId(loginAppUser.getId());        user.setUsername(loginAppUser.getUsername());        ((ServiceOrder)order).setUser(user);        ((ServiceOrder)order).setStatus(1);        ((ServiceOrder)order).setCreateDate(new Date());        serviceOrderDao.save((ServiceOrder) order);    }@Override    public OrderSuccessResult getResult(Order order) {
        ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);        this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();        return this.orderSuccessResult.getResult(order);    }@Override    public void postOrder(Order order) {
        MessageSender sender = SpringBootUtil.getBean(MessageSender.class);        CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,                        OwnerCarCenterMq.ROUTING_KEY_ORDER,                        order)
        );    }private String getCodeInfo(IdService idService) {
        String flow = String.valueOf(idService.genId());        flow = flow.substring(14,flow.length());        String pre = DateUtils.format(new Date(), DateUtils.pattern9);        return pre + flow;    }
}

其中我们定义了这么一组队列名,交换机,和路由

public interface OwnerCarCenterMq {/**     * 队列名     */    String ORDER_QUEUE = "order";    /**     * 服务系统exchange名     */    String MQ_EXCHANGE_ORDER = "order.topic.exchange";    /**     * 服务添加routing key     */    String ROUTING_KEY_ORDER = "post.order";}

为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。

@Configurationpublic class RabbitmqConfig {   @Bean   public List<Queue> orderQueues() {
      List<Queue> queues = new ArrayList<>();      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i);         queues.add(queue);      }      return queues;   }   @Bean   public TopicExchange orderExchange() {      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);   }   @Bean   public List<Binding> bindingOrders() {
      List<Binding> bindings = new ArrayList<>();      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
               .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i);         bindings.add(binding);      }      return bindings;   }
}

重新封装消息提供者,每次发送都随机选取一个路由来进行发送。

@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired    private RabbitTemplate rabbitTemplate;    public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content);        this.rabbitTemplate.setMandatory(true);        this.rabbitTemplate.setConfirmCallback(this);        this.rabbitTemplate.setReturnCallback(this);        ThreadLocalRandom random = ThreadLocalRandom.current();        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));    }/**     * 确认后回调:     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause);        } else {log.info("send ack success");        }
    }/**     * 失败后return回调:     *     * @param message     * @param replyCode     * @param replyText     * @param exchange     * @param routingKey     */    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);    }/**     * 对消息对象进行二进制序列化     * @param o     * @return     */    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();        ByteArrayOutputStream stream = new ByteArrayOutputStream();        Output output = new Output(stream);        kryo.writeObject(output, o);        output.close();        return stream.toByteArray();    }
}

我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。

Controller如下

@Slf4j@RestControllerpublic class OrderController {private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();    private ThreadLocal<Order> orderService = new ThreadLocal<>();    @Autowired    private OrderBean orderBean;    @Transactional    @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr);        Order order = setOrderFactory(orderStr,type);        orderService.get().makeOrder(order);        orderService.get().postOrder(order);        return Result.success(orderService.get().getResult(order));    }/**     * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂     * @param orderStr     * @return     */    private Order setOrderFactory(String orderStr,String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);        Object order = JSONObject.parseObject(orderStr, classType);//        if (orderStr.contains("service")) {//            order = JSON.parseObject(orderStr, ServiceOrder.class);//        }else if (orderStr.contains("product")) {//            order = JSON.parseObject(orderStr, ProductOrder.class);//        }        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");        this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));//        if (order instanceof ServiceOrder) {//            this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));//        }else if (order instanceof ProductOrder) {//            this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));//        }        orderService.set(orderFactory.get().getOrder());        return (Order) order;    }
}

最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控

@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();    @RabbitHandler    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            ServiceOrder order = unSerialize(data);            this.serviceOrders.add(order);            log.info(String.valueOf(order));        } catch (IOException e) {
            e.printStackTrace();            //丢弃这条消息            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);            log.info("receiver fail");        }
    }/**     * 反序列化     * @param data     * @return     */    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;        try {
            Kryo kryo = new Kryo();            input = new Input(new ByteArrayInputStream(data));            return kryo.readObject(input,ServiceOrder.class);        }finally {
            input.close();        }
    }
}

项目启动后,我们可以看到rabbitmq的情况如下

RabbitMQ用多路由,多队列来破除流控

RabbitMQ用多路由,多队列来破除流控

现我们来对其进行压测,启动Jmeter,我们使用1000线程来进行压测测试。各配置如下

RabbitMQ用多路由,多队列来破除流控

RabbitMQ用多路由,多队列来破除流控

RabbitMQ用多路由,多队列来破除流控

保存文件上传服务器,因为本人是华为云的服务器,故在服务器上进行压测,不进行远程压测

在服务器的jmeter的bin目录下输入

./jmeter -n -t model/rabbit.jmx -l log.jtl

这里-n为不启动图形界面,-t使用我们上传的配置文件,-l记录日志

压测结果如下

RabbitMQ用多路由,多队列来破除流控

我们在压测过程中来看一下rabbitmq的UI界面

RabbitMQ用多路由,多队列来破除流控

RabbitMQ用多路由,多队列来破除流控

消费基本上是实时的,没有出现流控积压现象。

到此,相信大家对“RabbitMQ用多路由,多队列来破除流控”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI