目录

JMS 基本概念

JMS 消息结构

JMS 消息确认模式


JMS 基本概念

提示:可以先参考《ActiveMQ 简介 与 Maven 项目基本使用 》的编码,然后更容易理解本文的理论。

1、JMS 全称  Java Message Service ,Java 消息服务,是 Java EE 中的一个技术。

2、JMS 规范定义了 Java 中访问消息中间件的接口,并没有给予实现,实现 JMS 接口的消息中间件成为 JMS Provider(供应者),例如 ActiveMQ 。

3、JMS Provider:实现 JMS 接口和规范的消息中间件。

4、JMS Message:JMS 的消息,由以下三个部分组成:

1)消息头:每个消息头字段都有对应的 getter 和 setter 方法

2)消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性

3)消息体:封装具体的消息数据

5、JMS  Producer:消息生产者,创建和发送 JMS 消息的客户端应用

6、JMS Consumer:消息消费者,接收和处理 JMS 消息的客户端应用。消息的消费可以采用以下两种方式之一:

1)同步消费:通过调用消费者的 receive 方法从目的地中显示的读取消息数据,receive 方法会一直阻塞到消息到达。

2)异步消费:客户端可以为消费者注册一个消息监听器,以定义消息到达时所采取的措施/动作。

7、JMS domains:消息传递域,JMS 规范中定义了两种消息转递域:点对点(point-to-point,简写成 PTP),发布/订阅(publish/subscribe,简写为 pub/sub)

8、点对点(point-to-point)消息传递域特点如下:

1)每个生产可以对应多个消费者,但是每个消息只能有一个消费者消费

2)消息的生产者和消费者之间没有时间上的相关性,即无论消费者在生产者发送消息的时候是否已经运行,它启动后都可以提取消息

9、发布/订阅(publish/subscribe)消息传递域特点如下:

1)每个消息可以有多个消费者消费

2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后生产者发布的消息。即使消费者订阅了主题,如果生产者发布消息的时候,消费者宕机了,那么它再次启动的时候,默认也是不能再接收的。但 JMS 规范允许客户端创建持久订阅,允许消费者消费它在未激活状态时发送的消息。

10、在点对点消息传递域中,目的地被称为队列(queue),在发布/订阅消息传递域中,目的地被称为主题(topic)

11、Connection Factory:连接工厂,用来创建连接对象,以连接到 JMS 的 Provider(供应商,实现者)

12、JMS Connection:封装了客户与 JMS 提供者之间的一个虚拟的连接

13、JMS Session:是生产者和消费者的一个单线程上下文。会话用于创建消息生产者(Producer)、消息消费者(Consumer),和消息(Message)等。会话提供了一个事务性的上下文,一组发送和接收被组合到了一个原子操作中。

Destination 消息发送到的目的地
Acknowledge 签收
Transaction 事务
JMS Cline 用来收发消息的客户端应用

JMS 消息结构

1、JMS 消息(Message)由消息头、消息属性、消息体组成。

2、消息头包含消息的识别信息和路由信息,包含一些标准的属性如下:

标准属性 描述
JMSDestination JMS 目的地,由 send 方法设置。主要指 queue与topic。自动分配。
JMSDeliveryMode JMS 传递模式,由 send 方法设置。分为持久模式和非持久模式。前者的消息应该被传送"一次且仅仅一次",即使 JMS 提供者出现故障,该消息也不会丢失,会在服务器恢复后再次传递;后者的消息最多会传送一次,如果JMS Provider 服务器故障,该消息将永久丢失。自动分配。
JMSExpiration JMS 消息过期/到期时间,由 send 方法设置。等于 Destination 的 send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 等于0,则 JMSExpiration 被设为0,表示该消息永不过期。如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。自动分配。
JMSPriority JMS 消息优先级,由 send 方法设置。有 0-9 十个级别,0-4是普通消息,5-9是加急消息。JMS 不要求 JMS Provider 严格按着十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是第4级。自动分配。
JMSMessageID JMS 消息标识 id,由 send 方法设置。唯一识别每个消息的标识,由 JMS Provider 产生。自动分配。
JMSTimestamp JMS 时间戳,由客户端设置。一个 JMS Provider 在调用 send 方法时自动设置的。它是消息被发送和消费者实际接收的时间差。自动分配。
JMSCorrelationID JMS 相关性 id,由客户端设置。用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。可以是任意值,不仅仅是 JMSMessageId。
JMSType JMS 消息类型的识别符。由开发者设置。
JMSReplyTo JMS 回复,由客户端设置。提供本信息回复消息的目的地址。
JMSRedelivered JMS 重发,由JMS Provider(供应商)设置

3、消息体:JMS API 定义了 5 种消息体格式,也叫消息类型,可以使用不同的形式发送接收数据,并可以兼容现有的消息格式。包括:TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。它们都是 Message 接口的子类。

4、消息属性,包含以下三种类型的属性:

1)应用程序设置和添加的属性,比如:Message.setStringProperty("username","zhangSan");

2)JMS 定义的属性:使用 "JMSX" 作为属性名的前缀,connection.getMetaData().getJMSXPropertyNames(); 返回连接支持的所有 JMSX 属性的名字;

3)JMS 供应商特点的属性

JMS 定义的属性
JMSXUserID 发送消息的用户标识,发送时提供商设置
JMSXAppID 发送消息的应用标识,发送时提供商设置
JMSXDeliveryCount 转发消息重试次数,第一次是1、第二次是2、...,发送时提供商设置
JMSXGroupID 消息所在消息组的标识,由客户端设置
JMSXGroupSeq 组内消息的序号,第一个消息是1,第二个是2,...,由客户端设置
JMSXProducerTXID 产生消息的事务的事务标识,发送时提供商设置
JMSXConsumerTXID 消费消息的事务的事务标识,接收时提供商设置
JMSXRcvTimestamp JMS 转发消息到消费者的时间,接收时提供商设置
            String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址/*** 创建 javax.jms.ConnectionFactory 连接工厂* org.apache.activemq.ActiveMQConnectionFactory 中默认设置了大量的参数,还有几个重载的构造器可以选择*/ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectConnection connection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connectionconnection.start();//启动连接,同理还有 stop、close//连接元数据ConnectionMetaData metaData = connection.getMetaData();int jmsMajorVersion = metaData.getJMSMajorVersion();int jmsMinorVersion = metaData.getJMSMinorVersion();String jmsProviderName = metaData.getJMSProviderName();String jmsVersion = metaData.getJMSVersion();int providerMajorVersion = metaData.getProviderMajorVersion();int providerMinorVersion = metaData.getProviderMinorVersion();String providerVersion = metaData.getProviderVersion();Enumeration jmsxPropertyNames = metaData.getJMSXPropertyNames();System.out.println("jmsMajorVersion = " + jmsMajorVersion);//jmsMajorVersion = 1System.out.println("jmsMinorVersion = " + jmsMinorVersion);//jmsMinorVersion = 1System.out.println("jmsProviderName = " + jmsProviderName);//jmsProviderName = ActiveMQSystem.out.println("jmsVersion = " + jmsVersion);//jmsVersion = 1.1System.out.println("providerMajorVersion = " + providerMajorVersion);//providerMajorVersion = 5System.out.println("providerMinorVersion = " + providerMinorVersion);//providerMinorVersion = 15System.out.println("providerVersion = " + providerVersion);//providerVersion = 5.15.9while (jmsxPropertyNames.hasMoreElements()){Object o = jmsxPropertyNames.nextElement();System.out.print(o+"\t");//JMSXUserID JMSXGroupID JMSXGroupSeq    JMSXDeliveryCount   JMSXProducerTXID}

JMS 消息确认模式

1、JMS 消息只有在被确认之后,才认为已经成功的消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息、消息被确认。消息被确认后,消费者无法再次读取,相反如果消息未被确认,则消费者可以一直读取。

2、createSession(boolean transacted, int acknowledgeMode)

transacted 为 true 时:表示事务性会话,此时当一个事务被提交时,确认自动发生:session.commit();//确认消息,告诉中间件,消息已经确认接收;与 acknowledgeMode 参数设置无关。

transacted 为 false 时:表示非事务性会话,此时消息何时被确认取决于创建会话时的应答模式(acknowledgeMode)。与 acknowledgeMode 参数有关。

3、应答/确认模式(acknowledgeMode) 在 transacted 为 false 时有效,可选值如下:

Session.AUTO_ACKNOWLEDGE 自动确认。当客户成功的从 receive 方法,或者从 MessageListener.onMessage 方法返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE 客户确认。客户通过调用消息(Session)的 acknowledge() 方法确认消息。client 确认模式是在会话层上进行,只要确认一个被消费的消息,将自动确认当前会话中所有已经被消费的消息。比如一个会话中消费者消费了10个消息,然后确认了第8个消息,那么这10个消息都会被确认。
Session.DUPS_ACKNOWLEDGE 迟钝确认。该模式只是会话迟钝的确认消息的提交。如果 JMS Provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。不常用。

4、下面使用环境:Maven 3.6.1 + Java JDK 1.8 + ActiveMQ 5.15.9 + IDEA 2018 稍作演示,Mava 管理的 Java SE 应用。pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wmx</groupId>
<artifactId>activeMQ1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>
</dependencies>
</project>

生产者内容如下:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.UUID;
/*** 消息发送者*/
@SuppressWarnings("all")
public class JmsSender {public static void main(String[] args) {Connection connection = null;Session session = null;try {String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址/*** 创建 javax.jms.ConnectionFactory 连接工厂* org.apache.activemq.ActiveMQConnectionFactory 中默认设置了大量的参数,还有几个重载的构造器可以选择*/ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectconnection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connectionconnection.start();//启动连接,同理还有 stop、close/*** Session createSession(boolean transacted, int acknowledgeMode) 创建会话* transacted :表示是否开启事务* acknowledgeMode:表示会话确认模式*/session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);/*** createQueue(String queueName):创建消息队列,指定队列名称,消费者可以根据队列名称获取消息* Destination 目的地,重点,interface Queue extends Destination*/Destination destination = session.createQueue("queue-app");//createProducer(Destination destination):根据目的地创建消息生产者MessageProducer producer = session.createProducer(destination);int massageTotal = 5;for (int i = 0; i < massageTotal; i++) {MapMessage mapMessage = session.createMapMessage();//创建一个 Map 消息mapMessage.setStringProperty("token", UUID.randomUUID().toString());//设置属性。口令mapMessage.setIntProperty("expires", 60000);设置属性。口令失效时间mapMessage.setString("key", "嫦娥 " + (i + 1) + " 号");//设置消息内容producer.send(mapMessage);//生产者发送消息session.commit();//会话提交,发送消息}} catch (JMSException e) {e.printStackTrace();} finally {if (session != null) {try {session.close();//关闭会话} catch (JMSException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();//关闭连接} catch (JMSException e) {e.printStackTrace();}}}}
}

消费者内容如下:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/*** 消息消费者*/
@SuppressWarnings("all")
public class JmsReceiver {public static void main(String[] args) {Connection connection = null;Session session = null;try {String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址/*** 创建 javax.jms.ConnectionFactory 连接工厂* org.apache.activemq.ActiveMQConnectionFactory 中默认设置了大量的参数,还有几个重载的构造器可以选择*/ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectconnection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connectionconnection.start();//启动连接,同理还有 stop、close/*** Session createSession(boolean transacted, int acknowledgeMode) 创建会话* transacted :表示是否开启事务* acknowledgeMode:表示会话确认模式* 关闭事务,自动确认:表示 receive 方法、MessageListener.onMessage 方法返回后就已经确认了*/session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);/*** createQueue(String queueName):创建消息队列,指定队列名称,消费者可以根据队列名称获取消息* Destination 目的地,重点,interface Queue extends Destination*/Destination destination = session.createQueue("queue-app");//createProducer(Destination destination):根据目的地创建消息消费者MessageConsumer consumer = session.createConsumer(destination);int massageTotal = 5;for (int i = 0; i < massageTotal; i++) {Message message = consumer.receive();//receive方法会导致当前线程阻塞,指导接收到消息if (message instanceof MapMessage) {MapMessage mapMessage = (MapMessage) message;//消费者接收消息。因为对方发送的 Map 消息,所以可以强转System.out.print((i + 1) + ":" + mapMessage.getStringProperty("token"));//获取属性System.out.print(":" + mapMessage.getIntProperty("expires"));//获取属性System.out.println(":" + mapMessage.getString("key"));//获取消息内容mapMessage.acknowledge();//应答/确认消息}}} catch (JMSException e) {e.printStackTrace();} finally {if (session != null) {try {session.close();//关闭会话} catch (JMSException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();//关闭连接} catch (JMSException e) {e.printStackTrace();}}}}
}

JMS 基本概念、消息结构、确认模式 acknowledgeMode相关推荐

  1. RabbitMQ消息的确认模式

    确认模式 包括两种自动确认.手动确认 自动确认 只要消息从队列中获取,无论消费者获取到消息后, 是否执行成功,都认为是消息已经成功消费. 手动确认 消费者从队列中获取消息后,服务器会将该消息标记为不可 ...

  2. activemq - 浅析消息确认模式

    2019独角兽企业重金招聘Python工程师标准>>> 前言 JMS的消息确认模式,定义了客户端(消息发送者或者消费者)与broker确认消息的方式,可以认为是客户端与Broker之 ...

  3. 消息协议 AMQP 及MQTT ,STOMP,JMS的概念和基本理解

    前言 简单理解 就是需要大家都遵守的 套规 .在计算机领域中,只要涉及不同的计算机之间要共同完成一 事情的时候,就肯定会有协议的存在,就像我们说话用某种语言一样,不 同的计算机之间必 须使用相同的语 ...

  4. 【MQTT 5.0】协议 ——发布订阅模式、Qos、keepalive、连接认证、消息结构

    一.前言 1.1 MQTT 协议概述 1.2 MQTT规范 二.MQTT 协议基本概念 2.1 发布/订阅模式 2.11 MQTT 发布/订阅模式 2.12 MQTT 发布/订阅中的消息路由 2.13 ...

  5. java jms 框架_Apache RocketMQ之JMS基本概念及使用

    1.png Apache RocketMQ之JMS基本概念及使用 Apache RocketMQ 系列: 优秀博客: 介绍流程: 8.png RocketMQ 是什么? 是一个队列模型的消息中间件,具 ...

  6. [4G5G专题-57]:L2 RLC层-详解RLC架构、数据封装、三种模式:透明TM、非确认模式UM、确认模式AM

    目录 第1章  L2 RLC层的架构 1.1 RAN的架构 1.2 L2架构概述 1.3 RLC软件系统结构图 第2章 TCP/IP协议提供的三种传输服务 ​2.1 TCP 2.2 UDP 2.3 R ...

  7. 嵌入式消息订阅发布模式软件框架

    文章目录 一.总体框架 二.基于RT-Thread的SoftBus 2.1 SoftBus的由来 2.2 消息订阅者模式 2.3 静态订阅关系与动态订阅关系 2.4 C/S模式 2.5 消息订阅者模式 ...

  8. Springboot RabbitMQ 基础使用、消息发送确认、签收

    概述 rabbitMQ 会做一个系列,包括:安装.基础使用.高级队列.集群. 使用环境: jdk 8 .springboot 2.4.10 常见概念: AMQP:高级消息队列协议,这是一个消息应用的规 ...

  9. rabbitmq 启动异常_RabbitMQ:消息发送确认 与 消息接收确认(ACK)

    默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除 如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则 ...

  10. RabbitMQ 消息队列六种模式

    RabbitMQ 的第一个程序 RabbitMQ-生产者|消费者 搭建环境 java client 生产者和消费者都属于客户端, rabbitMQ的java客户端如下 创建 maven 工程 < ...

最新文章

  1. gradle风格的groovy代码
  2. 【Win32汇编】__declspec(naked)裸函数
  3. python mkl freebsd_freebsd下之简单安装python
  4. 构造函数、原型、继承原来这么简单?来吧,深入浅出
  5. [图解]红旗Linux6.0下如何安装VMWARE TOOLS
  6. react加水印_给网页增加水印的方法,react
  7. 关于redis性能问题分析和优化
  8. android 9.0 开机动画,小米9开机动画安装器
  9. mysql英文怎么发音_英文字母和汉语拼音先学哪个?科学答案在这里
  10. 给还在迷茫的你分享我从零基础的日语文科生半路出家搞Python如何上岸的
  11. 理解SSD核心技术FTL
  12. 【论文解读】 FPGA实现卷积神经网络CNN(二): Optimizing FPGA-based Accelerator Design for DCNN
  13. springboot 定时器使用方法之并行
  14. 图形学初步----------多边形填充算法
  15. 利用photoshop切图
  16. ipynb转换为python文件
  17. 阅读《工业革命》有感——福(氟)到了
  18. 试玩Android(一)
  19. Keil5出现Error: Flash Download failed - Could not load file ‘ces \ces.axf‘的解决方法
  20. 目标检测——标注图像(超详细步骤)

热门文章

  1. spring AOP的方式监控方法的执行时间
  2. 用 CSS 实现 Firefox 和 IE 都支持的半透明效果
  3. 生活大爆炸第7季第6集Howard写给Bernadette的歌
  4. c语言文件夹换行打印,关于文件操作,碰到空格就换行
  5. 批量生成横断面_批量生动生成填充图案的边界线
  6. 拓端tecdat|Python风险价值计算投资组合VaR(Value at Risk )、期望损失ES(Expected Shortfall)
  7. python-os库函数一些用法记录
  8. markdown首行空两格
  9. mysql建立聚族索引语句,MySQL学习教程之聚簇索引
  10. pyqtSignal()