JAVA ActiveMQ消息发送和接收
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
在 Java 里有 JMS 的多个实现,ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
JMS 定义了两种方式:Quere(点对点);Topic(发布/订阅)。
ConnectionFactory 是连接工厂,负责创建Connection。Connection 负责创建 Session。Destination 是消息的目的地。
Session 创建 MessageProducer(用来发消息) 和 MessageConsumer(用来接收消息)。
ActiveMQ的官方网址:http://activemq.apache.org。在此可以下载ActiveMQ的最新版本和阅读相关文档。
下面是使用ActiveMQ发送和接收消息的JAVA实现:
1、消息发送者
package com.jmsd;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 说明: activemq send message
*
* @author xajava
* @version 创建时间:2012-10-24 下午1:22:40
*/
public class JmsSender {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = "ActiveMQ.Demo";
private Destination destination = null;
private Connection conn = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
// 连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
conn = connectionFactory.createConnection();
// 事务性会话,自动确认消息
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 消息的目的地(Queue/Topic)
destination = session.createQueue(SUBJECT);
// destination = session.createTopic(SUBJECT);
// 消息的提供者(生产者)
producer = session.createProducer(destination);
// 不持久化消息
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void sendMessage(String msgType) throws JMSException, Exception {
initialize();
// 连接到JMS提供者(服务器)
conn.start();
// 发送文本消息
if ("text".equals(msgType)) {
String textMsg = "ActiveMQ Text Message!";
TextMessage msg = session.createTextMessage();
// TextMessage msg = session.createTextMessage(textMsg);
msg.setText(textMsg);
producer.send(msg);
}
// 发送Map消息
if ("map".equals(msgType)) {
MapMessage msg = session.createMapMessage();
msg.setBoolean("boolean", true);
msg.setShort("short", (short) 0);
msg.setLong("long", 123456);
msg.setString("MapMessage", "ActiveMQ Map Message!");
producer.send(msg);
}
// 发送流消息
if ("stream".equals(msgType)) {
String streamValue = "ActiveMQ stream Message!";
StreamMessage msg = session.createStreamMessage();
msg.writeString(streamValue);
msg.writeBoolean(false);
msg.writeLong(1234567890);
producer.send(msg);
}
// 发送对象消息
if ("object".equals(msgType)) {
JmsObjectMessageBean jmsObject = new JmsObjectMessageBean("ActiveMQ Object Message", 18, false);
ObjectMessage msg = session.createObjectMessage();
msg.setObject(jmsObject);
producer.send(msg);
}
// 发送字节消息
if ("bytes".equals(msgType)) {
String byteValue = "字节消息";
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(byteValue.getBytes());
producer.send(msg);
}
}
// 关闭连接
public void close() throws JMSException {
if (producer != null)
producer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
2、消息接收者
package com.jmsd;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 说明:
*
* @author xajava
* @version 创建时间:2012-10-24 下午2:06:48
*/
public class JmsReceiver implements MessageListener {
private String USER = ActiveMQConnection.DEFAULT_USER;
private String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
private String SUBJECT = "ActiveMQ.Demo";
private Destination dest = null;
private Connection conn = null;
private Session session = null;
private MessageConsumer consumer = null;
private boolean stop = false;
// 初始化
private void initialize() throws JMSException, Exception {
// 连接工厂是用户创建连接的对象.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USER, PASSWORD, URL);
// 连接工厂创建一个jms connection
conn = connectionFactory.createConnection();
// 是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 不支持事务
// 目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象.
dest = session.createQueue(SUBJECT);
// dest = session.createTopic(SUBJECT);
// 会话创建消息的生产者将消息发送到目的地
consumer = session.createConsumer(dest);
}
/**
* 消费消息
*
* @throws JMSException
* @throws Exception
*/
public void receiveMessage() throws JMSException, Exception {
initialize();
conn.start();
consumer.setMessageListener(this);
// 等待接收消息
while (!stop) {
Thread.sleep(5000);
}
}
@SuppressWarnings("rawtypes")
@Override
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage message = (TextMessage) msg;
System.out.println("------Received TextMessage------");
System.out.println(message.getText());
} else if (msg instanceof MapMessage) {
MapMessage message = (MapMessage) msg;
System.out.println("------Received MapMessage------");
System.out.println(message.getLong("long"));
System.out.println(message.getBoolean("boolean"));
System.out.println(message.getShort("short"));
System.out.println(message.getString("MapMessage"));
System.out.println("------Received MapMessage for while------");
Enumeration enumer = message.getMapNames();
while (enumer.hasMoreElements()) {
Object obj = enumer.nextElement();
System.out.println(message.getObject(obj.toString()));
}
} else if (msg instanceof StreamMessage) {
StreamMessage message = (StreamMessage) msg;
System.out.println("------Received StreamMessage------");
System.out.println(message.readString());
System.out.println(message.readBoolean());
System.out.println(message.readLong());
} else if (msg instanceof ObjectMessage) {
System.out.println("------Received ObjectMessage------");
ObjectMessage message = (ObjectMessage) msg;
JmsObjectMessageBean jmsObject = (JmsObjectMessageBean) message.getObject();
System.out.println(jmsObject.getUserName() + "__" + jmsObject.getAge() + "__" + jmsObject.isFlag());
} else if (msg instanceof BytesMessage) {
System.out.println("------Received BytesMessage------");
BytesMessage message = (BytesMessage) msg;
byte[] byteContent = new byte[1024];
int length = -1;
StringBuffer content = new StringBuffer();
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(msg);
}
stop = true;
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
this.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (conn != null)
conn.close();
}
}
3、对象消息
package com.jmsd;
import java.io.Serializable;
/**
* 说明: JMS 对象消息示例对象
*
* @author xajava
* @version 创建时间:2012-10-24 下午1:56:07
*/
public class JmsObjectMessageBean implements Serializable {
private static final long serialVersionUID = 2620024932905963095L;
private String userName;
private int age = 16;
private boolean flag = true;
public JmsObjectMessageBean(String userName,int age,boolean flag){
this.setUserName(userName);
this.setAge(age);
this.setFlag(flag);
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
4、测试类
package com.jmsd;
import javax.jms.JMSException;
/**
* 说明:
*
* @author xajava
* @version 创建时间:2012-10-22 下午4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
package com.jmsd;
import javax.jms.JMSException;
/**
* 说明:
*
* @author xajava
* @version 创建时间:2012-10-22 下午4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
package com.jmsd;
import javax.jms.JMSException;
/**
* 说明:
*
* @author xajava
* @version 创建时间:2012-10-22 下午4:33:17
*/
public class Test {
public static void main(String[] args) throws JMSException, Exception {
JmsSender sender = new JmsSender();
JmsReceiver receiver = new JmsReceiver();
sender.sendMessage("bytes");
sender.close();
receiver.receiveMessage();
receiver.close();
}
}
JAVA ActiveMQ消息发送和接收相关推荐
- java activeMQ消息的发送与接收
java activeMQ消息的发送与接收 activemq是我们经常用到的消息队列之一,比如说速度快,对spring的很好的支持,支持多种协议等等,今天我们就来看一下activeMQ消息的发送与接收 ...
- javasocket连续给服务器发送消息,Java通过Socket发送和接收多条消息
我们需要实现一个Socket客户端,它应该连接到一个接受TCP连接的服务器.如果我通过netcap与服务器进行通信,我会立即得到它的响应(通过命令行).Java通过Socket发送和接收多条消息 的工 ...
- RabbitMQ消息发送和接收
1.RabbitMQ的消息发送和接受机制 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费者(consumer)订阅某个队列.生产者(producer)创建消息,然后发布到队列(queue)中, ...
- Java实现消息发送
消息发送和接收演示 接下来我们使用Java代码来演示消息的发送和接收 <dependency><groupId>org.apache.rocketmq</groupId& ...
- python 网络编程之Socket通信案例消息发送与接收
背景 网络编程是python编程中的一项基本技术.本文将实现一个简单的Socket通信案例消息发送与接收 正文 在python中的socket编程的大致流程图如上所示 我们来首先编写客户端的代码: # ...
- go 实现 kafka 消息发送、接收
引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...
- 使用Akka持久化——消息发送与接收
版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/53929751 前言 在<使用Akka持久化 ...
- linux ibm mq 安装,消息发送与接收
下载地址 http://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messaging/mqadv/ 安装 1.2 解压并安装 1.2 ...
- RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收
文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...
最新文章
- 怎样在swift中创建CocoaPods
- 前端书签归纳(持续更新)
- 每日一皮:CPU的新用途,要不要试试?
- 闲话WPF之五(XAML中的类型转换)
- openresty开发系列34--openresty执行流程之4访问阶段
- SQL Server扩展事件(Extended Events)-- 将现有 SQL 跟踪脚本转换为扩展事件会话
- 如何理解Eating这个词?云原生与微服务专场介绍
- python拆分合并文件_Python 视频文件的分割和合并
- Python《回车桌面图片》
- 使用Metal打造令人惊叹的游戏效果
- CRM Online Outlook Client Configuration Wizard
- Python-初体验
- java中的定时任务
- 在python中datetime使用中如何识别上月同期日期
- Window Live Writer Test
- h2os android版本,h2os属于安卓系统吗
- 【在线图表生成】掌握这些图表,年终报表根本不用愁!
- LANMT架构搭建jspxcms
- 水浒卡牌 《梦回水浒》抽卡大变革~
- 【LDAP】在Centos7环境搭建LDAP服务端