本篇为JMS的学习笔记, 教你一步一步使用JMS,并提供一个能运行的完整的案例,可以使人达到快速入门的目的。

JMS(Java Message Service),是Java消息服务,通过JMS,可以在Java对象之间发送消息。JMS消息支持点对点之间的消息发送,也支持主题/订阅方式的消息发送。

消息服务由客户和消息代理组成。每位客户都连接到消息服务,客户可以创建消息、发送消息、接收消息、阅读消息。消息服务可以将接收到的消息转发给其他的客户。

消息服务的关键特点:客户只需要最少的信息就可以其他客户通信(需知道其他客户的提供的服务、服务所需信息和客户地址)。

消息服务使用转发-存储结构以提交异步信息。

JMS包含5个元素:
· 提供者: 负责管理消息服务的消息代理
· 客户: java编写的,利用提供者进行通信的应用程序和组件
· 消息: 在客户之间传输的对象
· 管理的对象: 传输中使用的jms对象,分两种,目标工厂(Destination Factory)对象和连接对象,用于连接消息服务,处理发送者和接收者之间的传输。
· 本机客户: 是在引入jms之前构建的应用程序,它们是采用另外一种消息系统的本机客户API
在引入jms之前,客户是用 点对点 和 订阅/发布结构

jms的5个元素:
· 管理对象: 连接工厂(Connection Factory)对象和会话对象;连接工厂对象用于创建会话对象;会话对象用于创建发送者和接收者
ConnectionFactory(QueueConnectionFactory,TopicConnectionFactoy,XAQueueConnectionFactory,XATopicConnectionFactoy)
Connection(QueueConnection,TopicConnection,XAQueueConnection,XATopicConnection)

· 会话: Session
· 消息生成者: MessageProducer(QueueSender, TopicPublisher)
· 消息使用者: MessageConsumer(QueueReciever,TopicSubscriber)
· 消息: Message

jms使用步骤:
--------------------
1 访问连接工厂:
InitialContext ctx=new InitialContext();
TopicConnectionFactory tcf=(TopicConnectionFactory) ctx.lookup(“TopicConnectionFactory”);
QueueConnectionFactory qcf=(QueueConnectionFactory) ctx.lookup(“QueueConnectionFactory”);

2 访问目标工厂:创建Queue对象/Topic对象
Topic mt=(Topic) ctx.lookup(“topic/testTopic”);
Queue mq=(Queue) ctx.lookup(“queue/A”);

3 创建连接,分两种:队列连接和主题连接
TopicConnection tc=tcf.createTopicConnection();
QueueConnection qc=qcf.createQueueConnection();

  1. 开始接收:也可以放在后面进行
    tc.start(); //以后需要调用 tc.close(); 关闭连接
    qc.start(); //以后需要调用 qc.close(); 关闭连接

  2. 创建会话:会话用于创建消息生产者、消息消费者、消息。
    QueueSession qSession=qc.createQueueSession(true,0);
    TopicSession tSession=tc.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);

参数1:是否使用事务
参数2:消息确认模式

当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,
没有回应则抛出异常,消息发送程序负责处理这个错误。
注:消息代理确认只是确认收到了消息,而不是确认消息提交给了消息接收者。

消息确认模式:JMS使用确认协议以保证消息的发送,使用了3种确认模式
AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。
CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息)
DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。如果消息提供者在试图向非持久化的消息接收者发送消息失败时,消息会丢失。在向持久化消息接收者发送消息时,会等待消息接收者确认,未收到确认,则重新发送消息(消息提供者需要设置JMSRedilivered=true,消息接收者需要调用Message对象的getJMSRedelivered()方法确认JMSRedilivered标记是否为true)

CLIENT_ACKNOWLEDGE 确认收到消息代码:
public void onMessage(Message msg){
try{
//Process incoming messages
msg.acknowledge();
}catch(Exception e){
//handle error
}
}
消息事务: 包含一组消息,要么全部发送,要不全部都不发送给消息提供者。
消息提供者缓存消息,如果消息发送者有一个消息发送失败,则调用session.rollback()方法,则消息提供者会放弃前面发送成功的消息;如果全部发送成功,调用session.commit()方法,将消息全部发送给消息接收者。

6 创建消息生产者:
QueueSender qSender=qSession.createSender(mq);
TopicPublisher tPublisher=tSession.createPublisher(mt);
qSender.send(msg);
tPublisher.publish(msg);

7 创建消息使用者:在接收任何消息之前,客户必须注册到JMS提供者表明希望接收消息。注册后,JMS提供者就负责向客户发送消息。
QueueReceiver qr=qSession.createReceiver(mq);
qc.start();
Message msg1=qr.receive();

TopicSubscriber tSubscriber=tSession.createSubscriber(mt);
tc.start();
Message msg2=tSubscriber.receive(1500); //每隔1500毫秒从主题接收一次

8 创建消息监听器(实现MessageListener接口onMessage()方法):消息使用者通过消息监听器异步接收消息。
QueueListener qListener=new QueueListener(); //QueueListener为自定义类,实现MessageListener接口
qr.setMessageListener(qListener);

TopicListener tListener=new TopicListener(); //TopicListener为自定义类,实现MessageListener接口
tSubscriber.setMessageListener(tListener);

9 消息(Message):三部分,头(必须的),属性和正文 (二者为可选)。
消息头读写方法: getXXX(),setXXX()
XXX是字段的名字,许多消息字段是由send()和publish()方法自动设置的,其他自动则由客户或者JMS程序设置。

1) 由send() or publish()设置的:
JMSDestination
JMSDeliveryMode
JMSExpiration
JMSPriority
JMSMessageID
JMSTimestamp
JMSCorrelationID: 关联消息ID
JMSReplyTo
JMSType
JMSRedelivered
2)由客户设置的:
JMSCorrelationID: 关联消息ID
JMSReplyTo
JMSType
3) 由jms提供者设置的:
JMSRedelivered

属性读写方法: getXXX(name), setXXX(name,value)

消息正文:包含消息,JMS支持6种消息格式,称为消息类型
TextMessage: 有文本组成的String对象
MapMessage: 可以是按顺序或随机访问的 key-value对,key为String, value为primitive
BytesMessage: 字节信息(如存放图像)
StreamMessage: 包含顺序读取值的流
ObjectMessage: 可以序列化的java对象
Message: 无消息正文时可以使用

创建TextMessage消息:
TextMessage msg=qSession.createTextMessage();
msg.setText(“myMessage”);
qSender.send(msg);

读取TextMessage消息:
Message msg=qr.receive();
if(msg instanceof TextMessage){
TextMessage txtMsg=(TextMessage) msg;
System.out.println(“Imcoming message: “+txtMsg.getText());
}else{
//handle error
}

  1. 消息选择器:消息使用者使用消息选择器(message selector)选择收到的消息。消息选择器使用条件表达式(符合WHERE子句的SQL-92标准)作为选择条件。
    创建并使用消息选择器:
    String criteria=”Customer=’1234’”;
    TopicSubscriber tSubscriber=ts.createSubscriber(myTopic,criteria,false); //只从主题中接收Customer=’1234’的消息

向队列发送消息


SendToQueue.java:

package test.jms;

import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class SendToQueue {
public static void main(String[] args) {
final int msgCount;
if( (args.length<1) || ( args.length>2)) {
System.out.println(“Usage: java test.jms.SendToQueue queueName [sendCount]”);
System.exit(1);
}

    String qName=new String(args[0]);if(args.length==2) {msgCount=(new Integer(args[1]).intValue());}else {msgCount=1;}QueueConnection qc=null;try { Properties p = new Properties();p.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");p.setProperty(Context.PROVIDER_URL, "localhost:1099");InitialContext jc=new InitialContext(p);QueueConnectionFactory qcf=(QueueConnectionFactory)jc.lookup("QueueConnectionFactory");Queue q=(Queue)jc.lookup(qName);qc=qcf.createQueueConnection();QueueSession qs=qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);QueueSender qSender=qs.createSender(q);TextMessage msg=qs.createTextMessage();for(int i=0;i<msgCount;i++) {msg.setText("Welcome number "+(i+1));qSender.send(msg);System.out.println("Send Message To "+qName+" : "+msg.getText()+"\n");}qSender.close();qs.close();}catch(Exception e) {e.printStackTrace();}finally {if(qc!=null) {try {qc.close();}catch(JMSException e) {}}}
}

}

运行: java test.jms.SendToQueue queue/A 10

从队列接收消息

ReceiveFromQueue.java:

package test.jms;

import java.io.*;
import java.util.Properties;
import javax.jms.*;
import javax.naming.*;

public class ReceiveFromQueue {
public static void doReceive(String qName) {
Message msg;
TextMessage txtMsg;

    QueueConnection qc = null;try {Properties p = new Properties();p.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");p.setProperty(Context.PROVIDER_URL, "localhost:1099");InitialContext jc = new InitialContext(p);QueueConnectionFactory qcf = (QueueConnectionFactory) jc.lookup("QueueConnectionFactory");Queue q = (Queue) jc.lookup(qName);qc = qcf.createQueueConnection();QueueSession qs = qc.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);QueueReceiver qr = qs.createReceiver(q);qc.start();System.out.println("begin receive messge from " + qName + "...");msg = qr.receive(1000); // 如果不加间隔参数,会一直等着,知道消息到来。 while (msg != null) {if (msg instanceof TextMessage) {txtMsg = (TextMessage) msg;System.out.println("Receive Msg from " + qName + " : "+ txtMsg.getText());}msg = qr.receive(1000);}System.out.println("no message available!");qr.close();qs.close();} catch (Exception e) {e.printStackTrace();} finally {if (qc != null) {try {qc.close();} catch (JMSException e) {}}}
}public static void doListen(String qName) {Message msg;TextMessage txtMsg;QueueConnection qc = null;try {Properties p = new Properties();p.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");p.setProperty(Context.PROVIDER_URL, "localhost:1099");InitialContext jc = new InitialContext(p);QueueConnectionFactory qcf = (QueueConnectionFactory) jc.lookup("QueueConnectionFactory");Queue q = (Queue) jc.lookup(qName);qc = qcf.createQueueConnection();QueueSession qs = qc.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);QueueReceiver qr = qs.createReceiver(q);qc.start();System.out.println("begin listen to messge from " + qName + "...");TextListener tListener = new TextListener();qr.setMessageListener(tListener);qc.start();System.out.println("Enter 'q' and press <return> to exit ");InputStreamReader isr = new InputStreamReader(System.in);char response = '\0';while (!((response == 'q') || (response == 'Q'))) {try {response = (char) isr.read();} catch (IOException e) {e.printStackTrace();}}System.out.println("End listening!");qr.close();qs.close();} catch (Exception e) {e.printStackTrace();} finally {if (qc != null) {try {qc.close();} catch (JMSException e) {}}}
}public static void main(String[] args) {if ((args.length != 1)) {System.out.println("Usage: java test.jms.ReceiveFromQueue queueName");System.exit(1);}String qName = new String(args[0]);doReceive(qName); //通过QueueReceiver.receive()读消息doListen(qName);    // 通过消息监听器读消息
}

}

运行: java test.jms.ReceiveFromQueue queue/A

向主题发送消息


PublishToTopic.java:
package test.jms;

import java.util.Properties;

import javax.jms.*;
import javax.naming.*;

public class PublishToTopic {
public static void main(String[] args) {
final int msgCount;
if( (args.length<1) || ( args.length>2)) {
System.out.println(“Usage: java test.jms.PublishToTopic topicName [sendCount]”);
System.exit(1);
}

    String tName=new String(args[0]);if(args.length==2) {msgCount=(new Integer(args[1]).intValue());}else {msgCount=10;}TopicConnection tc=null;try { Properties p = new Properties();p.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");p.setProperty(Context.PROVIDER_URL, "localhost:1099");InitialContext jc=new InitialContext(p);TopicConnectionFactory tcf=(TopicConnectionFactory)jc.lookup("TopicConnectionFactory");Topic t=(Topic)jc.lookup(tName);tc=tcf.createTopicConnection();TopicSession ts=tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);TopicPublisher tp=ts.createPublisher(t);TextMessage msg=ts.createTextMessage();for(int i=0;i<msgCount;i++) {msg.setText("Welcome number "+(i+1));tp.publish(msg);System.out.println("Publish Message To "+tName+" : "+msg.getText()+"\n");}tp.close();ts.close();}catch(Exception e) {e.printStackTrace();}finally {if(tc!=null) {try {tc.close();}catch(JMSException e) {}}}
}

}

运行: java -classpath d:\jboss-4.0.5\client\jnp-client.jar;d:\jboss-4.0.5\client\jbossmq-client.jar;d:\jboss-4.0.5\client\jboss-j2ee.jar;d:\jboss-4.0.5\client\jbossall-client.jar;. test.jms.PublishToTopic topic/testTopic

从主题接收消息


SubscribeFromTopic.java:
package test.jms;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import javax.jms.*;
import javax.naming.*;

public class SubscribeFromTopic {
public static void doListen(String tName) {
char response=’\0’;

    TopicConnection tc=null;try { Properties p = new Properties();p.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");p.setProperty(Context.PROVIDER_URL, "localhost:1099");InitialContext jc=new InitialContext(p);TopicConnectionFactory tcf=(TopicConnectionFactory)jc.lookup("TopicConnectionFactory");Topic t=(Topic)jc.lookup(tName);tc=tcf.createTopicConnection();TopicSession ts=tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);TopicSubscriber tSubscriber=ts.createSubscriber(t);System.out.println("begin listen to messge from "+tName+"...");TextListener tListener=new TextListener();tSubscriber.setMessageListener(tListener);tc.start();System.out.println("Enter 'q' and press <return> to exit ");InputStreamReader isr=new InputStreamReader(System.in);while(!( (response=='q') || (response=='Q' ))) {try {response=(char)isr.read();}catch(IOException e) {e.printStackTrace();}}System.out.println("End listening!");tSubscriber.close();ts.close();}catch(Exception e) {e.printStackTrace();}finally {if(tc!=null) {try {tc.close();}catch(JMSException e) {}}}
}public static void doSubscribe(String tName) {Message msg;TextMessage txtMsg;TopicConnection tc=null;try { Properties p = new Properties();p.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");p.setProperty(Context.PROVIDER_URL, "localhost:1099");InitialContext jc=new InitialContext(p);TopicConnectionFactory tcf=(TopicConnectionFactory)jc.lookup("TopicConnectionFactory");Topic t=(Topic)jc.lookup(tName);tc=tcf.createTopicConnection();TopicSession ts=tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);TopicSubscriber tSubscriber=ts.createSubscriber(t);System.out.println("begin listen to messge from "+tName+"...");System.out.println("begin receive messge from " + tName + "...");msg = tSubscriber.receive(1000); while (msg != null) {if (msg instanceof TextMessage) {txtMsg = (TextMessage) msg;System.out.println("Receive Msg from " + tName + " : "+ txtMsg.getText());}msg = tSubscriber.receive(1000);}System.out.println("no message available!");tSubscriber.close();ts.close();}catch(Exception e) {e.printStackTrace();}finally {if(tc!=null) {try {tc.close();}catch(JMSException e) {}}}
}public static void main(String[] args) {if( (args.length!=1)) {System.out.println("Usage: java test.jms.SubscribeFromTopic topicName");System.exit(1);}String tName=new String(args[0]);
doListen(tName);    //通过消息监听器接收消息

// doSubscribe(tName); //通过receive()来接收消息
}
}
运行: java -classpath d:\jboss-4.0.5\client\jnp-client.jar;d:\jboss-4.0.5\client\jbossmq-client.jar;d:\jboss-4.0.5\client\jboss-j2ee.jar;d:\jboss-4.0.5\client\jbossall-client.jar;. test.jms.SubscribeFromTopic topic/testTopic

附:TextListener.java:
package test.jms;

import javax.jms.*;

public class TextListener implements MessageListener{
public void onMessage(Message msg) {
try {
if(msg instanceof TextMessage) {
TextMessage tMessage=(TextMessage)msg;
System.out.println(“In TextListener: Receive Message: “+tMessage.getText());
}else {
System.out.println(“Message Type error: “+msg.getClass().getName());
}
}catch(JMSException e) {
System.out.println(“onMessage() error: “+e.toString());
}catch(Throwable e) {
System.out.println(“onMessage() error: “+e.toString());
}
}
}

JMS入门_StepByStep相关推荐

  1. JMS入门(一)--JMS基础

    1. JMS基本概念 JMS(Java Message Service) 即Java消息服务.它提供标准的产生.发送.接收消息的接口,简化企业应用的开发.它支持两种消息通信模型:点到点(point-t ...

  2. JMS入门——开发起步之ActiveMQ

    J2EE与JMS JMS是J2EE的13种核心技术规范之一,是J2EE众多应用程序组件中的重要一员.J2EE有标准的JMS API开放,以支持各个JMS应用生产厂商的产品,开源的有jbossmq,op ...

  3. JMS(二):简单的JMS入门实例

    为什么80%的码农都做不了架构师?>>>    public class Test1 {public static void main(String[] args) throws J ...

  4. Spring集成JMS入门

    1.预备知识 在学习Spring集成JMS之前最好可以是先去了解JMS基本概念和ActiveMQ 自己学习Spring整合JMS的一些心得,希望可以帮到大家 2.Spring整合JMS 2.1.使用点 ...

  5. MQ(队列消息的入门)

    消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成,通过提供消息传递和消息排队模型,它可以在分布式环境下拓展进程间的通信,对于消息中间件,常见的角色大致也 ...

  6. 消息中间件解决方案-JMS-ActiveMQ

    一.JMS入门 1.1 JMS简介 JMS是Java平台上有关面向消息中间件的技术规范,它本身只定义了一系列的接口规范,是一种与厂商无关的API,用来访问消息收发系统.消息是JMS中的一种类型对象,由 ...

  7. JMSSpringJms

    文章目录 JMS入门 消息中间件 1. 什么是消息中间件 2. 常见的消息中间件产品 3. 品优购系统模块调用关系分析 4. 改造系统模块调用关系 JMS简介 1. 什么是JMS 2. JMS消息传递 ...

  8. Maven使用ActiveMQ

    文章目录 1. ActiveMQ JMS入门案例 1.1 环境准备 1.2 JMS-点对点模式发送消息 1.3 JMS-点对点模式接收消息 1.4 JMS-发布订阅模式-发送消息 1.5 JMS-发布 ...

  9. JMS(Java消息服务)入门教程

    什么是Java消息服务 Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建.发送.读取消息等,用于支持JAVA应用程序开发.在J2EE中 ...

最新文章

  1. 你离距离算法只差零点几毫米!
  2. sersync之不洗澡
  3. CNCF 官方大使张磊:Kubernetes 是一个“数据库”吗?
  4. 图片日志:泛型的作用/hashset的内部实现/常见运行期异常
  5. 实例22:python
  6. php wmi,window_Win7系统中的wmi控件是什么?有什么用?,  wmi控件是什么Windows - phpStudy...
  7. 了解NearPy,进行快速最近邻搜索
  8. java中的socket模型_Socket通信模型
  9. VHDL学习之TEXTIO在仿真中的应用
  10. 计算机组成原理R0bus是什么,计算机组成原理实验 堆栈寄存器实验
  11. c++类与对象(1)——构造,复制构造函数
  12. 【语音合成】基于matlab重叠相加法的信号分帧与还原【含Matlab源码 568期】
  13. 2022年 MathorCup 思路分享
  14. 怎么修改PDF文件内容
  15. 活动报道:语音开发技术实践:语音云 语音浏览器 即时语音聊天技术(CMDN Club15期)...
  16. 11n、11ac和11ax速率表
  17. 前端插件日常问题 整理
  18. mysql如何返回上一步_月亮上没有火箭和发射塔,嫦娥五号完成任务后,是如何起飞返回的...
  19. xpath爬取智联招聘--大数据开发职位并保存为csv
  20. springboot 整合javassist详解

热门文章

  1. 检测导航仪GPS端口的工具
  2. 新超级马里奥wii编辑器_新超级
  3. 后缀数组 LCP--模板题
  4. java 地图模式_Java设计模式之从[Dota地图]分析享元(Flyweight)模式
  5. 无聊的肥宅反编译neko atsume猫咪后院 (一)
  6. linux 磁盘格式化 恢复数据,从格式化为 exfat 的损坏 U 盘上恢复数据的记录
  7. 毕业设计 Android人脸门禁系统
  8. 各种软件系统缩写 --不断更新中 欢迎补充
  9. 最好的多目标跟踪(MOT)入门介绍!
  10. 30CrMnSiA介绍