ActiveMQ使用分为两大块:生产者和消费者
一、准备
项目导入jar包:activemq-all-5.15.3.jar
并buildpath 
二、生产者
  1. 创建连接工厂
ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);

注:
userName是ActiveMQ的用户名,默认可以通过:ActiveMQConnection.DEFAULT_USER
password是ActiveMQ的密码,默认可以通过: ActiveMQConnection.DEFAULT_PASSWORD
brokerURL是ActiveMQ的连接,指定格式为:tcp://主机名:61616
  1. 获取连接
connection = mqf.createConnection();

  1. 生成会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  1. 生成对应的topic
Destination destination = session.createTopic("mytopic");

  1. 创建生产者
MessageProducer producer = session.createProducer(destination);

  1. 设置发送消息使用的模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

默认是:DeliveryMode.PERSISTENT
  1. 生成消息
TextMessage msg = session.createTextMessage(“message");

  1. 启动连接
connection.start();

  1. 发送消息
producer.send(msg);

  1. 关闭生产者
producer.close();

  1. 关闭会话
session.close();

  1. 关闭连接
connection.close();

三、消费者
  1. 继承接口
MessageListener
ExceptionListener
并实现onException(JMSException exception)和onMessage(Message message)方法
  1. 创建连接工厂
ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);

具体参数同上
  1. 获取连接
Connection connection = mqf.createConnection();

  1. 生成会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  1. 生成对应的topic
Destination destination = session.createTopic("mytopic”);

  1. 创建消费者
MessageConsumer consumer = session.createConsumer(destination);

  1. 启动连接
connection.start();

  1. 设置消息监听
consumer.setMessageListener(this);

  1. 设置异常监听
connection.setExceptionListener(this);

  1. 实现onMessage方法
改方法有一个参数Message message,这个参数是从ActiveMQ上拿到的消息,可以通过如下方法解析出来:
TextMessage tm = (TextMessage)message;
String result = tm.getText();

  1. 关闭消费者
consumer.close();

  1. 关闭会话
session.close();

  1. 关闭连接
connection.close();

四、例程
  1. 生产者实现程序
 1 package activemq_test;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.DeliveryMode;
 5 import javax.jms.Destination;
 6 import javax.jms.JMSException;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13
14 public class Producer_tool {
15
16     private final static String userName = ActiveMQConnection.DEFAULT_USER;
17     private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
18     private final static String brokerURL = "tcp://192.168.0.5:61616";
19     private MessageProducer producer = null;
20     private Connection connection = null;
21     private Session session = null;
22
23     public void initialize() throws JMSException {
24         ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
25         connection = mqf.createConnection();
26         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
27         Destination destination = session.createTopic("mytopic");
28         producer = session.createProducer(destination);
29         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
30     }
31
32     public void send(String message) throws JMSException {
33         initialize();
34         TextMessage msg = session.createTextMessage(message);
35         System.out.println("sending message: " + message);
36         connection.start();
37         producer.send(msg);
38     }
39
40     public void close() throws JMSException {
41         if(producer != null) {
42             producer.close();
43         }
44         if(session != null) {
45             session.close();
46         }
47         if(connection != null) {
48             connection.close();
49         }
50         System.out.println("closed");
51     }
52
53 }

  1. 生产者主程序
 1 package activemq_test;
 2 import javax.jms.JMSException;
 3 public class Producer_test {
 4     public static void main(String[] args) throws JMSException {
 5         Producer_tool producer = null;
 6         for(int i = 0; i < 10; i++) {
 7             producer = new Producer_tool();
 8             producer.send("message" + i);
 9             producer.close();
10         }
11     }
12 }

  1. 消费者实现程序
 1 package activemq_test;
 2
 3 import javax.jms.Connection;
 4 import javax.jms.Destination;
 5 import javax.jms.ExceptionListener;
 6 import javax.jms.JMSException;
 7 import javax.jms.Message;
 8 import javax.jms.MessageConsumer;
 9 import javax.jms.MessageListener;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12
13 import org.apache.activemq.ActiveMQConnection;
14 import org.apache.activemq.ActiveMQConnectionFactory;
15
16 public class Consumer_tool implements MessageListener,ExceptionListener{
17
18     private final static String userName = ActiveMQConnection.DEFAULT_USER;
19     private final static String password = ActiveMQConnection.DEFAULT_PASSWORD;
20     private final static String brokerURL = "tcp://192.168.0.5:61616";
21     private Connection connection = null;
22     private Session session = null;
23     private MessageConsumer consumer = null;
24     static boolean isConnection = false;
25
26     public void initialize() throws JMSException {
27         ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
28         connection = mqf.createConnection();
29         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
30         Destination destination = session.createTopic("mytopic");
31         consumer = session.createConsumer(destination);
32     }
33
34     public void consumeMessage() throws JMSException {
35         initialize();
36         connection.start();
37         consumer.setMessageListener(this);
38         connection.setExceptionListener(this);
39         isConnection = true;
40         System.out.println("consumer is listening");
41
42     }
43
44     @Override
45     public void onException(JMSException exception) {
46         isConnection = false;
47     }
48
49     @Override
50     public void onMessage(Message message) {
51         if(message instanceof TextMessage) {
52             TextMessage tm = (TextMessage)message;
53             try {
54                 System.out.println("consumer received " + tm.getText());
55             } catch (JMSException e) {
56                 e.printStackTrace();
57             }
58         }
59         else {
60             System.out.println(message);
61         }
62     }
63
64     public void close() throws JMSException {
65         if(consumer != null) {
66             consumer.close();
67         }
68         if(session != null) {
69             session.close();
70         }
71         if(connection != null) {
72             connection.close();
73         }
74         System.out.println("consumer has closed");
75     }
76 }

  1. 消费者主程序
 1 package activemq_test;
 2 import javax.jms.JMSException;
 3 public class Consumer_test {
 4     public static void main(String[] args) throws JMSException {
 5         Consumer_tool consumer = new Consumer_tool();
 6         consumer.consumeMessage();
 7         while(Consumer_tool.isConnection) {
 8
 9         }
10         consumer.close();
11     }
12 }

转载于:https://www.cnblogs.com/xiatianyu/p/9055647.html

ActiveMQ的使用相关推荐

  1. activeMQ 本地测试

    参考博主 搭建~ https://www.cnblogs.com/jaycekon/p/6225058.html ActiveMQ官网下载地址:http://activemq.apache.org/d ...

  2. ActiveMQ依赖JDK版本关系

    1.如何查看官方发布的activeMQ依赖的JDK版本 1)以ActiveMQ 5.15.2 Release为例:在下载页面的Change Log处, 2)打开下载号的jar包,以activemq-a ...

  3. ActiveMQ—消息特性(延迟和定时消息投递)

    ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article ...

  4. ActiveMQ—安装配置及使用

    安装配置及使用 转自:http://blog.csdn.net/qq_21033663/article/details/52461543 (一)ActiveMQ介绍 ActiveMQ 是Apache出 ...

  5. ActiveMQ—Queue与Topic区别

    Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持 ...

  6. activemq 开启监听_ActiveMQ 消息监听 MessageListener 的使用

    刚学 ActiveMQ, 最开始搭建环境的时候引入的jar 包,几个核心的jar jms.jar, httpcore.jar , httpclient.jar, activemq-all.jar 准备 ...

  7. 在CentOS 6.3 64bit上安装ActiveMQ 5.15.9实录

    1.下载安装ActiveMQ ActiveMQ 5的官网下载地址为 http://activemq.apache.org/components/classic/download/ 我这里选择Unix版 ...

  8. activemq的学习,第一篇

    本地的activemq的地址: http://localhost:8161/admin/ win10的启动avtivemq E:\Program Files\ActiveMQ\apache-activ ...

  9. ActiveMQ在C#中的应用

    ActiveMQ是个好东东,不必多说.ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等.由于我在windows下开发GUI,比较 ...

  10. activemq 消息阻塞优化和消息确认机制优化

    一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...

最新文章

  1. poj 1329(求三角形外接圆)
  2. 操作多个表_8_不等值连接
  3. node-red教程 7dashboard简介与输入型仪表板控件的使用
  4. .net core 实现基于 JSON 的多语言
  5. 前端学习(2382):js编码规范说明
  6. python单元测试mock_单元测试-具有多次调用方法的Python Mock对象
  7. php 合并 js css,PHP实现合并多个JS和CSS文件示例
  8. 从0到1搭建RPC框架
  9. Illustrator 教程,如何将 Illustrator 文档另存为 PDF?
  10. 家长或孩子图像的Gif动画小人在少儿学习软件中的妙用.
  11. Dkhadoop虚拟机安装操作演示教程
  12. 帮助你免于失业的十大软件技术
  13. BUUCTF------相册
  14. Python日期时间格式转换
  15. 写给15岁的女儿-乐嘉
  16. ofbiz UOM Conversion Relationship Not Found [单位转化关系没有找到] 问题解决:
  17. Hadoop系列之什么是Hadoop(1)
  18. kotlin和java相互转换的实操
  19. 【mysql】limit实现分页
  20. 应用商店打开服务器错误,应用商店出错的修复方法

热门文章

  1. Python爬虫基础面试题为2020年初大学生就业做准备(文末附教程)
  2. 埃夫特冲刺科创板 国产工业机器人领军者谋“换道超车”
  3. 一个项目中能提出哪些数据库优化_如何有效进行项目集管理?
  4. 什么样的数据适合缓存?
  5. 052_Drawer抽屉
  6. 015_Vue生命周期
  7. php PDO 浮点数返回,php – 如何在PDO中简单地返回对象?
  8. Android Volley完全解析1:初识Volley的基本用法
  9. 现在计算机学什么好找工作吗,计算机专业都学什么 毕业好找工作吗
  10. java 空语句_Java空语句怎么写才正确?这样的Java基础知识才是你需要的