温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spring kafka中怎么批量给topic加前缀

发布时间:2021-07-26 11:30:12 来源:亿速云 阅读:251 作者:Leah 栏目:编程语言

spring kafka中怎么批量给topic加前缀,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

实现思路

1、生产者端

可以通过生产者拦截器,来给topic加前缀

2、实现步骤

a、编写一个生产者拦截器

@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {



    /**
     * 运行在用户主线程中,在消息被序列化之前调用
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
        log.info("原始topic:{}",record.topic());
        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                record.partition(),record.timestamp(),record.key(), record.value());
    }




    /**
     * 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
      log.info("实际topic:{}",metadata.topic());
    }


    /**
     *  清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }

b、配置拦截器

kafka:
    producer:
      # 生产者拦截器配置
      properties:
        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor

c、测试

spring kafka中怎么批量给topic加前缀

2、消费者端

这个就稍微有点难搞了,因为业务开发部门他们是直接用@KafkaListener的注解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像这种也没啥好的办法,就只能通过源码了,通过源码可以发现在如下地方

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

会把@KafkaListener的值赋值给消费者,如果对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操作,既然我们知道@KafkaListener会在bean初始化后再进行赋值,那我们就可以在bean初始化前,修改掉@KafkaListener的值。具体实现如下

@Component
public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {

    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

        for (String packageName : packageNames) {
            Reflections reflections = new Reflections(new ConfigurationBuilder()
                    .forPackages(packageName) // 指定路径URL
                    .addScanners(new SubTypesScanner()) // 添加子类扫描工具
                    .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
                    .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
                    .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
            );

            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
            if(!CollectionUtils.isEmpty(methodSet)){
                for (Method method : methodSet) {
                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                    changeTopics(kafkaListener);
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception{
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
        String[] topics = (String[])memberValues.get("topics");
        System.out.println("修改前topics:" + Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
        }
        memberValues.put("topics", topics);
        System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));

    }
}

测试

spring kafka中怎么批量给topic加前缀 spring kafka中怎么批量给topic加前缀

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI