spring Kafka中的KafkaListener源码分析
更新时间:2023-11-231. @KafkaListener的作用
@KafkaListener主要用于监听kafka中的topic,并通过指定的方法对从kafka中获取到的数据进行处理。下面是一个简单的@KafkaListener的用法示例:
@Component public class MyListener { @KafkaListener(topics = "test_topic") public void listen(String message) { System.out.println("received message: " + message); } }
该示例中,@KafkaListener注解标注的方法会在监听的test_topic中有数据时被调用,并将获取到的数据作为参数传入方法中,然后进行处理。
2. @KafkaListener源码分析
我们来看一下@KafkaListener注解的具体实现。在spring-kafka的源码中,@KafkaListener被定义在org.springframework.kafka.annotation包下。其核心类是KafkaListenerAnnotationBeanPostProcessor,它是一个BeanPostProcessor,用于解析并处理@KafkaListener注解。
public class KafkaListenerAnnotationBeanPostProcessor extends AbstractKafkaListenerContainerFactoryAware { private final KafkaListenerConfigUtils utils = new KafkaListenerConfigUtils(); private MessageHandlerMethodFactory messageHandlerMethodFactory; private KafkaListenerEndpointRegistry endpointRegistry; private BeanFactory beanFactory; private ResourceLoader resourceLoader; private KafkaListenerEndpointRegistrar registrar; // 处理@KafkaListener注解 @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) { String[] beanNames = registry.getBeanDefinitionNames(); for (String beanName : beanNames) { BeanDefinition beanDefinition = registry.getBeanDefinition(beanName); String beanClassName = beanDefinition.getBeanClassName(); if (beanClassName != null) { Class<?> clazz = ClassUtils.resolveClassName(beanClassName, ClassUtils.getDefaultClassLoader()); Map> annotatedMethods = MethodIntrospector.selectMethods(clazz, (MethodIntrospector.MetadataLookup >) method -> { Set listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, KafkaListener.class, KafkaListeners.class); return (!listenerMethods.isEmpty() ? listenerMethods : null); }); if (!annotatedMethods.isEmpty()) { // 解析@KafkaListener注解,为每一个注解创建一个endpoint,并将endpoint注册到endpointRegistry for (Map.Entry > entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); Set listeners = entry.getValue(); for (KafkaListener kafkaListener : listeners) { processKafkaListener(kafkaListener, method, beanName, registry); } } } } } } ... }
在该类的postProcessBeanDefinitionRegistry方法中,主要通过反射扫描所有类中的方法,查找哪些方法使用了@KafkaListener注解,然后为每一个使用注解的方法创建一个对应的Kafka监听器,即KafkaMessageListenerContainer。同时,还需要解析@KafkaListener注解中的其他配置信息,例如:
@KafkaListener(id = "myListener", topics = "${kafka.topic.test}", containerFactory = "kafkaListenerContainerFactory") public void handle(String message) { System.out.println("received message: " + message); }
上述代码中,除了topics属性外,还指定了containerFactory属性,用于关联kafkaListenerContainerFactory实例。在KafkaListenerAnnotationBeanPostProcessor的processKafkaListener方法中,我们就可以看到它的实现:
private void processKafkaListener(KafkaListener kafkaListener, Method method, String beanName, BeanDefinitionRegistry registry) { KafkaListenerEndpoint endpoint = new KafkaListenerEndpoint(); // 设置监听器ID endpoint.setId(resolveExpressionAsString(kafkaListener.id(), "id")); // 设置监听的topics endpoint.setTopics(resolveExpressionAsList(kafkaListener.topics())); // 设置监听的topics的partition信息,用于支持分区的消费 endpoint.setTopicPartitions(resolveExpressionAsTopicPartitionList(kafkaListener.topicPartitions())); // 设置监听器的ContainerFactory endpoint.setContainerFactoryBeanName(resolveExpressionAsString(kafkaListener.containerFactory(), "containerFactory")); // 注册目标类和目标方法 endpoint.setBean(beanName); endpoint.setMethod(method); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); String errorHandler = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler"); if (StringUtils.hasText(errorHandler)) { endpoint.setErrorHandler(resolveExpressionAsClass(errorHandler, KafkaListenerErrorHandler.class)); } String groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId"); if (StringUtils.hasText(groupId)) { endpoint.setGroupId(groupId); } KafkaListenerConfigurer configurer = this.beanFactory != null ? this.beanFactory.getBean(KafkaListenerConfigurer.class) : null; if (configurer != null) { configurer.configureKafkaListeners(new KafkaListenerEndpointRegistrar.DefaultKafkaListenerEndpointRegistry(endpoint)); } KafkaListenerEndpointRegistrar registrar = getEndpointRegistrar(); // 注册监听器 registrar.registerEndpoint(endpoint, this.beanFactory); }
通过以上代码,就成功解析了@KafkaListener注解的所有配置信息,并将其注册到了KafkaListenerEndpointRegistry中,待KafkaListenerEndpointRegistry启动后,就可以自动创建对应的KafkaMessageListenerContainer实例,并启动对应的线程,开始工作。
3. KafkaListenerEndpointRegistry的启动
在KafkaListener的各项配置都完成后,还需要启动KafkaListenerEndpointRegistry,让其开始监听kafka主题。具体实现代码如下:
public class CompositeKafkaListenerContainerFactory extends AbstractKafkaListenerContainerFactory
在CompositeKafkaListenerContainerFactory的doStart方法中,遍历所有创建的KafkaMessageListenerContainer实例,并调用其start方法启动监听。在KafkaMessageListenerContainer的start方法中,主要是启动若干监听kafka的线程,并准备消费kafka消息。
4. KafkaListener的总结
通过对@KafkaListener注解的源码分析,我们可以得知,@KafkaListener是基于消息监听器容器实现的,其生命周期和消息监听器容器是一致的。
在使用@KafkaListener注解时,需要配置监听的kafka主题、消费组ID、容器监听工厂等信息。@KafkaListener注解标注的方法会在对应的kafka主题中有消息到达时得到调用,并将接收到的消息作为参数传入注解标注的方法中。同时需要注意的是,@KafkaListener注解的方法应该在消息处理时保持幂等性。
总的来说,通过@KafkaListener注解和相关配置,我们可以非常方便地实现kafka消息的消费和处理,从而让我们的系统实现更多的功能。