ActiveMQ的使用
- 创建连接工厂
ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
- 获取连接
connection = mqf.createConnection();
- 生成会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- 生成对应的topic
Destination destination = session.createTopic("mytopic");
- 创建生产者
MessageProducer producer = session.createProducer(destination);
- 设置发送消息使用的模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- 生成消息
TextMessage msg = session.createTextMessage(“message");
- 启动连接
connection.start();
- 发送消息
producer.send(msg);
- 关闭生产者
producer.close();
- 关闭会话
session.close();
- 关闭连接
connection.close();
- 继承接口
- 创建连接工厂
ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL);
- 获取连接
Connection connection = mqf.createConnection();
- 生成会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- 生成对应的topic
Destination destination = session.createTopic("mytopic”);
- 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
- 启动连接
connection.start();
- 设置消息监听
consumer.setMessageListener(this);
- 设置异常监听
connection.setExceptionListener(this);
- 实现onMessage方法
TextMessage tm = (TextMessage)message; String result = tm.getText();
- 关闭消费者
consumer.close();
- 关闭会话
session.close();
- 关闭连接
connection.close();
- 生产者实现程序
1 package activemq_test; 2 3 import javax.jms.Connection; 4 import javax.jms.DeliveryMode; 5 import javax.jms.Destination; 6 import javax.jms.JMSException; 7 import javax.jms.MessageProducer; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnection; 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 public class Producer_tool { 15 16 private final static String userName = ActiveMQConnection.DEFAULT_USER; 17 private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; 18 private final static String brokerURL = "tcp://192.168.0.5:61616"; 19 private MessageProducer producer = null; 20 private Connection connection = null; 21 private Session session = null; 22 23 public void initialize() throws JMSException { 24 ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL); 25 connection = mqf.createConnection(); 26 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 27 Destination destination = session.createTopic("mytopic"); 28 producer = session.createProducer(destination); 29 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 30 } 31 32 public void send(String message) throws JMSException { 33 initialize(); 34 TextMessage msg = session.createTextMessage(message); 35 System.out.println("sending message: " + message); 36 connection.start(); 37 producer.send(msg); 38 } 39 40 public void close() throws JMSException { 41 if(producer != null) { 42 producer.close(); 43 } 44 if(session != null) { 45 session.close(); 46 } 47 if(connection != null) { 48 connection.close(); 49 } 50 System.out.println("closed"); 51 } 52 53 }
- 生产者主程序
1 package activemq_test; 2 import javax.jms.JMSException; 3 public class Producer_test { 4 public static void main(String[] args) throws JMSException { 5 Producer_tool producer = null; 6 for(int i = 0; i < 10; i++) { 7 producer = new Producer_tool(); 8 producer.send("message" + i); 9 producer.close(); 10 } 11 } 12 }
- 消费者实现程序
1 package activemq_test; 2 3 import javax.jms.Connection; 4 import javax.jms.Destination; 5 import javax.jms.ExceptionListener; 6 import javax.jms.JMSException; 7 import javax.jms.Message; 8 import javax.jms.MessageConsumer; 9 import javax.jms.MessageListener; 10 import javax.jms.Session; 11 import javax.jms.TextMessage; 12 13 import org.apache.activemq.ActiveMQConnection; 14 import org.apache.activemq.ActiveMQConnectionFactory; 15 16 public class Consumer_tool implements MessageListener,ExceptionListener{ 17 18 private final static String userName = ActiveMQConnection.DEFAULT_USER; 19 private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; 20 private final static String brokerURL = "tcp://192.168.0.5:61616"; 21 private Connection connection = null; 22 private Session session = null; 23 private MessageConsumer consumer = null; 24 static boolean isConnection = false; 25 26 public void initialize() throws JMSException { 27 ActiveMQConnectionFactory mqf = new ActiveMQConnectionFactory(userName, password, brokerURL); 28 connection = mqf.createConnection(); 29 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 30 Destination destination = session.createTopic("mytopic"); 31 consumer = session.createConsumer(destination); 32 } 33 34 public void consumeMessage() throws JMSException { 35 initialize(); 36 connection.start(); 37 consumer.setMessageListener(this); 38 connection.setExceptionListener(this); 39 isConnection = true; 40 System.out.println("consumer is listening"); 41 42 } 43 44 @Override 45 public void onException(JMSException exception) { 46 isConnection = false; 47 } 48 49 @Override 50 public void onMessage(Message message) { 51 if(message instanceof TextMessage) { 52 TextMessage tm = (TextMessage)message; 53 try { 54 System.out.println("consumer received " + tm.getText()); 55 } catch (JMSException e) { 56 e.printStackTrace(); 57 } 58 } 59 else { 60 System.out.println(message); 61 } 62 } 63 64 public void close() throws JMSException { 65 if(consumer != null) { 66 consumer.close(); 67 } 68 if(session != null) { 69 session.close(); 70 } 71 if(connection != null) { 72 connection.close(); 73 } 74 System.out.println("consumer has closed"); 75 } 76 }
- 消费者主程序
1 package activemq_test; 2 import javax.jms.JMSException; 3 public class Consumer_test { 4 public static void main(String[] args) throws JMSException { 5 Consumer_tool consumer = new Consumer_tool(); 6 consumer.consumeMessage(); 7 while(Consumer_tool.isConnection) { 8 9 } 10 consumer.close(); 11 } 12 }
转载于:https://www.cnblogs.com/xiatianyu/p/9055647.html
ActiveMQ的使用相关推荐
- activeMQ 本地测试
参考博主 搭建~ https://www.cnblogs.com/jaycekon/p/6225058.html ActiveMQ官网下载地址:http://activemq.apache.org/d ...
- ActiveMQ依赖JDK版本关系
1.如何查看官方发布的activeMQ依赖的JDK版本 1)以ActiveMQ 5.15.2 Release为例:在下载页面的Change Log处, 2)打开下载号的jar包,以activemq-a ...
- ActiveMQ—消息特性(延迟和定时消息投递)
ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article ...
- ActiveMQ—安装配置及使用
安装配置及使用 转自:http://blog.csdn.net/qq_21033663/article/details/52461543 (一)ActiveMQ介绍 ActiveMQ 是Apache出 ...
- ActiveMQ—Queue与Topic区别
Queue与Topic区别 转自:http://blog.csdn.net/qq_21033663/article/details/52458305 队列(Queue)和主题(Topic)是JMS支持 ...
- activemq 开启监听_ActiveMQ 消息监听 MessageListener 的使用
刚学 ActiveMQ, 最开始搭建环境的时候引入的jar 包,几个核心的jar jms.jar, httpcore.jar , httpclient.jar, activemq-all.jar 准备 ...
- 在CentOS 6.3 64bit上安装ActiveMQ 5.15.9实录
1.下载安装ActiveMQ ActiveMQ 5的官网下载地址为 http://activemq.apache.org/components/classic/download/ 我这里选择Unix版 ...
- activemq的学习,第一篇
本地的activemq的地址: http://localhost:8161/admin/ win10的启动avtivemq E:\Program Files\ActiveMQ\apache-activ ...
- ActiveMQ在C#中的应用
ActiveMQ是个好东东,不必多说.ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等.由于我在windows下开发GUI,比较 ...
- activemq 消息阻塞优化和消息确认机制优化
一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值 ...
最新文章
- poj 1329(求三角形外接圆)
- 操作多个表_8_不等值连接
- node-red教程 7dashboard简介与输入型仪表板控件的使用
- .net core 实现基于 JSON 的多语言
- 前端学习(2382):js编码规范说明
- python单元测试mock_单元测试-具有多次调用方法的Python Mock对象
- php 合并 js css,PHP实现合并多个JS和CSS文件示例
- 从0到1搭建RPC框架
- Illustrator 教程,如何将 Illustrator 文档另存为 PDF?
- 家长或孩子图像的Gif动画小人在少儿学习软件中的妙用.
- Dkhadoop虚拟机安装操作演示教程
- 帮助你免于失业的十大软件技术
- BUUCTF------相册
- Python日期时间格式转换
- 写给15岁的女儿-乐嘉
- ofbiz UOM Conversion Relationship Not Found [单位转化关系没有找到] 问题解决:
- Hadoop系列之什么是Hadoop(1)
- kotlin和java相互转换的实操
- 【mysql】limit实现分页
- 应用商店打开服务器错误,应用商店出错的修复方法
热门文章
- Python爬虫基础面试题为2020年初大学生就业做准备(文末附教程)
- 埃夫特冲刺科创板 国产工业机器人领军者谋“换道超车”
- 一个项目中能提出哪些数据库优化_如何有效进行项目集管理?
- 什么样的数据适合缓存?
- 052_Drawer抽屉
- 015_Vue生命周期
- php PDO 浮点数返回,php – 如何在PDO中简单地返回对象?
- Android Volley完全解析1:初识Volley的基本用法
- 现在计算机学什么好找工作吗,计算机专业都学什么 毕业好找工作吗
- java 空语句_Java空语句怎么写才正确?这样的Java基础知识才是你需要的