目录

第十章-RabbitMQ之Spring客户端源码

1. 前言

2. 客户端消费代码

2.1 消费的实现方式

2.2 消费中注解解释

2.3 推测Spring实现过程

3.MQ消费源码分析

3.1 集成SpringBoot 启动过程

3.2 Broker投递消息给客户端过程

3.3 客户端消费过程

4. 总结


第十章-RabbitMQ之Spring客户端源码

1. 前言

经过前面前面的学习,我们已经掌握了rabbitmq的基本用法,高级用法延迟队列、死信队列等,已经研究过了amqp-client的java客户端源码,由于我们在使用的时候,一般还是以SpringBoot为主,那经过Spring封装后的客户端源码是是如何实现的呢?

同学们最好需要有研读过 Spring源码及SpringBoot 源码的经验,会更好衔接一下,不过关系也不大。

由于Spring 体系的庞大,封装的rabbit客户端实现的功能也很多,例 创建连接、生产者推送消息,事务,消费者消费等等内容,那我们这次只抽取rabbitmq消费的部分,进行研读。

集成starter

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2. 客户端消费代码

2.1 消费的实现方式

如之前我们提到的集成SpringBoot后的使用方式:

@RabbitHandler
@RabbitListener(queues = "SolarWaterHeater")
 @RabbitHandler@RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater"))
    @RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue("SolarWaterHeater-RedWine"),key = "REDWINE",exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))

2.2 消费中注解解释

这里面出现了两个注解

第一个:RabbitHandler 看下它的解释:

* Annotation that marks a method to be the target of a Rabbit message
* listener within a class that is annotated with {@link RabbitListener}.

如果一天类上面的注解是RabbitListener,那RabbitHandler标注的方法,即是Rabbit的消息监听者。

@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
这个注解只能标注到Method

第二个 RabbitListener

1. Annotation that marks a method to be the target of a Rabbit message listener 

标注的方法是一个消息监听者

2. When defined at the class level, a single message listener container is used to
* service all methods annotated with {@code @RabbitHandler}

如果标注到类上,那标注RabbitHandler的方法即是消息监听

链一个:@RabbitListener和@RabbitHandler的使用_sliver1836的博客-CSDN博客

2.3 推测Spring实现过程

所以,我们后续的源码分析即基于此两个注解开展。

在开始看代码之前,我们先想一想,我们之前的使用java amqp客户端开发消费逻辑的过程,

1、创建连接

2、创建Channel

3、声明队列、Exchange、绑定关系

4、监听方法实现 继承DefaultConumer

5、basic.Consume 注册到Broker

6、Broker消息推送,监听方法实现消费

那现在Spring就靠两个注解就帮我们实现了消息的消费,有没有很神奇。顿时感叹程序猿越来越幸福,写代码如此简单了呢?但有利就有弊,Spring帮我们封装的太多,而我们知道的底层却太少了。

闲话少说,到这,大家想一下,如果让你写个注解,就去实现上面6个步骤的内容,你该如何去做呢?

开发自定义注解大家都应该做过,大致的逻辑应该是不是可以,在系统启动的时候,我们就会抓取到标注注解的方法,有此类的方法时,我们认为需要使用mq,我们在后端服务中依次的去执行上面中的6个步骤。这样把注解的方法实现了监听,后续监听消息进行消费。

这里只是一个大概的推测,大家自己自行发挥想像。

3.MQ消费源码分析

从哪入手呢?首先点开 RabbitListener 的源码,然后Download源码。

到这个界面:

我们不再研读RabbitListener这个注解的功能了,大家自己看。

然后紧接着看到 RabbitListenerAnnotationBeanPostProcessor

这个类有什么特点呢?首先是处理RabbitListener 的处理类,然后呢是一个BeanPostProcessor继承了BeanPostProcessor 接口-读过Spring源码的同学,肯定就能得到最有效的信息了,这个类会在系统初始化的时候,执行postProcessAfterInitialization()这个方法。如果没读过Spring源码的话就先跟着节奏走吧。

从这开始了我们的切入。

3.1 集成SpringBoot 启动过程

接着上面的步骤呢,我们往上简单倒一下,

首先 这是一个SpringBoot 项目,通过SpringBoot 的启动类的Main 方法进行启动,然后开始扫描各个组件,初始化各种信息,这个不再细聊。【需要读SpringBoot源码】

其次呢,SpringBoot 只是对Spring 的封装,还是需要回到Spring 的类初始化的过程中去。【需要读Spring源码】

如下呢,即Spring 的核心初始化方法:无论Spring 再怎么升级,这几个核心方法基本不会怎么变化了,这里面我们找到 【registerBeanPostProcessors】,从这里面就会触发到我们上面所说的-

RabbitListenerAnnotationBeanPostProcessor

@Overridepublic void refresh() throws BeansException, IllegalStateException {synchronized (this.startupShutdownMonitor) {// Prepare this context for refreshing.prepareRefresh();// Tell the subclass to refresh the internal bean factory.ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();// Prepare the bean factory for use in this context.prepareBeanFactory(beanFactory);try {// Allows post-processing of the bean factory in context subclasses.postProcessBeanFactory(beanFactory);// Invoke factory processors registered as beans in the context.invokeBeanFactoryPostProcessors(beanFactory);// Register bean processors that intercept bean creation.registerBeanPostProcessors(beanFactory);// Initialize message source for this context.initMessageSource();// Initialize event multicaster for this context.initApplicationEventMulticaster();// Initialize other special beans in specific context subclasses.onRefresh();// Check for listener beans and register them.registerListeners();// Instantiate all remaining (non-lazy-init) singletons.finishBeanFactoryInitialization(beanFactory);// Last step: publish corresponding event.finishRefresh();}catch (BeansException ex) {if (logger.isWarnEnabled()) {logger.warn("Exception encountered during context initialization - " +"cancelling refresh attempt: " + ex);}// Destroy already created singletons to avoid dangling resources.destroyBeans();// Reset 'active' flag.cancelRefresh(ex);// Propagate exception to caller.throw ex;}finally {// Reset common introspection caches in Spring's core, since we// might not ever need metadata for singleton beans anymore...resetCommonCaches();}}}

随着Spring 的启动,开始触发到了RabbitListenerAnnotationBeanPostProcessor 中的

postProcessAfterInitialization 方法。

代码:

这就很好解释了,bean 就是我们的消费类,

解析到了 标有注解的方法 @RabbitListener,然后进行处理。processAmqpListener

@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {Class<?> targetClass = AopUtils.getTargetClass(bean);final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);for (ListenerMethod lm : metadata.listenerMethods) {for (RabbitListener rabbitListener : lm.annotations) {processAmqpListener(rabbitListener, lm.method, bean, beanName);}}if (metadata.handlerMethods.length > 0) {processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);}return bean;}
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {// 对应的消费方法Method methodToUse = checkProxy(method, bean);//封装对象MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();endpoint.setMethod(methodToUse);// 继续处理processListener(endpoint, rabbitListener, bean, methodToUse, beanName);}

继续:

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {endpoint.setBean(bean);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setId(getEndpointId(rabbitListener));endpoint.setQueueNames(resolveQueues(rabbitListener));endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));endpoint.setBeanFactory(this.beanFactory);endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));Object errorHandler = resolveExpression(rabbitListener.errorHandler());if (errorHandler instanceof RabbitListenerErrorHandler) {endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);}else if (errorHandler instanceof String) {String errorHandlerBeanName = (String) errorHandler;if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));}}else {throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "+ errorHandler.getClass().toString());}String group = rabbitListener.group();if (StringUtils.hasText(group)) {Object resolvedGroup = resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String autoStartup = rabbitListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));}endpoint.setExclusive(rabbitListener.exclusive());String priority = resolve(rabbitListener.priority());if (StringUtils.hasText(priority)) {try {endpoint.setPriority(Integer.valueOf(priority));}catch (NumberFormatException ex) {throw new BeanInitializationException("Invalid priority value for " +rabbitListener + " (must be an integer)", ex);}}// 以上 前面都完成了对 MethodRabbitListenerEndpoint 对象的封装,封装的也都是注解中的属性//此方法内部实际没执行 跳过resolveAdmin(endpoint, rabbitListener, adminTarget);//跳过RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);// 属性填充 放入List ,不重要this.registrar.registerEndpoint(endpoint, factory);}

程序回转:

这里面来到一个

public void afterSingletonsInstantiated() 方法,这是由于实现了接口SmartInitializingSingleton, 后续得到了处理。

这里面会涉及到两个类:

1. RabbitListenerEndpointRegistrar

2. RabbitListenerEndpointRegistry

有没有长得很像,这里面是把 RabbitListenerEndpointRegistry 手工注册到了RabbitListenerEndpointRegistrar 里面,然后进行了一系列初始化,

这里面不再详细展开了,但这个RabbitListenerEndpointRegistry 很重要,后面还会涉及到它

RabbitListenerEndpointRegistry 实现了一个Lifecycle接口,后续会调用到它的实现start()

将对应的消费Class 做好了封装 ,返回,继续Spring的初始化过程。

来到Spring核心流程

finishRefresh();
 /*** Finish the refresh of this context, invoking the LifecycleProcessor's* onRefresh() method and publishing the* {@link org.springframework.context.event.ContextRefreshedEvent}.*/protected void finishRefresh() {// Clear context-level resource caches (such as ASM metadata from scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();// Propagate refresh to lifecycle processor first.getLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);}

其中第三个方法

getLifecycleProcessor().onRefresh();

这个方法是获取 lifecycle的处理器,进行lifecycle接口实现类的处理,这就呼应到了上面的 RabbitListenerEndpointRegistry ,他实现了lifecycle的接口。

最终一番流转终于到了 这个Registry处理逻辑中:

 @Overridepublic void start() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}}
 /*** Start the specified {@link MessageListenerContainer} if it should be started* on startup or when start is called explicitly after startup.* @param listenerContainer the container.* @see MessageListenerContainer#isAutoStartup()*/private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {listenerContainer.start();}}
MessageListenerContainer 也是在上面afterSingletonsInstantiated 处理好的,现在要启动这个监听者容器。

来到了 AbstractMessageListenerContainer 中的启动方法:

/*** Start this container.* @see #doStart*/@Overridepublic void start() {if (isRunning()) {return;}if (!this.initialized) {synchronized (this.lifecycleMonitor) {if (!this.initialized) {afterPropertiesSet();}}}try {logger.debug("Starting Rabbit listener container.");configureAdminIfNeeded();checkMismatchedQueues();doStart();}catch (Exception ex) {throw convertRabbitAccessException(ex);}finally {this.lazyLoad = false;}}
configureAdminIfNeeded() 获取RabbitAdmin 
checkMismatchedQueues() 这个方法就很关键了,运行到此时打开我们的抓包工具,这里面开始创建Connection了。
protected void checkMismatchedQueues() {if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {try {this.amqpAdmin.initialize();}catch (AmqpConnectException e) {logger.info("Broker not available; cannot check queue declarations");}catch (AmqpIOException e) {if (RabbitUtils.isMismatchedQueueArgs(e)) {throw new FatalListenerStartupException("Mismatched queues", e);}else {logger.info("Failed to get connection during start(): " + e);}}}else {try {// 创建连接方法Connection connection = getConnectionFactory().createConnection(); // NOSONARif (connection != null) {connection.close();}}catch (Exception e) {logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());}}}

有没有很熟悉

Connection connection = getConnectionFactory().createConnection(); 
@Overridepublic final Connection createConnection() throws AmqpException {if (this.stopped) {throw new AmqpApplicationContextClosedException("The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");}synchronized (this.connectionMonitor) {if (this.cacheMode == CacheMode.CHANNEL) {if (this.connection.target == null) {this.connection.target = super.createBareConnection();// invoke the listener *after* this.connection is assignedif (!this.checkoutPermits.containsKey(this.connection)) {this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));}this.connection.closeNotified.set(false);getConnectionListener().onCreate(this.connection);}return this.connection;}else if (this.cacheMode == CacheMode.CONNECTION) {return connectionFromCache();}}return null; // NOSONAR - never reach here - exceptions}

运行完此步,如上的代码中,两个重要的点:

1. 此步直接就创建了Connection、

this.connection.target = super.createBareConnection();

看下抓包:

2. 继续这一步也很关键,创建完连接后,会把接下来的 Exchange、Queue、绑定关系根据注解配置中的内容,该创建的都创建一遍。

getConnectionListener().onCreate(this.connection);

直接运行到了

RabbitAdmin.initialize()

看方法头上的注释也很清晰

/*** Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe* (but unnecessary) to call this method more than once.*/@Override // NOSONAR complexitypublic void initialize() {if (this.applicationContext == null) {this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");return;}this.logger.debug("Initializing declarations");Collection<Exchange> contextExchanges = new LinkedList<Exchange>(this.applicationContext.getBeansOfType(Exchange.class).values());Collection<Queue> contextQueues = new LinkedList<Queue>(this.applicationContext.getBeansOfType(Queue.class).values());Collection<Binding> contextBindings = new LinkedList<Binding>(this.applicationContext.getBeansOfType(Binding.class).values());processLegacyCollections(contextExchanges, contextQueues, contextBindings);processDeclarables(contextExchanges, contextQueues, contextBindings);final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);final Collection<Queue> queues = filterDeclarables(contextQueues);final Collection<Binding> bindings = filterDeclarables(contextBindings);for (Exchange exchange : exchanges) {if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("+ exchange.getName()+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "+ "reopening the connection.");}}for (Queue queue : queues) {if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("+ queue.getName()+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"+ queue.isExclusive() + ". "+ "It will be redeclared if the broker stops and is restarted while the connection factory is "+ "alive, but all messages will be lost.");}}if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {this.logger.debug("Nothing to declare");return;}this.rabbitTemplate.execute(channel -> {declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));declareQueues(channel, queues.toArray(new Queue[queues.size()]));declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));return null;});this.logger.debug("Declarations finished");}

由于我们只创建了Queue,使用默认的Exchange,代码不贴太多了,只贴声明Queue的内容:

DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());

我们看下抓包情况:

到此呢,Queue也声明好了。下面呢,下面就该basic.Consume 了吧,把消费者注册到Broker中去。

好,我们继续:

继续代码又倒回去,倒到:

/*** Start this container.* @see #doStart*/@Overridepublic void start() {if (isRunning()) {return;}if (!this.initialized) {synchronized (this.lifecycleMonitor) {if (!this.initialized) {afterPropertiesSet();}}}try {logger.debug("Starting Rabbit listener container.");configureAdminIfNeeded();checkMismatchedQueues();doStart();}catch (Exception ex) {throw convertRabbitAccessException(ex);}finally {this.lazyLoad = false;}}
doStart(); 

一看doxxx,那一定是要干实际的事情的,很重要对吧,

我们进入到

SimpleMessageListenerContainer

中的实现方法中:

/*** Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer* to this container's task executor.*/@Overrideprotected void doStart() {checkListenerContainerAware();super.doStart();synchronized (this.consumersMonitor) {if (this.consumers != null) {throw new IllegalStateException("A stopped container should not have consumers");}int newConsumers = initializeConsumers();if (this.consumers == null) {logger.info("Consumers were initialized and then cleared " +"(presumably the container was stopped concurrently)");return;}if (newConsumers <= 0) {if (logger.isInfoEnabled()) {logger.info("Consumers are already running");}return;}Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();for (BlockingQueueConsumer consumer : this.consumers) {AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);processors.add(processor);getTaskExecutor().execute(processor);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));}}waitForConsumersToStart(processors);}}

前面几步意义不大,走到

int newConsumers = initializeConsumers();
protected int initializeConsumers() {int count = 0;synchronized (this.consumersMonitor) {if (this.consumers == null) {this.cancellationLock.reset();this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);for (int i = 0; i < this.concurrentConsumers; i++) {BlockingQueueConsumer consumer = createBlockingQueueConsumer();this.consumers.add(consumer);count++;}}}return count;}

重点来咯,

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

这里把BlockingQueueConsumer做了一个初始化,相关的不再展开。

BlockingQueueConsumer -这将是后续我们非常重要的一个类

继续重点内容,回到我们上面代码块中的内容:

for (BlockingQueueConsumer consumer : this.consumers) {AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);processors.add(processor);getTaskExecutor().execute(processor);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));}}

这个for循环很重要了,由于我们是一个消费者,循环一次。

初始化一个

AsyncMessageProcessingConsumer

对象。这个对象点进去,大家看下这是个实现了Runnable接口的线程对象。哦哦,真正的核心哦。使用 SimpleAsyncTaskExecutor   来new的线程,这个执行器可不是线程池哦,来一个线程就会New一个,大家自行研究。

这里面我们可以得到一个结论,就是一个消费者,就会开启一个线程进行监听。

从此开启了新线程,【打断点记得Thread模式】

看线程的实现:

@Override // NOSONAR - complexity - many catch blockspublic void run() { // NOSONAR - line countif (!isActive()) {return;}boolean aborted = false;this.consumer.setLocallyTransacted(isChannelLocallyTransacted());String routingLookupKey = getRoutingLookupKey();if (routingLookupKey != null) {SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null}if (this.consumer.getQueueCount() < 1) {if (logger.isDebugEnabled()) {logger.debug("Consumer stopping; no queues for " + this.consumer);}SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);if (getApplicationEventPublisher() != null) {getApplicationEventPublisher().publishEvent(new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));}this.start.countDown();return;}try {initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}

摘出核心点:

1、initialize();

 private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();this.consumer.start();this.start.countDown();}

初始化内容,

1.  redeclareElementsIfNecessary - 这个是再进行检查进行Exchange 、Queue、Binding的声明与前面声明的方法实现的共用。

2.this.consumer.start();

public void start() throws AmqpException {if (logger.isDebugEnabled()) {logger.debug("Starting consumer " + this);}this.thread = Thread.currentThread();try {this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,this.transactional);this.channel = this.resourceHolder.getChannel();ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel); // NOSONAR never null here}catch (AmqpAuthenticationException e) {throw new FatalListenerStartupException("Authentication failure", e);}this.deliveryTags.clear();this.activeObjectCounter.add(this);passiveDeclarations();setQosAndreateConsumers();}
这里面我们看这个方法就行
setQosAndreateConsumers();

Qos是设定消费时每次抓取的数量

并CreadConsumers

private void setQosAndreateConsumers() {if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {// Set basicQos before calling basicConsume (otherwise if we are not acking the broker// will send blocks of 100 messages)try {this.channel.basicQos(this.prefetchCount);}catch (IOException e) {this.activeObjectCounter.release(this);throw new AmqpIOException(e);}}try {if (!cancelled()) {for (String queueName : this.queues) {if (!this.missingQueues.contains(queueName)) {consumeFromQueue(queueName);}}}}catch (IOException e) {throw RabbitExceptionTranslator.convertRabbitAccessException(e);}}

有没有很熟悉:

this.channel.basicQos(this.prefetchCount);

抓包:

继续:

consumeFromQueue(queueName);
private void consumeFromQueue(String queue) throws IOException {InternalConsumer consumer = new InternalConsumer(this.channel, queue);String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,this.exclusive, this.consumerArgs,consumer);if (consumerTag != null) {this.consumers.put(queue, consumer);if (logger.isDebugEnabled()) {logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);}}else {logger.error("Null consumer tag received for queue " + queue);}}

有没有很熟悉:

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,this.exclusive, this.consumerArgs,consumer);

那这里有有一个核心的类出现了。InternalConsumer

这里转向 3.2 Broker投递消息给客户端  解释

到这里呢,我们把消费者注册到了Broker中去了,看下抓包情况:

到这呢,所以Broker也就能给我们投递消息了。

2、mainLoop();

             initialize();while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}

这里也有个mainLoop ,于是想到了,java 的amqp客户端也存在呢mainLoop ,这里的逻辑难道也和他的逻辑契合的?我们转向 3.3 客户端消费过程继续。

3.2 Broker投递消息给客户端过程

上面说到了,已经将消费者注册到了Broker中去了,但一定注意哦,注册到Broker 中的,可不是我们使用注解 RabbitListener 标注的实际消费方法哦,而是新创建了一个内部的消费者:InternalConsumer

我们看下他的一个实现

private final class InternalConsumer extends DefaultConsumer {private final String queueName;boolean canceled;InternalConsumer(Channel channel, String queue) {super(channel);this.queueName = queue;}@Overridepublic void handleConsumeOk(String consumerTag) {super.handleConsumeOk(consumerTag);if (logger.isDebugEnabled()) {logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);}if (BlockingQueueConsumer.this.applicationEventPublisher != null) {BlockingQueueConsumer.this.applicationEventPublisher.publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));}}@Overridepublic void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {if (logger.isDebugEnabled()) {if (RabbitUtils.isNormalShutdown(sig)) {logger.debug("Received shutdown signal for consumer tag=" + consumerTag + ": " + sig.getMessage());}else {logger.debug("Received shutdown signal for consumer tag=" + consumerTag, sig);}}BlockingQueueConsumer.this.shutdown = sig;// The delivery tags will be invalid if the channel shuts downBlockingQueueConsumer.this.deliveryTags.clear();BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);}@Overridepublic void handleCancel(String consumerTag) throws IOException {if (logger.isWarnEnabled()) {logger.warn("Cancel received for " + consumerTag + " ("+ this.queueName+ "); " + BlockingQueueConsumer.this);}BlockingQueueConsumer.this.consumers.remove(this.queueName);if (!BlockingQueueConsumer.this.consumers.isEmpty()) {basicCancel(false);}else {BlockingQueueConsumer.this.cancelled.set(true);}}@Overridepublic void handleCancelOk(String consumerTag) {if (logger.isDebugEnabled()) {logger.debug("Received cancelOk for tag " + consumerTag + " ("+ this.queueName+ "); " + BlockingQueueConsumer.this);}this.canceled = true;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) {if (logger.isDebugEnabled()) {logger.debug("Storing delivery for consumerTag: '"+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "+ BlockingQueueConsumer.this);}try {if (BlockingQueueConsumer.this.abortStarted > 0) {if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body, this.queueName),BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {Channel channelToClose = super.getChannel();RabbitUtils.setPhysicalCloseRequired(channelToClose, true);// Defensive - should never happenBlockingQueueConsumer.this.queue.clear();if (!this.canceled) {getChannel().basicCancel(consumerTag);}try {channelToClose.close();}catch (@SuppressWarnings("unused") TimeoutException e) {// no-op}}}else {BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));}}catch (@SuppressWarnings("unused") InterruptedException e) {Thread.currentThread().interrupt();}catch (Exception e) {BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);}}@Overridepublic String toString() {return "InternalConsumer{" + "queue='" + this.queueName + '\'' +", consumerTag='" + getConsumerTag() + '\'' +'}';}}

哇,内部类,而且继承了 DefaultConsumer ,这和我们前面学习Rabbitmq工作模式的过程中,自己手动开发的代码一样了吧,那我找到 投递方法:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

好亲切有木有,所以到这里真相大白咯。Broker将消息投递到了这里,我们看看他接收到消息搞什么动作?

BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

很明显,和java amqp client 实现一样,他这也用到了Queue,去存储了,

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);

也是个阻塞Queue哦,看来spring搞了一通,从客户端那边的queue里拿来,又放了一次queue。

那放进去了,就等着取呗,看谁来取咯。

3.3 客户端消费过程

接续上面的 mainLoop(),既然消息又存到了本地的queue中,那mainLoop 的目的岂不是很明确了,那就是死循环的去取消息消息,然后再转调到我们实际的 加入@RabbitListener 的方法中去呢。究竟是不是呢,验证下:

private void mainLoop() throws Exception { // NOSONAR Exceptiontry {boolean receivedOk = receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {checkAdjust(receivedOk);}long idleEventInterval = getIdleEventInterval();if (idleEventInterval > 0) {if (receivedOk) {updateLastReceive();}else {long now = System.currentTimeMillis();long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive = getLastReceive();if (now > lastReceive + idleEventInterval&& now > lastAlertAt + idleEventInterval&& SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException("Invalid listener", ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/**  These will normally be wrapped by an LEFE if thrown by the*  listener, but we will also honor it if thrown by an*  error handler.*/}}

看下重点方法:

boolean receivedOk = receiveAndExecute(this.consumer); 
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager = getTransactionManager();if (transactionManager != null) {try {if (this.transactionTemplate == null) {this.transactionTemplate =new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status -> { // NOSONAR null never returnedRabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}return doReceiveAndExecute(consumer);}

抛开事务,我们不关注。

return doReceiveAndExecute(consumer);
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONARChannel channel = consumer.getChannel();for (int i = 0; i < this.txSize; i++) {logger.trace("Waiting for message from consumer.");Message message = consumer.nextMessage(this.receiveTimeout);if (message == null) {break;}try {executeListener(channel, message);}

重点哦:

         Message message = consumer.nextMessage(this.receiveTimeout);

从内部消费者取消息咯

public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {if (logger.isTraceEnabled()) {logger.trace("Retrieving delivery for " + this);}checkShutdown();if (this.missingQueues.size() > 0) {checkMissingQueues();}Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));if (message == null && this.cancelled.get()) {throw new ConsumerCancelledException();}return message;}

看到poll 我们就放心了,把消息取出来,包装成Message对象。

快调头回来,继续看:

try {executeListener(channel, message);
}

这就要真正处理这个消息了

protected void executeListener(Channel channel, Message messageIn) {if (!isRunning()) {if (logger.isWarnEnabled()) {logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);}throw new MessageRejectedWhileStoppingException();}try {doExecuteListener(channel, messageIn);}catch (RuntimeException ex) {if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {if (this.statefulRetryFatalWithNullMessageId) {throw new FatalListenerExecutionException("Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);}else {throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),messageIn);}}handleListenerException(ex);throw ex;}}

代码不往下贴了,继续追就可以,最终还是找到了,打标@RabbitListener的那个方法上,得到了执行。真正让业务逻辑执行到了MQ推送过来的消息,

太不容易了,消息从发送-> Exchange->Queue -> java amqp client  ->spring client - >consume 最终得到了消费。

4. 总结

小结一下,我们从注解RabbitHandler RabbitListener 入手,一步步追踪到 与Broker链接的创建,Queue的声明,接着,启动新线程 注册一个内部的消费者到Broker中,Broker有消息的时候会推送到本地的BlockingQueue中去。

使用MainLoop 消费本地Blockinqueue的内容

贴个小图:

RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码相关推荐

  1. RabbitMQ 客户端源码系列 - Channel

    前言 续上次分享 RabbitMQ 客户端源码系列 - Connection ,继续分享Channel相关的源码分析 (com.rabbitmq:amqp-client:4.8.3) 友情提醒:本次分 ...

  2. RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing

    第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...

  3. Spring源码深度解析(郝佳)-学习-Spring消息-整合RabbitMQ及源码解析

      我们经常在Spring项目中或者Spring Boot项目中使用RabbitMQ,一般使用的时候,己经由前人将配置配置好了,我们只需要写一个注解或者调用一个消息发送或者接收消息的监听器即可,但是底 ...

  4. springboot sessionfactory_Spring Boot从入门到精通(五)多数据源配置实现及源码分析...

    多数据源配置在项目软件中是比较常见的开发需求,Spring和Spring Boot中对此都有相应的解决方案可供大家参考.在Spring Boot中,如MyBatis.JdbcTemplate以及Jpa ...

  5. Android开发从入门到精通教程大总结(源码,教程,面试题,书籍,视频)

    老罗:  http://blog.csdn.net/column/details/androidluo.html http://xiaozu.renren.com/xiaozu/100692/3569 ...

  6. HTML5+CSS3从入门到精通书籍配套源码

    HTML5+CSS3从入门到精通配套视频讲解227节光盘源码云盘分享: 链接:https://pan.baidu.com/s/1nvG5FZz

  7. 精通SpringBoot---整合RabbitMQ消息队列

    今天来和朋友们一起学习下,SpringBoot怎么整合RabbitMQ.目前消息组件大致有三种:.activemq, rabbitmq, kafka.这三者各有优缺点,RabbitMQ相比之下是处于其 ...

  8. RabbitMQ入门到精通

    RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...

  9. rabbitmq java教程_GitHub - maxwellyue/rabbitmq-tutorial-java: RabbitMQ官方教程的翻译和说明--Java版...

    说明 主要通过Hello Word对RabbitMQ有初步认识 工作队列,即一个生产者对多个消费者 循环分发.消息确认.消息持久.公平分发 如何同一个消息同时发给多个消费者 开始引入RabbitMQ消 ...

最新文章

  1. symbian 视频播放解决方案
  2. vSphere 4系列之六:Standard vSwitch
  3. php并发取源码,PHP读取大文件源码示例-Swoole多进程读取大文件
  4. Swagger如何测试Date类型参数
  5. 子页面带到父页面提交
  6. 大数据之-Hadoop3.x_MapReduce_序列化案例FlowMapper---大数据之hadoop3.x工作笔记0098
  7. HTML5网站大观:分享8个精美的 HTML5 网站案例
  8. Python Imaging Library: ImageColor Module(图像颜色模块)
  9. 计算机程序编辑的英语,编译程序是为把高级语言书写的计算机程序翻译成面向计算机的目标程序而使用的计算机程序...
  10. windows终止处理程序( __try __finally) 简单解析
  11. Linux下安装配置JDK6
  12. xmpp即时通讯协议的特性---优点和缺点!
  13. 2021哈工程计算机考研科目,2021考研大纲:哈尔滨工程大学计算机专业基础综合2021年硕士研究生自命题考试大纲...
  14. 大数据可以应用在哪些行业?
  15. 【理财】指数基金投资指南
  16. 论“搜狗”输入法对用户的影响
  17. Dispatch简介
  18. Scylladb学习笔记
  19. 常用的社会信息公开查询
  20. 认知计算机系统和应用实验报告,计算机认知实习课实验报告.doc

热门文章

  1. Android 源码在线阅读
  2. Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)
  3. tp6 openid获取 JWT中间件
  4. 货代公司主要是做什么的呢|货代公司作用
  5. 如何批量下载天堂图片网上多个精美作品并保存一个目录
  6. 从阿里巴巴发行价看A股新股投资机会
  7. 哈哈~ 开心死了 厚厚
  8. 试验试剂LR,双官能交联剂Methyltetrazine-PEG12-DBCO,四嗪二苯基环辛炔
  9. GameFramework篇:使用源码替换GameFramework.dll
  10. java窗口样式_美化窗口样式 java窗口界面美化包