Redisson 中怎么实现一个延时消息组件?针对这个问题,本文详细介绍了相应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
定义主题队列注解
/**
 * Binds a class to a named message queue and topic.
 *
 * <p>The annotation is retained at runtime and is itself meta-annotated with
 * {@code @Component}, so Spring's component scanning treats annotated classes
 * as bean candidates.
 */
@Documented
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RMessage {

    /**
     * Name of the message queue.
     *
     * @return the queue name
     */
    String queue();

    /**
     * Name of the topic.
     *
     * @return the topic name; {@code "system"} when not specified
     */
    String topic() default "system";
}
Spring Boot 启动监听器:初始化任务队列与消息主题,并让消费者订阅主题
@Slf4j @Component public class RMessageListener implements ApplicationListener<ApplicationStartedEvent> { /** * consumer monitoringMethod monitorMessage */ private final static String METHOD_MONITOR_MESSAGE = "monitorMessage"; /** * redisson topic name */ private final static String ATTRIBUTE_NAME_TOPIC = "topic"; /** * redisson messageQueue name */ private final static String ATTRIBUTE_NAME_QUEUE = "queue"; /** * redisson queue map */ public static Map<String, RBlockingDeque<? super Serializable>> messageQueue = new ConcurrentHashMap<>(); /** * redisson offQueue map */ public static Map<String, RDelayedQueue<? super Serializable>> offQueue = new ConcurrentHashMap<>(); /** * redisson topic map */ public static Map<String, RTopic> topicMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false); provider.addIncludeFilter(new AnnotationTypeFilter(RMessage.class)); String basePackage = applicationStartedEvent.getSpringApplication().getMainApplicationClass().getPackage().getName(); Set<BeanDefinition> beanDefinitions = provider.findCandidateComponents(basePackage); ConfigurableListableBeanFactory beanFactory = applicationStartedEvent.getApplicationContext().getBeanFactory(); mqInit(beanDefinitions, beanFactory); provider.clearCache(); provider.resetFilters(false); provider.addIncludeFilter(new AssignableTypeFilter(RMessageConsumer.class)); Set<BeanDefinition> consumers = provider.findCandidateComponents(basePackage); consumerSubscribe(beanFactory, consumers); } /** * consumer subscription news * * @param beanFactory * @param consumers */ private void consumerSubscribe(ConfigurableListableBeanFactory beanFactory, Set<BeanDefinition> consumers) { consumers.forEach(beanDefinition -> { log.info("rMessage init consumer {}",beanDefinition.getBeanClassName()); try { Object bean = 
beanFactory.getBean(Class.forName(beanDefinition.getBeanClassName())); Method method = bean.getClass().getMethod(METHOD_MONITOR_MESSAGE); ReflectionUtils.invokeMethod(method,bean); } catch (ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); } }); } /** * Parameter initialization * * @param beanDefinitions * @param beanFactory */ private void mqInit(Set<BeanDefinition> beanDefinitions,final ConfigurableListableBeanFactory beanFactory) { RedissonClient redissonClient = beanFactory.getBean(RedissonClient.class); beanDefinitions.stream().filter(beanDefinition -> beanDefinition instanceof AnnotatedBeanDefinition).forEach(beanDefinition->{ AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); MergedAnnotation<RMessage> mergedAnnotation = annotationMetadata.getAnnotations().get(RMessage.class); String queryName = mergedAnnotation.getString(ATTRIBUTE_NAME_QUEUE); String topicName = mergedAnnotation.getString(ATTRIBUTE_NAME_TOPIC); String shortName = topicName+"."+queryName; RBlockingDeque<? super Serializable> blockingDeque = redissonClient.getBlockingDeque(shortName); messageQueue.put(shortName,blockingDeque); RDelayedQueue<? super Serializable> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); offQueue.put(shortName,delayedQueue); RTopic topic = redissonClient.getTopic(topicName); topicMap.put(shortName,topic); }); } }
抽象队列主题列表
/**
 * Base class giving producers and consumers access to the Redisson handles
 * registered in {@link RMessageListener}.
 *
 * <p>The lookup key is {@code <topic>.<queue>}, taken from the {@link RMessage}
 * annotation on the concrete class.
 */
public abstract class AbstractQueue {

    /** Delayed queues registered by {@link RMessageListener}. */
    Map<String, RDelayedQueue<? super Serializable>> offQueue = RMessageListener.offQueue;

    /** Blocking deques registered by {@link RMessageListener}. */
    Map<String, RBlockingDeque<? super Serializable>> messageQueue = RMessageListener.messageQueue;

    /** Topics registered by {@link RMessageListener}. */
    Map<String, RTopic> topicMap = RMessageListener.topicMap;

    /**
     * Looks up the delayed queue for this class.
     *
     * @return the delayed queue, or {@code null} if none is registered under this class's key
     */
    protected RDelayedQueue<? super Serializable> getRDelayedQueue() {
        return offQueue.get(shortName());
    }

    /**
     * Looks up the blocking deque for this class.
     *
     * @return the blocking deque, or {@code null} if none is registered under this class's key
     */
    protected RBlockingDeque<? super Serializable> getMessageQueue() {
        return messageQueue.get(shortName());
    }

    /**
     * Builds the map key {@code <topic>.<queue>} from this class's {@link RMessage} annotation.
     *
     * @return the composite key
     * @throws java.util.NoSuchElementException if the runtime class carries no {@link RMessage} annotation
     */
    private String shortName() {
        RMessage rMessage = this.getClass().getAnnotation(RMessage.class);
        if (rMessage == null) {
            // Same exception type the unchecked Optional.get() used to raise, now with a usable message.
            throw new java.util.NoSuchElementException(
                    this.getClass().getName() + " is not annotated with @RMessage");
        }
        return rMessage.topic() + "." + rMessage.queue();
    }

    /**
     * Looks up the topic for this class.
     *
     * @return the topic, or {@code null} if none is registered under this class's key
     */
    protected RTopic getTopic() {
        return topicMap.get(shortName());
    }
}
抽象生产者模板
/**
 * Template for message producers: offers a message to the delayed queue of
 * this class and then publishes a notification on its topic.
 *
 * @param <T> serializable message type
 */
@Slf4j
public abstract class RMessageProducer<T extends Serializable> extends AbstractQueue {

    /**
     * Sends a delayed message.
     *
     * @param message  the message to enqueue
     * @param delay    how long to delay the message
     * @param timeUnit unit of {@code delay}
     */
    public void sendMessage(T message, long delay, TimeUnit timeUnit) {
        final String messageText = message.toString();
        final String delayText = delay + timeUnit.name();
        log.info("rMessage sendMessage: {}, delayTime {}", messageText, delayText);

        super.getRDelayedQueue().offer(message, delay, timeUnit);

        // The published value is this producer's hash code.
        final int notification = this.hashCode();
        super.getTopic().publish(notification);
    }

    /**
     * Sends a message with zero delay.
     *
     * @param message the message to enqueue
     */
    public void sendMessage(T message) {
        this.sendMessage(message, 0, TimeUnit.MILLISECONDS);
    }
}
抽象消费者模板
@Slf4j public abstract class RMessageConsumer<T extends Serializable> extends AbstractQueue { public void monitorMessage() { CompletableFuture.runAsync(this::pastConsumption); super.getTopic().addListener(Object.class,(c,m)-> { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } }); } protected abstract void useMessage(T message); public void pastConsumption() { while (super.getRDelayedQueue().size() > 0 || super.getMessageQueue().size() > 0) { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } } } }
生产者
/**
 * Producer bound to queue {@code redisQuery} on topic {@code order}.
 */
@RMessage(queue = "redisQuery", topic = "order")
public class RedissonProducer extends RMessageProducer<HashMap> {
}

/**
 * REST endpoint that sends a sample delayed message through {@link RedissonProducer}.
 */
@RestController
@RequestMapping("producer")
@AllArgsConstructor
public class ProducerController {

    private RedissonProducer redissonProducer;

    /**
     * Sends a sample message with a five-minute delay.
     *
     * @return a fixed confirmation string
     */
    @PostMapping
    public String send() {
        redissonProducer.sendMessage(samplePayload(), 5, TimeUnit.MINUTES);
        return "send msg";
    }

    /**
     * Builds the sample payload: a name and a text stamped with the current local time.
     *
     * @return a new map holding the sample entries
     */
    private static HashMap<String, Object> samplePayload() {
        final HashMap<String, Object> payload = new HashMap<>();
        payload.put("name", "张三");
        payload.put("time", "测试顺序第二条" + LocalDateTime.now());
        return payload;
    }
}
消费者
/**
 * Consumer bound to queue {@code redisQuery} on topic {@code order}; prints
 * every received message to standard output.
 */
@RMessage(queue = "redisQuery", topic = "order")
public class RedissonConsumer extends RMessageConsumer<HashMap> {

    /**
     * Prints the received message.
     *
     * @param message the message taken from the queue
     */
    @Override
    protected void useMessage(HashMap message) {
        final String line = "接收到消息:" + message;
        System.out.println(line);
    }
}
关于 Redisson 中怎么实现一个延时消息组件这一问题的解答就分享到这里了,希望以上内容对大家有一定的帮助。如果你还有疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。