温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

rabbitmq template实现发送消息

发布时间:2020-06-18 14:22:10 来源:亿速云 阅读:718 作者:元一 栏目:编程语言

前言

rabbitmq template:消息模板。这是spring整合rabbit提供的消息模板。是进行发送消息的关键类。该类提供了丰富的发送方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallBack等等。同样我们需要注入到spring容器中,然后就可以想其他bean那样正常使用了。

之前的配置是这样的:

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setMandatory(true);

        ...
        return template;
    }

要发送出去的消息vo是这样的:

@Data
public class TestVO {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date testDate;
}

然后,出现的问题就是,消息体里,时间比当前时间少了8个小时。

{"testDate":"2019-12-27 05:45:26"}

原因

我们是这么使用rabbitmq template的:

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisRepository redisRepository;

    /**
     * 发送消息
     * @param exchange 交换机名称
     * @param routingKey 路由键
     * @param msgMbject 消息体,无需序列化,会自动序列化为json
     */
    public void send(String exchange, String routingKey, final Object msgMbject) {
        CorrelationData correlationData = new CorrelationData(GUID.generate());
        CachedMqMessageForConfirm cachedMqMessageForConfirm = new CachedMqMessageForConfirm(exchange, routingKey, msgMbject);
        redisRepository.saveCacheMessageForConfirms(correlationData,cachedMqMessageForConfirm);
        //核心代码:这里,发送出去的msgObject其实就是一个vo或者dto,rabbitmqTemplate会自动帮我们转为json
        rabbitTemplate.convertAndSend(exchange,routingKey,msgMbject,correlationData);
    }

注释里我解释了,rabbitmq会自动做转换,转换用的就是jackson。

跟进源码也能一探究竟:

org.springframework.amqp.rabbit.core.RabbitTemplate#convertAndSend

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {
        // 这里调用了convertMessageIfNecessary(object)
        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }
调用了convertMessageIfNessary:

protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        // 获取消息转换器
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

获取消息转换器的代码如下:

private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        MessageConverter converter = getMessageConverter();
        if (converter == null) {
            throw new AmqpIllegalStateException(
                    "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
        }
        return converter;
    }

getMessageConverter就是获取rabbitmqTemplate 类中的一个field。

public MessageConverter getMessageConverter() {
        return this.messageConverter;
    }
我们只要看哪里对它进行赋值即可。

然后我想起来,就是在我们业务代码里赋值的:

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 下面这里赋值了。。。差点搞忘了
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        template.setMandatory(true);

        return template;
    }

反正呢,总体来说,就是rabbitmqTemplate 会使用我们自定义的messageConverter转换message后再发送。

时区问题,很好重现,源码在:

https://gitee.com/ckl111/all-simple-demo-in-work/tree/master/jackson-demo

@Data
public class TestVO {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date testDate;
}

测试代码:

@org.junit.Test
    public void normal() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        TestVO vo = new TestVO();
        vo.setTestDate(new Date());
        String value = mapper.writeValueAsString(vo);
        System.out.println(value);
    }

输出:

{"testDate":"2019-12-27 05:45:26"}

解决办法

指定默认时区配置

@org.junit.Test
    public void specifyDefaultTimezone() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        SerializationConfig oldSerializationConfig = mapper.getSerializationConfig();
        /**
         * 新的序列化配置,要配置时区
         */
        String timeZone = "GMT+8";
        SerializationConfig newSerializationConfig = oldSerializationConfig.with(TimeZone.getTimeZone(timeZone));

        mapper.setConfig(newSerializationConfig);
        TestVO vo = new TestVO();
        vo.setTestDate(new Date());
        String value = mapper.writeValueAsString(vo);
        System.out.println(value);
    }

在field上加注解

@Data
public class TestVoWithTimeZone {
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private Date testDate;
}
我们这里,新增了timezone,手动指定了时区配置。

测试代码:

@org.junit.Test
    public void specifyTimezoneOnField() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        TestVoWithTimeZone vo = new TestVoWithTimeZone();
        vo.setTestDate(new Date());
        String value = mapper.writeValueAsString(vo);
        System.out.println(value);
    }

上面两种的输出都是正确的。

这里没有去分析源码,简单说一下,在序列化的时候,会有一个序列化配置;这个配置由两部分组成:默认配置+这个类自定义的配置。 自定义配置会覆盖默认配置。

我们的第二种方式,就是修改了默认配置;第三种方式,就是使用自定义配置覆盖默认配置。

jackson 还挺重要,尤其是 spring cloud 全家桶, feign 也用了这个, restTemplate 也用了,还有 Spring MVC 里的 httpmessageConverter 有兴趣的同学,去看下面这个地方就可以了。
rabbitmq template实现发送消息

rabbitmq template实现发送消息

如果对JsonFormat的处理感兴趣,可以看下面的地方:

com.fasterxml.jackson.annotation.JsonFormat.Value#Value(com.fasterxml.jackson.annotation.JsonFormat) (打个断点在这里,然后跑个test就到这里了)
rabbitmq template实现发送消息

rabbitmq template实现发送消息

总结
差点忘了,针对rabbitmq template的问题,最终我们的解决方案就是:

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        ObjectMapper mapper = new ObjectMapper();
        SerializationConfig oldSerializationConfig = mapper.getSerializationConfig();
        /**
         * 新的序列化配置,要配置时区
         */
        String timeZone = environment.getProperty(CadModuleConstants.SPRING_JACKSON_TIME_ZONE);
        SerializationConfig newSerializationConfig = oldSerializationConfig.with(TimeZone.getTimeZone(timeZone));

        mapper.setConfig(newSerializationConfig);

        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(mapper);

        template.setMessageConverter(messageConverter);
        template.setMandatory(true);

        ...设置callback啥的
        return template;
    }
向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI