This article takes a look at RocketMQTransactionAnnotationProcessor.
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
public class RocketMQTransactionAnnotationProcessor
    implements BeanPostProcessor, Ordered, ApplicationContextAware {

    private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);

    private ApplicationContext applicationContext;
    private final Set<Class<?>> nonProcessedClasses =
        Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));

    private TransactionHandlerRegistry transactionHandlerRegistry;

    public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
        this.transactionHandlerRegistry = transactionHandlerRegistry;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!this.nonProcessedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class);
            this.nonProcessedClasses.add(bean.getClass());
            if (listener == null) { // for quick search
                log.trace("No @RocketMQTransactionListener annotations found on bean type: {}",
                    bean.getClass());
            } else {
                try {
                    processTransactionListenerAnnotation(listener, bean);
                } catch (MQClientException e) {
                    log.error("Failed to process annotation " + listener, e);
                    throw new BeanCreationException("Failed to process annotation " + listener, e);
                }
            }
        }

        return bean;
    }

    private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean)
        throws MQClientException {
        if (transactionHandlerRegistry == null) {
            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                "the class must work with RocketMQTemplate", null);
        }
        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                "the class must implement interface RocketMQLocalTransactionListener",
                null);
        }
        TransactionHandler transactionHandler = new TransactionHandler();
        transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
        transactionHandler.setName(listener.txProducerGroup());
        transactionHandler.setBeanName(bean.getClass().getName());
        transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
        transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
            listener.keepAliveTime(), listener.blockingQueueSize());

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            listener.accessKey(), listener.secretKey());

        if (Objects.nonNull(rpcHook)) {
            transactionHandler.setRpcHook(rpcHook);
        } else {
            log.debug("Access-key or secret-key not configure in " + listener + ".");
        }

        transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}
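RocketMQTransactionAnnotationProcessor implements the BeanPostProcessor, Ordered, and ApplicationContextAware interfaces.
postProcessAfterInitialization inspects each bean class once, looking for the RocketMQTransactionListener annotation on the bean's target class; when the annotation is present it calls processTransactionListenerAnnotation, and any MQClientException is rethrown as a BeanCreationException.
processTransactionListenerAnnotation first checks that a TransactionHandlerRegistry is available and that the bean implements RocketMQLocalTransactionListener, then builds a TransactionHandler from the annotation attributes and registers it through transactionHandlerRegistry.registerTransactionHandler.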
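The scan-and-register flow described above can be sketched with the JDK alone. This is only an illustration under stated assumptions: TxListener, LocalTxListener, AnnotationScanSketch, OrderTxListener, and NotAListener are hypothetical stand-ins, not rocketmq-spring types. The sketch reads the annotation with plain Class.getAnnotation, whereas the real processor unwraps proxies with AopUtils.getTargetClass and uses AnnotationUtils.findAnnotation, which also searches superclasses, interfaces, and meta-annotations. It also relies on the return value of Set.add instead of the separate contains/add calls in the original.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @RocketMQTransactionListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TxListener {
    String txProducerGroup();
}

// Hypothetical stand-in for RocketMQLocalTransactionListener.
interface LocalTxListener {
}

class AnnotationScanSketch {
    // Bean classes that were already inspected (plays the role of nonProcessedClasses).
    private final Set<Class<?>> seenClasses =
        Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));

    // Producer group names collected from annotated beans (stands in for the registry).
    final List<String> registeredGroups = new ArrayList<>();

    Object postProcess(Object bean) {
        Class<?> type = bean.getClass();
        // Set.add returns false when the class was inspected before.
        if (seenClasses.add(type)) {
            TxListener listener = type.getAnnotation(TxListener.class);
            if (listener != null) {
                if (!(bean instanceof LocalTxListener)) {
                    throw new IllegalStateException(
                        "Bad usage of @TxListener, the class must implement LocalTxListener");
                }
                registeredGroups.add(listener.txProducerGroup());
            }
        }
        return bean;
    }
}

// Correct usage: annotated and implements the listener interface.
@TxListener(txProducerGroup = "orderTxGroup")
class OrderTxListener implements LocalTxListener {
}

// Incorrect usage: annotated but does not implement the listener interface.
@TxListener(txProducerGroup = "badGroup")
class NotAListener {
}
```

Because the bookkeeping is keyed by class, a second bean of the same class is skipped entirely, which matches the behavior of the nonProcessedClasses check in the original source.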
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandler.java
class TransactionHandler {
    private String name;
    private String beanName;
    private RocketMQLocalTransactionListener bean;
    private BeanFactory beanFactory;
    private ThreadPoolExecutor checkExecutor;
    private RPCHook rpcHook;

    public String getBeanName() {
        return beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public RPCHook getRpcHook() {
        return rpcHook;
    }

    public void setRpcHook(RPCHook rpcHook) {
        this.rpcHook = rpcHook;
    }

    public BeanFactory getBeanFactory() {
        return beanFactory;
    }

    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    public void setListener(RocketMQLocalTransactionListener listener) {
        this.bean = listener;
    }

    public RocketMQLocalTransactionListener getListener() {
        return this.bean;
    }

    public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {
        this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(blockingQueueSize));
    }

    public ThreadPoolExecutor getCheckExecutor() {
        return checkExecutor;
    }
}
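TransactionHandler holds the properties name, beanName, bean, beanFactory, checkExecutor, and rpcHook. Its setCheckExecutor method builds a ThreadPoolExecutor backed by a bounded LinkedBlockingDeque, with keepAliveTime interpreted in milliseconds.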
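To make the effect of the four setCheckExecutor parameters concrete, the following sketch builds an executor with the same constructor call and exercises it with deliberately tiny limits. CheckExecutorSketch and its methods are hypothetical names for illustration, and the sizes (one thread, queue capacity one) are assumptions chosen only to make the rejection easy to observe. No rejection handler is passed, so the JDK default (AbortPolicy) applies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CheckExecutorSketch {
    // Same constructor arguments as TransactionHandler.setCheckExecutor.
    static ThreadPoolExecutor newCheckExecutor(int corePoolSize, int maxPoolSize,
                                               long keepAliveTime, int blockingQueueSize) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(blockingQueueSize));
    }

    // Returns true when the third task is rejected by a 1-thread, 1-slot executor.
    static boolean thirdTaskIsRejected() {
        ThreadPoolExecutor executor = newCheckExecutor(1, 1, 60_000L, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocked); // occupies the only worker thread
            executor.execute(blocked); // waits in the queue (capacity 1)
            try {
                executor.execute(blocked); // no thread and no queue slot left
                return false;
            } catch (RejectedExecutionException e) {
                return true;
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With a bounded queue, ThreadPoolExecutor creates threads beyond corePoolSize only after the queue is full, and it rejects new tasks once both maximumPoolSize and the queue capacity are exhausted, so blockingQueueSize and maximumPoolSize together cap how many pending tasks the executor will accept.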
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
public class TransactionHandlerRegistry implements DisposableBean {
    private RocketMQTemplate rocketMQTemplate;

    private final Set<String> listenerContainers = new ConcurrentSet<>();

    public TransactionHandlerRegistry(RocketMQTemplate template) {
        this.rocketMQTemplate = template;
    }

    @Override
    public void destroy() throws Exception {
        listenerContainers.clear();
    }

    public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {
        if (listenerContainers.contains(handler.getName())) {
            throw new MQClientException(-1,
                String
                    .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(),
                        handler.getBeanName()));
        }
        listenerContainers.add(handler.getName());

        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
    }
}
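TransactionHandlerRegistry implements the DisposableBean interface; its destroy method simply clears listenerContainers. registerTransactionHandler throws an MQClientException if the handler's name is already present in listenerContainers; otherwise it adds the name and calls rocketMQTemplate.createAndStartTransactionMQProducer to create and start the TransactionMQProducer.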
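The duplicate-name guard is the part of the registry most likely to surface at startup, so here is a minimal JDK-only sketch of it. HandlerRegistrySketch is a hypothetical stand-in, it throws IllegalStateException rather than MQClientException, and it uses ConcurrentHashMap.newKeySet() in place of ConcurrentSet. Unlike the original, which calls contains and then add, the sketch relies on the return value of add so that the check and the insertion are a single atomic step; that is a variation on the source, not what the source does.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class HandlerRegistrySketch {
    // Names (txProducerGroup values) that have already been registered.
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    void register(String name, String beanName) {
        // add returns false if the name is already present.
        if (!names.add(name)) {
            throw new IllegalStateException(String.format(
                "The transaction name [%s] has been defined in TransactionListener [%s]",
                name, beanName));
        }
        // The real registry would now call
        // rocketMQTemplate.createAndStartTransactionMQProducer(...).
    }

    int size() {
        return names.size();
    }

    // Mirrors destroy(): only the name bookkeeping is cleared.
    void destroy() {
        names.clear();
    }
}
```

In practice this means two listener beans that declare the same txProducerGroup cannot both be registered: the second registration fails, and the processor turns that failure into a BeanCreationException.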