这篇文章主要讲解了“redis消息队列的实现方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“redis消息队列的实现方法”吧!
方式一:通过list的阻塞读取命令,blpop或者brpop
消费者
public class Consumer extends DemoApplicationTests{ @Test public void consume(){ int timeout = 0;//永不超时 String key = "test_que"; //list集合 第一个元素为key值,第二个元素为弹出的元素值;当超时返回[null] while(true){ List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { //队列没有元素会阻塞操作,直到队列获取新的元素或超时 return connection.bLPop(timeout,key.getBytes()); } },new StringRedisSerializer()); for(Object o:obj){ System.out.println("---------------"+o); } } } }
生产者
public class Productor extends DemoApplicationTests { @Test public void generateMsg() { String key = "test_que"; redisTemplate.opsForList().leftPush(key,"hht2"); } }
方式二:Pub/Sub(发布/订阅)使用的 spring boot
依赖包
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> </dependencies>
配置类
@Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) @EnableCaching public class RedisConfig extends CachingConfigurerSupport { /** * 配置自定义redisTemplate * @return */ @Bean RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } /** * 序列化定制 * * @return */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>( Object.class); // 初始化objectmapper ObjectMapper mapper = new ObjectMapper(); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); return jackson2JsonRedisSerializer; } /** * 消息监听器,使用MessageAdapter可实现自动化解码及方法代理 * * @return */ @Bean public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) { MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage"); adapter.setSerializer(jackson2JsonRedisSerializer); adapter.afterPropertiesSet(); return adapter; } /** * 将订阅器绑定到容器 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listener, new PatternTopic("/redis/*")); return container; } }
模拟消息发布类
@Service public class RedisPubSub { private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class); @Autowired private RedisTemplate<String, Object> redisTemplate; private ChannelTopic topic = new ChannelTopic("/redis/pubsub"); @Scheduled(initialDelay = 5000, fixedDelay = 10000) private void schedule() { logger.info("publish message"); publish("admin", "hey you must go now!"); } /** * 推送消息 * * @param publisher * @param message */ public void publish(String publisher, String content) { logger.info("message send {} by {}", content, publisher); redisTemplate.convertAndSend(topic.getTopic(), content); } }
模拟消息接收类
@Component public class MessageSubscriber { Logger logger = LoggerFactory.getLogger(MessageSubscriber.class); public void onMessage(String message, String pattern) { logger.info("topic {} received {} ", pattern, message); } }
启动类
@SpringBootApplication @EnableScheduling public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
感谢各位的阅读,以上就是“redis消息队列的实现方法”的内容了,经过本文的学习后,相信大家对redis消息队列的实现方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。