持久化和非持久化消息发送的策略

通过setDeliveMode设置持久跟非持久属性。
消息的同步发送,跟异步发送:

  1. 消息的同步发送跟异步发送是针对broker 而言。
    在默认情况下,非持久化的消息是异步发送的。
    非持久化消息且非事物模式下是同步发送的。
    在开启事务的情况下,消息都是异步发送的。

  2. 通过以下三种方式来设置异步发送:

    • 在链接的URL 传递参数jms.useAsyncSend=true
       ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.useAsyncSend=true");
    
    • 调用ConnectionFactory的API 来设置。
    ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
    
    • 调用Connection的API来设置
    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    

    消息发送的流程图如下:

  3. 源码入口ActiveMQMessageProducer#send()

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {checkClosed(); //检查session的状态,如果session关闭则抛异常if (destination == null) {if (info.getDestination() == null) {throw new UnsupportedOperationException("A destination must be specified.");}throw new InvalidDestinationException("Don't understand null destinations");}//检查destination的类型,如果符合要求,就转变为ActiveMQDestinationActiveMQDestination dest;if (destination.equals(info.getDestination())) {dest = (ActiveMQDestination)destination;} else if (info.getDestination() == null) {dest = ActiveMQDestination.transform(destination);} else {throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());}if (dest == null) {throw new JMSException("No destination specified");}if (transformer != null) {Message transformedMessage = transformer.producerTransform(session, this, message);if (transformedMessage != null) {message = transformedMessage;}}//如果发送窗口大小不为空,则判断发送窗口的大小决定是否阻塞if (producerWindow != null) {try {producerWindow.waitForSpace();} catch (InterruptedException e) {throw new JMSException("Send aborted due to thread interrupt.");}}//发送消息到broker的topicthis.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);stats.onMessage();}
  1. ActiveMQSession#send()方法去做真正的发送
                  //。。。。这里省略一部分源码,主要做了对消息的封装,msg.setConnection(this.connection);msg.onSend();msg.setProducerId(msg.getMessageId().getProducerId());//消息异步发送的判断的条件: 回掉onComplete不为空、超时时间、不需要反馈、是否为异步,是否持久化if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {this.connection.asyncSendPacket(msg);if (producerWindow != null) {int size = msg.getSize();//异步发送的情况下,需要设置producerWindow的大小producerWindow.increaseUsage(size);}} else {if (sendTimeout > 0 && onComplete==null) {this.connection.syncSendPacket(msg,sendTimeout);}else {this.connection.syncSendPacket(msg, onComplete);}}
  1. 首先我们看下异步发送
    异步发送的情况下,需要设置producerWindow的大小,producer每发送一个消息,统计一下发送的字节数,当发送的总字节数达到ProducerWindowSize峰值时,需要等待broker的确认。主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小。每次发送消息之后,都将会导致memoryUsage大小增加(+msg.size),当broker返回producerAck时,memoryUsage尺寸减少(producerAck.size,此size表示先前发送消息的大小)。
    ProducerWindowSize值初始化的方式有2种

    在brokerUrl中设置: “tcp://127.0.0.1:61616?jms.producerWindowSize=10204”,这种设置将会对所有的producer生效。
    在destinationUri中设置: “myQueue?producer.windowSize=10204”,此参数只会对使用此Destination实例的producer生效,将会覆盖上面brokerUrl中的producerWindowSize值

  2. 接下来我们接着看ActiveMQConnection#asyncSendPacket方法

    紧接着我们分析下transport 对象是如何初始化的?通过ActiveMQConnection对象构造器可以看出,transport对象是通过初始化Connection链接的时候创建的。创建ActiveMQConnection对象代码如下:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.161616");
Connection connection= connectionFactory.createConnection();

在创建connection 方法中我们发现transport创建的代码如下:


createTransport();来创建Transport对象

TransportFactory#connect(java.net.URI)

 public static Transport connect(URI location) throws Exception {TransportFactory tf = findTransportFactory(location);return tf.doConnect(location);}

1.创建上一步传入的URL 创建对应的 TransportFactory,调用TransportFactory#findTransportFactory方法创建工厂对象:

其中TRANSPORT_FACTORY_FINDER定义如下

FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");

这点类似与之前dubbo 源码分析中的SPI思想,将需要加载的类写入固定文件夹下,通过解析加载配置文件来初始化对应的对象。

tcp 文件中的定义如下

因此 findTransportFactory创建的工厂对象为TcpTransportFactory

2.调用TransportFactory#doConnect(java.net.URI)创建Transport 对象

TransportFactory#createTransport方法,这里的TransportFactory为TcpTransportFactory

protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {URI localLocation = null;String path = location.getPath();if(path != null && path.length() > 0) {int localPortIndex = path.indexOf(58);try {Integer.parseInt(path.substring(localPortIndex + 1, path.length()));String localString = location.getScheme() + ":/" + path;localLocation = new URI(localString);} catch (Exception var7) {LOG.warn("path isn't a valid local location for TcpTransport to use", var7.getMessage());if(LOG.isDebugEnabled()) {LOG.debug("Failure detail", var7);}}}SocketFactory socketFactory = this.createSocketFactory(); //因为目前是Tcp 传输的return this.createTcpTransport(wf, socketFactory, location, localLocation);//因此得到的是tcptransport}

从代码上看返回的是一个tcptransport 对象再往上看返回的是rc 对象。通过TransportFactory#configure对创建好的tcptransport 进行包装代码如下:

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
//组装一个复合的transport,这里会包装两层,一个是IactivityMonitor.另一个是WireFormatNegotiatortransport = compositeConfigure(transport, wf, options);//再做一层包装,MutexTransporttransport = new MutexTransport(transport);//包装ResponseCorrelatortransport = new ResponseCorrelator(transport);return transport;}

到目前为止,tcptransport 是一系列的包装 ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))
ResponseCorrelator 异步请求包装。
MutexTransport 加锁包装。
WireFormatNegotiator发送数据解析相关协议包装。
IactivityMonitor 心跳机制包装。

至此transport 对象就创建完毕。我们回到之前的消息异步发送.syncSendPacket(msg)方法

private void doAsyncSendPacket(Command command) throws JMSException {try {this.transport.oneway(command);} catch (IOException e) {throw JMSExceptionSupport.create(e);}}

这里的transport对象指的是ResponseCorrelator,onway 设置了Command

 public void oneway(Object o) throws IOException {Command command = (Command)o;command.setCommandId(this.sequenceGenerator.getNextSequenceId());command.setResponseRequired(false);this.next.oneway(command);}

next 指的是MutexTransport ,主要是加锁,

  public void oneway(Object command) throws IOException {this.writeLock.lock();try {this.next.oneway(command);} finally {this.writeLock.unlock();}}

next 指的的WireFormatNegotiator#oneway 解析我们发送的内容

 public void oneway(Object command) throws IOException {boolean wasInterrupted = Thread.interrupted();try {if(this.readyCountDownLatch.getCount() > 0L && !this.readyCountDownLatch.await(this.negotiateTimeout, TimeUnit.MILLISECONDS)) {throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");}} catch (InterruptedException var14) {InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");interruptedIOException.initCause(var14);try {this.onException(interruptedIOException);} finally {Thread.currentThread().interrupt();wasInterrupted = false;}throw interruptedIOException;} finally {if(wasInterrupted) {Thread.currentThread().interrupt();}}super.oneway(command);}

这个里面调用了父类的 oneway ,父类是 TransportFilter 类

  public void oneway(Object command) throws IOException {this.next.oneway(command);}

这里的next 是InactivityMonitor,我们发现并没有对此方法进行实现,我们去看他的父类。

 public void oneway(Object o) throws IOException {this.sendLock.readLock().lock();this.inSend.set(true);try {this.doOnewaySend(o);} finally {this.commandSent.set(true);this.inSend.set(false);this.sendLock.readLock().unlock();}}

doOnewaySend()通过模板模式调用子类的

private void doOnewaySend(Object command) throws IOException {if(this.failed.get()) {throw new InactivityIOException("Cannot send, channel has already failed: " + this.next.getRemoteAddress());} else {if(command.getClass() == WireFormatInfo.class) {synchronized(this) {this.processOutboundWireFormatInfo((WireFormatInfo)command);}}this.next.oneway(command);}}

这里的next 其实就是我们最总的transport 对象

public void oneway(Object command) throws IOException {this.checkStarted();this.wireFormat.marshal(command, this.dataOut);this.dataOut.flush();}

通过wireFormat对数据进行格式化,然后通过sokect 对数据进行传输。至此异步发送到此结束。

8 . 同步发送过程如下


调用request 方法,这里的transport 对象跟异步的一样。


发送的过程还是异步,它与上面的异步方式的区别:
通过阻塞的方式从FutureResponse 异步拿到一个结果。拿到的过程是阻塞的,所以就实现了一个阻塞的同步同步发送。


同步等等过程。
自此同步就发送完了。

ActiveMQ 原理分析—消息发送篇相关推荐

  1. ActiveMQ 原理分析—消息持久化篇

    消息持久化策略 背景 当消息发送者(provider)发送消息后消费者(consumer)没启动.故障, 或者消息中心在发送者发送消息后宕机了.ActiveMQ是如何保证消息不丢失,消费者能够正常的消 ...

  2. 分布式消息通信ActiveMQ原理 分析一

    本章知识点: 1. 持久化消息和非持久化消息的发送策略2. 消息的持久化方案及实践3. 消费端消费消息的原理 持久化消息与非持久化消息的发送策略 消息同步发送和异步发送 同步发送过程中,发送者发送一条 ...

  3. 分布式消息通信 ActiveMQ 原理 分析二

    本章重点: 1. unconsumedMessage 源码分析 2. 消费端的 PrefetchSize 3. 消息的确认过程 4. 消息重发机制 5. ActiveMQ 多节点高性能方案 消息消费流 ...

  4. ActiveMQ原理分析

    持久化消息和非持久化消息的发送策略 消息同步发送和异步发送 ActiveMQ支持同步.异步两种发送模式将消息发送到broker上.同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消 ...

  5. (2)咚咚客户端核心设计原理分析 - 代码流程篇 (套接字建立,发送消息流程,接收消息流程)

    京麦Tcp建立连接流程: ConnectTask.run() -> connection.connect()(这里面也注册了一个连接状态的一个回调类) -> tryToConnectOnT ...

  6. 分布式消息通信ActiveMQ原理-消费消息策略-笔记

    2019独角兽企业重金招聘Python工程师标准>>> 消息消费流程图 消费端消费消息的原理 我们通过上一节课的讲解,知道有两种方法可以接收消息, 一种是使用同步阻塞的Message ...

  7. 速读“消息中间件”架构体系-ActiveMQ:入门+原理分析+优缺点!

    ActiveMQ 01 介绍 ActiveMQ 是完全基于 JMS 规范实现的一个消息中间件产品. 是 Apache 开源基金会研发的消息中间件.ActiveMQ主要应用在分布式系统架构中,帮助构建高 ...

  8. 一篇读懂:Android手机如何通过USB接口与外设通信(附原理分析及方案选型)

    更多技术干货,欢迎扫码关注博主微信公众号:HowieXue,共同探讨软件知识经验,关注就有海量学习资料免费领哦: 目录 0背景 1.手机USB接口通信特点 1.1 使用方便 1.2 通用性强 1.3 ...

  9. 分布式专题-分布式消息通信之ActiveMQ03-ActiveMQ原理分析(下)

    目录导航 前言 unconsumedMessage源码分析 异步分发的流程 同步分发的流程 消费端的PrefetchSize及优化 原理剖析 prefetchSize 的设置方法 总结 消息的确认过程 ...

最新文章

  1. 「咖啡馆」里的任正非:开放的技术和商业,不会遵循「丛林法则」
  2. MySQL中INSERT INTO SELECT的使用
  3. 专家:中国房地产泡沫崩溃时间就是今明二年
  4. ASP:关于生成HTML文件的新闻系统
  5. 【mysql】启动mysql 服务器 Redirecting to /bin/systemctl start mysql.service
  6. 提高抗打击能力_输不起、爱放弃,孩子抗挫能力差怎么办?3招教你培养孩子抗挫力...
  7. asp.net ajax 怎么获取前端ul li_字节前端提前批面试题:触发了几次回流几次重绘...
  8. RDD基本转换操作:zipWithIndex、zipWithUniqueId
  9. 中科大 计算机网络12 Web和HTTP
  10. Anaconda下安装 TensorFlow 和 keras 以及连接pycharm
  11. mysql server 5.7.16_mysql 5.7.16 安装配置方法图文教程(ubuntu 16.04)
  12. python基础知识-python基础知识(一)
  13. 【面试感悟】一名3年工作经验的程序员应该具备的技能
  14. 爬虫代理哪家强?十大付费代理详细对比评测出炉!
  15. HTML文档的基本结构
  16. 计算机网络总线型结构优,总线型拓扑结构优缺点是什么
  17. win10小娜_win10小娜不好用,想禁用或彻底删除Cortana,就用这2招
  18. php 文本域,关于使用文本域(TextArea)的一个问题
  19. 游戏开发人员推荐书单
  20. Python使用FTP上传文件

热门文章

  1. 用鸽 计算机教案,六年级上信息技术教案训“鸽”南方版.docx
  2. ubuntu官方live cd和dvd下载地址
  3. 东方财富-web前端实习-笔试面试
  4. 分类树(回归树)的优劣势
  5. Linux下终端的相关函数,gprof 使用和介绍
  6. AMD 锐龙CPU安装genymotion的问题
  7. 没人比我更懂系列之--HashMap底层原理及相关问题
  8. win10+php+com组件,分享Win10系统打不开COM组件提示错误代码80040154的解决方法
  9. python es 数据库 ik_Centos7 搭建ES搜索引擎,并通过go-mysql-elasticsearch 实现数据同步...
  10. 【cocos2d-x】如何使用Cocos2D-x制作一款简单的iphoneAndroid游戏①