目录

JMS API 结构与开发步骤

非持久 Topic 消息

持久化 Topic 消息


JMS API 结构与开发步骤

JMS 开发的基本步骤如下:

1、创建一个 JMS Connection Factory

2、通过 Connection Factory 创建 JMS Connection

3、启动 JMS Connection

4、通过 JMS Connection 创建 JMS Session

5、创建 JMS Destination(目的地)

6、创建 JMS Producer,或者创建 JMS Message,并设置 destination

7、创建 JMS Consumer,或者注册一个 JMS message listener

8、发送或者接收 JMS message

9、关闭所有的 JMS 资源(connection、session、producer、consumer等)

下面通过具体的代码进行演示,分别介绍持久化与非持久化的 topic ,环境:Maven 3.6.1 + Java JDK 1.8 + ActiveMQ 5.15.9 + IDEA 2018 ,Mava 管理的 Java SE 应用,它们的 pom.xml 文件都是一样的,导入 activemq-all 依赖即可:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wmx</groupId>
<artifactId>activeMQ1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.9</version></dependency>
</dependencies>
</project>

《ActiveMQ 简介 与 Maven 项目基本使用》中已经介绍过消息队列 queue,即使消费者没有在运行中,生产者发送的消息,都会在消费者下次启动时获取到,所以这里持久化与非持久化主要针对主题 topic。

非持久 Topic 消息

1、对于非持久化的 Topic 消息的发送:基本与发送队列一样,只需要把创建 Destination 的地方,由创建队列(queue)改成 Topic,例如:Destination destination = session.createTopic("topic-app");

2、对于非持久的 Topic 消息的接收:

1)必须要求接收方在线,然后客户端再发送消息,接收方才能接收到消息。

2)同样把创建 Destination 的地方,由创建 queue 改为创建 topic。

3)由于不知道客户端发送了多少 topic 消息,因此可以改为 while 循环的方式接收。

4)同一个 topic ,某个消费者接收一次后,它不能再消费此消息,其它未消费的客户端可以继续接收此消息。

3、非持久化消息生产者将消息发送到中间件后,无论消息有没有被消费,默认情况下,再次启动中间件时,消息就没有了。

生产者代码如下:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.UUID;
/*** 非持久化 topic(主题) 消息生产者*/
@SuppressWarnings("all")
public class NoPersistenceTopicSender {public static void main(String[] args) {Connection connection = null;Session session = null;MessageProducer messageProducer = null;try {String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址//创建 javax.jms.ConnectionFactory 连接工厂ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectconnection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connectionconnection.start();//启动连接,同理还有 stop、close//创建 session 会话,设置开启事务,消息确认模式为自动确认session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//创建消息主题(topic),主题名称自己定义。Queue 、Topic 都是 Destination 的字接口Destination destination = session.createTopic("topic-app");messageProducer = session.createProducer(destination);//根据目的地创建消息生产者int massageTotal = 5;for (int i = 0; i < massageTotal; i++) {TextMessage textMessage = session.createTextMessage("密文" + (i + 1) + ":" + UUID.randomUUID());messageProducer.send(textMessage);//生产者发送消息}session.commit();//批量会话提交。此时消息会被正式发送到中间件System.out.println("消息发送完毕...");} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();//关闭会话}if (connection != null) {connection.close();//关闭连接}if (messageProducer != null) {messageProducer.close();//关闭生产者}} catch (JMSException e) {e.printStackTrace();}}}
}

消费者代码如下:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/*** 非持久化 topic(主题) 消息消费者*/
@SuppressWarnings("all")
public class NoPersistenceTopicReceiver {public static void main(String[] args) {Connection connection = null;Session session = null;MessageConsumer messageConsumer = null;try {String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectconnection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connectionconnection.start();//启动连接,同理还有 stop、close//创建会话 session。开启事务,消息确认模式为自动确认session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//创建消息主题(topic),主题名称与生产者设置的保持一致。Queue 、Topic 都是 Destination 的字接口Destination destination = session.createTopic("topic-app");messageConsumer = session.createConsumer(destination);//根据目的地创建消息消费者Message message = messageConsumer.receive();//receive方法会导致当前线程阻塞,直到接收到消息while (message != null) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;System.out.println("收到消息:" + textMessage.getText());session.commit();//确认消息。//接收消息时设置超时时间,单位为毫秒。如果为0,则等同于 receive()一致阻塞。//如果超过超时时间,仍然未接收到消息,则返回 null。while 会推出message = messageConsumer.receive(3000);}}} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();//关闭会话}if (connection != null) {connection.close();//关闭连接}if (messageConsumer != null) {messageConsumer.close();//关闭消费者}} catch (JMSException e) {e.printStackTrace();}}}
}

启动 activeMQ 中间件,测试如下:

非持久化 topic 消息时,如果先运行生产者发送消息,再运行消费者接收消息,那么消费者是接收不到之前发送的消息的,只能接收订阅主题时间点之后的消息。

持久化 Topic 消息

1、对于持久化 topic 的发送:

1)在非持久的基础上设置传递模式(JMSDeliveryMode)(缺省时,默认是非持久):

producer.setDeliveryMode(DeliveryMode.PERSISTENT); //设置所有 message 采用同样的传递模式
    Message.setJMSDeliveryMode(DeliveryMode.PERSISTENT); //为每个 messge 分别设置传递模式

2)设置完生产者的传递模式后,再启动连接:connection.start();

3)对于发送的消息,activeMQ 默认情况下,不管消息有没有被投递/消费,activeMQ 重启后,消息就都没有了。

2、对于持久化 topic 的接收:

1)需要在连接上设置消费者 id,用于标识消费者:connection.setClientID("clientID_100");

2)创建 TopicSubscriber 来订阅,而不再是 MessageConsumer:session.createDurableSubscriber(destination,"ds_1");

3)设置好主题订阅后,再启动连接:connection.start();

4)订阅者需要先运行一次,表示向消息中间件注册自己,然后运行客户端发送消息,此时无论订阅者是否在线,都能接收到,即使当时订阅者不在线,下次启动连接的时候,也会把没有收到的消息都接收下来。

5)对于注册成功了的持久化订阅者,ActiveMQ 默认情况下,重启之后,仍然保存了注册了的订阅者,无需再次注册。

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.UUID;
/*** 持久化 topic(主题) 消息生产者*/
@SuppressWarnings("all")
public class PersistenceTopicSender {public static void main(String[] args) {Connection connection = null;Session session = null;MessageProducer messageProducer = null;try {String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址//创建 javax.jms.ConnectionFactory 连接工厂ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectconnection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connection//创建 session 会话,设置开启事务,消息确认模式为自动确认session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//创建消息主题(topic),主题名称自己定义。Queue 、Topic 都是 Destination 的子接口Destination destination = session.createTopic("topic-app-2");messageProducer = session.createProducer(destination);//根据目的地创建消息生产者/**设置消息传递模式为持久化,不写时默认为非持久化。DeliveryMode.NON_PERSISTENT* 设置完生产者的传递模式后,再启动连接:connection.start();* */messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();//启动连接,同理还有 stop、closeint massageTotal = 5;for (int i = 0; i < massageTotal; i++) {TextMessage textMessage = session.createTextMessage("密码" + (i + 1) + ":" + UUID.randomUUID());messageProducer.send(textMessage);//生产者发送消息}session.commit();//批量会话提交。此时消息会被正式发送到中间件System.out.println("消息发送完毕...");} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();//关闭会话}if (connection != null) {connection.close();//关闭连接}if (messageProducer != null) {messageProducer.close();//关闭生产者}} catch (JMSException e) {e.printStackTrace();}}}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/*** 持久化 topic(主题) 消息消费者*/
@SuppressWarnings("all")
public class PersistenceTopicReceiver {public static void main(String[] args) {Connection connection = null;Session session = null;TopicSubscriber topicSubscriber = null;try {String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connectconnection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connection/**为持久订阅设置客户端id,这样即使订阅者不在线,消息中心也能在它下次上线时将消息投递给它*/connection.setClientID("clientID_100");//创建会话 session。开启事务,消息确认模式为自动确认session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//创建消息主题(topic),主题名称与生产者设置的保持一致。Queue 、Topic 都是 Destination 的子接口Topic topic = session.createTopic("topic-app-2");/**createDurableSubscriber(Topic var1, String var2):创建持久订阅,var1 是订阅对象,var2 是持久订阅名称,自定义即可* 主题订阅接口 TopicSubscriber 继承了 MessageConsumer 消息消费者接口*/topicSubscriber = session.createDurableSubscriber(topic,"ds-1");//根据目的地创建消息消费者connection.start();/**设置了主题订阅后,再启动连接*/System.out.println("订阅者启动成功...");Message message = topicSubscriber.receive();//receive方法会导致当前线程阻塞,直到接收到消息while (message != null) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;System.out.println("收到消息:" + textMessage.getText());session.commit();//确认消息。//接收消息时设置超时时间,单位为毫秒。如果为0,则等同于 receive()一致阻塞。//如果超过超时时间,仍然未接收到消息,则返回 null。while 会推出message = topicSubscriber.receive(3000);}}} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();//关闭会话}if (connection != null) {connection.close();//关闭连接}if (topicSubscriber != null) {topicSubscriber.close();//关闭消费者}} catch (JMSException e) {e.printStackTrace();}}}
}

JMS 开发步骤、持久化 topic 消息与非持久化 topic 消息相关推荐

  1. JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系...

    一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent deliver ...

  2. java jms消息删除_activemq的topic消息一直都会存在数据库中,为什么不会删除

    该楼层疑似违规已被系统折叠 隐藏此楼查看此楼 我把消息设置的持久化到数据库.采用topic的方式发送消息. 每次发送消息之后,数据库的ACTIVEMQ_MSGS表里都会插入10条记录(每次是发送10条 ...

  3. Kafka是什么,JMS是什么,常见的类JMS消息服务器,为什么需要消息队列(来自学习笔记)

    1.Kafka是什么  Apache Kafka是一个开源消息系统,由Scala写成.是由Apache软件基金会开发的一个开源消息系统项目.  Kafka最初是由LinkedIn开发,并于2011 ...

  4. 虚拟字符设备驱动开发步骤

    目录 前言 字符设备驱动简介 内核驱动操作函数集合(file_operations结构体) 字符设备驱动开发步骤 .ko驱动模块的加载和卸载(module_init驱动入口.insmod驱动加载) 字 ...

  5. rabbitmq 持久化_RabbitMQ原理与相关操作(三)消息持久化

    现在聊一下RabbitMQ消息持久化: 问题及方案描述 1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间.在此过程中可能会出现一些意外,比如消息接收 ...

  6. 看完就入门系列!吞吐量、消息持久化、负载均衡和持久化、伸缩性…… 你真的了解 Kafka 了吗?...

    作者| liuhehe123 来源| CSDN博客 责编| Carol 出品| CSDN云计算(ID:CSDNcloud) 封图| CSDN下载于视觉中国 无论是已经接触过 Kafka 还是刚入坑的小 ...

  7. WCF4.0新特性体验(9):非破坏性队列消息接收(Non-destructive queue receive )

    这次来介绍一下WCF4.0新特性体验(9):非破坏性队列接收(Non-destructive queue receive ).这个特性不是那么直观.确切来说是WCF4.0对于以前处理MSMQ消息队列机 ...

  8. Topic路由表- IoT 设备一对多消息通信实战

    在企业物联网项目中经常会遇到一对多消息分发需求,即一个消息发布者对应多个消息订阅者的场景(1:N).IoT企业物联网平台提供了消息路由表能力,可以轻松实现一对多场景的消息实时同步,达到秒级延迟性能. ...

  9. RocketMQ源码(八)Broker asyncSendMessage处理消息以及自动创建Topic

    此前已经梳理了RocketMQ的broker接收Producer消息的入口源码RocketMQ(七)broker接收消息入口源码_代码---小白的博客-CSDN博客 在文章的最后我们到了SendMes ...

  10. 微信公众号开发步骤(一)

    微信公众号主要有以下几个步骤 微信公众号的通讯机制 微信公众号简介 1.注册微信公众号 2.注册测试公众号 3.搭建微信本地调试环境 1)下载客户端natapp: 2)安装natapp: 4.微信公众 ...

最新文章

  1. 通俗易懂:快速理解ipv4的NAT穿透原理
  2. 【报错信息】Google Play 上架报错 ( Your app contains ads that do not comply with our Families ad | 退出亲子同乐计划 )
  3. oracle锁表导致update,delete,insert出现无反应的问题
  4. 自己实现spring核心功能 二
  5. 【PP生产订单】入门介绍(十三)
  6. 微信小程序技巧-让特定组件首页始终展示修改编译条件即可,不用改json
  7. Dubbo xml配置 和注解配置 写法
  8. Oracle GoldenGate经典架构
  9. itest英语考试bug_itest(爱测试) 4.2.1 发布,开源BUG 跟踪管理 敏捷测试管理软件...
  10. JavaSE04、什么是类和对象,如何使用?
  11. matlab求线性规划最大值,matlab线性规划算例
  12. 服务器系统在虚拟机安装win7系统安装教程,vmware虚拟机如何安装win7系统_vmware虚拟机安装win7纯净版图文教程...
  13. HDU3404POJ3533(Nim积摸版)
  14. Excel绘制排名变化曲线图(折线图),附源文件
  15. 老台式电脑怎么连热点_台式电脑怎样连接wifi热点,教你一招快速连接
  16. 实体消歧方法(1)__BOOTLEG
  17. 004_Makefile的编译
  18. VS2013使用技巧汇总
  19. 视频教程-Windows Server 2016 IPAM 服务管理-Windows
  20. java语言画图_Java语言实现画图工具

热门文章

  1. 实习笔记 6: 测试技巧,json序列化对象
  2. 【转】 叫人起床的学问
  3. tcp/ip协议listen函数中backlog参数的含义
  4. spark 添加依赖_单机用python写spark处理20G的数据
  5. 拓端tecdat|R语言使用Metropolis-Hastings采样算法自适应贝叶斯估计与可视化
  6. Java web技术及应用答案_超星《Java Web应用开发技术》答案
  7. JSP教程第7、8讲笔记
  8. Ubuntu 18.04 8T硬盘挂载
  9. ubuntu16.04安装驱动
  10. caffe的python接口学习(2):生成solver文件