c语言编程笔录

首页 >   > 笔记大全

笔记大全

spring Kafka中的KafkaListener源码分析

更新时间:2023-11-23

1. @KafkaListener的作用

@KafkaListener主要用于监听kafka中的topic,并通过指定的方法对从kafka中获取到的数据进行处理。下面是一个简单的@KafkaListener的用法示例:

@Component
public class MyListener {

    @KafkaListener(topics = "test_topic")
    public void listen(String message) {
        System.out.println("received message: " + message);
    }
}

该示例中,@KafkaListener注解标注的方法会在监听的test_topic中有数据时被调用,并将获取到的数据作为参数传入方法中,然后进行处理。

2. @KafkaListener源码分析

我们来看一下@KafkaListener注解的具体实现。在spring-kafka的源码中,@KafkaListener被定义在org.springframework.kafka.annotation包下。其核心类是KafkaListenerAnnotationBeanPostProcessor,它是一个BeanPostProcessor,用于解析并处理@KafkaListener注解。

public class KafkaListenerAnnotationBeanPostProcessor extends AbstractKafkaListenerContainerFactoryAware {

    private final KafkaListenerConfigUtils utils = new KafkaListenerConfigUtils();

    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    private KafkaListenerEndpointRegistry endpointRegistry;

    private BeanFactory beanFactory;

    private ResourceLoader resourceLoader;

    private KafkaListenerEndpointRegistrar registrar;

    // 处理@KafkaListener注解
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
        String[] beanNames = registry.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            BeanDefinition beanDefinition = registry.getBeanDefinition(beanName);
            String beanClassName = beanDefinition.getBeanClassName();
            if (beanClassName != null) {
                Class<?> clazz = ClassUtils.resolveClassName(beanClassName, ClassUtils.getDefaultClassLoader());
                Map> annotatedMethods = MethodIntrospector.selectMethods(clazz,
                        (MethodIntrospector.MetadataLookup>) method -> {
                            Set listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                    method, KafkaListener.class, KafkaListeners.class);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        });
                if (!annotatedMethods.isEmpty()) {
                    // 解析@KafkaListener注解,为每一个注解创建一个endpoint,并将endpoint注册到endpointRegistry
                    for (Map.Entry> entry : annotatedMethods.entrySet()) {
                        Method method = entry.getKey();
                        Set listeners = entry.getValue();
                        for (KafkaListener kafkaListener : listeners) {
                            processKafkaListener(kafkaListener, method, beanName, registry);
                        }
                    }
                }
            }
        }
    }
    ...
}

在该类的postProcessBeanDefinitionRegistry方法中,主要通过反射扫描所有类中的方法,查找哪些方法使用了@KafkaListener注解,然后为每一个使用注解的方法创建一个对应的Kafka监听器,即KafkaMessageListenerContainer。同时,还需要解析@KafkaListener注解中的其他配置信息,例如:

@KafkaListener(id = "myListener", topics = "${kafka.topic.test}", containerFactory = "kafkaListenerContainerFactory")
public void handle(String message) {
    System.out.println("received message: " + message);
}

上述代码中,除了topics属性外,还指定了containerFactory属性,用于关联kafkaListenerContainerFactory实例。在KafkaListenerAnnotationBeanPostProcessor的processKafkaListener方法中,我们就可以看到它的实现:

private void processKafkaListener(KafkaListener kafkaListener, Method method, String beanName,
            BeanDefinitionRegistry registry) {
        KafkaListenerEndpoint endpoint = new KafkaListenerEndpoint();
        // 设置监听器ID
        endpoint.setId(resolveExpressionAsString(kafkaListener.id(), "id"));
        // 设置监听的topics
        endpoint.setTopics(resolveExpressionAsList(kafkaListener.topics()));
        // 设置监听的topics的partition信息,用于支持分区的消费
        endpoint.setTopicPartitions(resolveExpressionAsTopicPartitionList(kafkaListener.topicPartitions()));
        // 设置监听器的ContainerFactory
        endpoint.setContainerFactoryBeanName(resolveExpressionAsString(kafkaListener.containerFactory(), "containerFactory"));
        // 注册目标类和目标方法
        endpoint.setBean(beanName);
        endpoint.setMethod(method);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        String errorHandler = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandler)) {
            endpoint.setErrorHandler(resolveExpressionAsClass(errorHandler, KafkaListenerErrorHandler.class));
        }
        String groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
        if (StringUtils.hasText(groupId)) {
            endpoint.setGroupId(groupId);
        }

        KafkaListenerConfigurer configurer =
                this.beanFactory != null ? this.beanFactory.getBean(KafkaListenerConfigurer.class) : null;
        if (configurer != null) {
            configurer.configureKafkaListeners(new KafkaListenerEndpointRegistrar.DefaultKafkaListenerEndpointRegistry(endpoint));
        }
        KafkaListenerEndpointRegistrar registrar = getEndpointRegistrar();
        // 注册监听器
        registrar.registerEndpoint(endpoint, this.beanFactory);
    }

通过以上代码,就成功解析了@KafkaListener注解的所有配置信息,并将其注册到了KafkaListenerEndpointRegistry中,待KafkaListenerEndpointRegistry启动后,就可以自动创建对应的KafkaMessageListenerContainer实例,并启动对应的线程,开始工作。

3. KafkaListenerEndpointRegistry的启动

在KafkaListener的各项配置都完成后,还需要启动KafkaListenerEndpointRegistry,让其开始监听kafka主题。具体实现代码如下:

public class CompositeKafkaListenerContainerFactory extends AbstractKafkaListenerContainerFactory
        implements InitializingBean {

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    protected MessageListenerContainer createContainerInstance(KafkaListenerEndpoint endpoint) {
        if (!(endpoint instanceof MethodKafkaListenerEndpoint)) {
            throw new IllegalStateException("Endpoint must be of type MethodKafkaListenerEndpoint");
        }
        MethodKafkaListenerEndpoint methodEndpoint = (MethodKafkaListenerEndpoint) endpoint;
        // 根据配置创建一个对应的KafkaMessageListenerContainer实例
        KafkaMessageListenerContainer container = createContainer(methodEndpoint);
        // 初始化该实例,并启动它
        configureContainer(container, methodEndpoint);
        return container;
    }

    ...

    private void doStart(List endpoints) {
        for (KafkaListenerContainerEndpoint endpoint : endpoints) {
            KafkaMessageListenerContainer container = (KafkaMessageListenerContainer) endpoint.createMessageListenerContainer();
            container.start();
        }
    }
}

在CompositeKafkaListenerContainerFactory的doStart方法中,遍历所有创建的KafkaMessageListenerContainer实例,并调用其start方法启动监听。在KafkaMessageListenerContainer的start方法中,主要是启动若干监听kafka的线程,并准备消费kafka消息。

4. KafkaListener的总结

通过对@KafkaListener注解的源码分析,我们可以得知,@KafkaListener是基于消息监听器容器实现的,其生命周期和消息监听器容器是一致的。

在使用@KafkaListener注解时,需要配置监听的kafka主题、消费组ID、容器监听工厂等信息。@KafkaListener注解标注的方法会在对应的kafka主题中有消息到达时得到调用,并将接收到的消息作为参数传入注解标注的方法中。同时需要注意的是,@KafkaListener注解的方法应该在消息处理时保持幂等性。

总的来说,通过@KafkaListener注解和相关配置,我们可以非常方便地实现kafka消息的消费和处理,从而让我们的系统实现更多的功能。