@RabbitListener起作用的原理是什么
更新时间:2023-07-17前言:
RabbitListener是Spring AMQP提供的注解,用于定义和配置监听器端点方法,以接收RabbitMQ消息。它可以应用在方法上,将方法标记为RabbitMQ的消息消费者。当消息到达指定的队列时,RabbitListener会自动调用被注解的方法,以处理接收到的消息。那么,RabbitListener起作用的原理是什么呢?消息监听器容器的创建与初始化
RabbitListener的原理核心在于消息监听器容器的创建与初始化过程。当Spring应用程序启动时,会自动扫描并加载RabbitListener注解,并根据注解的配置信息创建对应的消息监听器容器。消息监听器容器主要负责监听指定队列中的消息,并将接收到的消息分发给对应的消费者方法进行处理。 下面是RabbitListener消息监听器容器的创建与初始化过程:1.首先,Spring会扫描带有RabbitListener注解的类和方法,加载这些注解配置。
public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware { // ... @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); CollectionannotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup ) method -> { Collection listenerAnnotations = findListenerAnnotations(method); return (!listenerAnnotations.isEmpty() ? listenerAnnotations : null); }); if (annotatedMethods.isEmpty()) { return bean; } // 为每一个带有RabbitListener注解的方法创建一个消息监听器容器 for (Method method : annotatedMethods) { processAmqpListener(method, bean, bean.getClass()); } return bean; } // ... }
2.根据注解的配置信息,创建一个消息监听器容器。消息监听器容器是RabbitListener相关功能的核心,负责监听指定队列中的消息,并将消息分发给对应的消费者方法进行处理。
@Configuration public class RabbitListenerConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; // 创建消息监听器容器 @Bean public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("myQueue"); // 监听的队列名 container.setMessageListener(new MyMessageListener()); // 消息监听器 return container; } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); return factory; } }
3.在创建的消息监听器容器中配置消息监听器和消息处理方法,以便将监听到的消息分发给相应的消费者方法处理。消息监听器负责监听队列,消息处理方法负责处理接收到的消息。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { // 接收到消息后的处理逻辑 } }
4.配置完成后,消息监听器容器会根据配置的信息开始监听指定的队列。一旦有消息到达队列,消息监听器容器将会调用消息处理方法来处理接收到的消息。