本篇内容主要讲解“redis stream怎么实现消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redis stream怎么实现消息队列”吧!
Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
基于redis实现消息队列的方式有很多:
PUB/SUB,订阅/发布模式
基于List的 LPUSH+BRPOP 的实现
发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。
发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。
使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。
生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。
**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。
**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。
生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。
zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。
zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。
zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue
Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。
参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。
提供消息ack机制。
往 stream 内创建消息 语法为:
XADD key ID field string [field string …]
# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id xadd stream1 * name zs age 23
读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id) xread count 1 streams stream1 0 #表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息 xread count 1 streams msg 1649143363972-0 #表示一直阻塞读取最新的消息($表示获取下一个生成的消息) xread count 1 block 0 streams stream1 $ xrange stream - + 10
XRANGE key startID endID count
#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID) xrange stream1 - + 10
redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。
每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9
创建消费者组:
#消费消息首先得创建消费者组 # 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息 xgroup create stream1 group1 0 #查询stream1内的所有消费者组信息 xinfo groups stream1
通过xreadgroup可以在消费者组内创建消费者消费消息
XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
#创建消费者读取消息 #在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息) xreadgrup group group1 consumer1 count 1 streams stream1 >
通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id
每个Pending的消息有4个属性:
消息ID
所属消费者
IDLE,已读取时长
delivery counter,消息被读取次数
XPENDING key group [start end count] [consumer]
#查看pending列表 # 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID xpending stream1 group1 - + 10 consumer1 # 查看group1组内的所有消费者pending类表 xpending stream1 group1 - + 10
当消费者消费了消息,需要通过 xack
命令确认消息,xack后的消息会从pending列表移除
XACK key gruopName ID
xack stream1 group1 xxx
当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM
将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。
通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。
# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费 XCLAIM stream1 group1 consumer1 3600000 1553585533795-1
redis提供了xinfo来查看stream的信息
#查看sream信息 xinfo stream steam1 #查询消费者组信息 xinfo groups group1 #查询消费者信息 xinfo consumers consumer1
1 引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2 编写消费者
@Slf4j @Component public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> { public final String streamName = "emailStream"; public final String groupName = "emailGroup"; public final String consumerName = "emailConsumer"; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { //log.info("stream名称-->{}",message.getStream()); //log.info("消息ID-->{}",message.getId()); log.info("消息内容-->{}",message.getValue()); Map<String, String> msgMap = message.getValue(); if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){ //消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务 log.info("消费异常-->"+message); return; } StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream(); //消息应答 streamOperations.acknowledge( streamName,groupName,message.getId() ); } //我们可以启动定时任务不断监听pending列表,处理死信消息 }
3 配置redis
序列化配置
@EnableCaching @Configuration public class RedisConfig { /** * 设置redis序列化规则 */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){ Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(om); return jackson2JsonRedisSerializer; } /** * RedisTemplate配置 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) { // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); // key序列化 redisTemplate.setKeySerializer(stringSerializer); // value序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // Hash key序列化 redisTemplate.setHashKeySerializer(stringSerializer); // Hash value序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
消费者组和消费者配置
@Slf4j @Configuration public class RedisStreamConfig { @Autowired private EmailConsumer emailConsumer; @Autowired private RedisTemplate<String,Object> redisTemplate; @Bean public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){ StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); return StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() //block读取超时时间 .pollTimeout(Duration.ofSeconds(3)) //count 数量(一次只获取一条消息) .batchSize(1) //序列化规则 .serializer( stringRedisSerializer ) .build(); } /** * 开启监听器接收消息 */ @Bean public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){ StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory, streamMessageListenerContainerOptions); //如果 流不存在 创建 stream 流 if( !redisTemplate.hasKey(emailConsumer.streamName)){ redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", "")); log.info("初始化stream {} success",emailConsumer.streamName); } //创建消费者组 try { redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName); } catch (Exception e) { log.info("消费者组 {} 已存在",emailConsumer.groupName); } //注册消费者 消费者名称,从哪条消息开始消费,消费者类 // > 表示没消费过的消息 // $ 表示最新的消息 listenerContainer.receive( Consumer.from(emailConsumer.groupName, emailConsumer.consumerName), StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()), emailConsumer ); listenerContainer.start(); return listenerContainer; } }
4.生产者生产消息
@GetMapping("/redis/ps") public String redisPublish(String content,Integer count){ StreamOperations streamOperations = redisTemplate.opsForStream(); for (int i = 0; i < count; i++) { AtomicInteger num = new AtomicInteger(i); Map msgMap = new HashMap(); msgMap.put("count", i); msgMap.put("sID", num); //新增消息 streamOperations.add("emailStream",msgMap); } return "success"; }
到此,相信大家对“redis stream怎么实现消息队列”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。