ActiveMQ简述
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
欢迎跳转到本文的原文链接:https://honeypps.com/mq/activemq-quick-start/
##概述
ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。
JMS支持两种消息发送和接收模型。一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。
另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。
##ActiveMQ的安装
下载最新的安装包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是针对linux系统进行阐述,当然ActiveMQ也有win版的,这里就不赘述了),可以去官网下载,也可以在下方留言区留下你的邮箱,博主会发给你的~
下载之后解压: tar -zvxf apache-activemq-5.13.2-bin.tar.gz
ActiveMQ目录内容有:
- bin目录包含ActiveMQ的启动脚本
- conf目录包含ActiveMQ的所有配置文件
- data目录包含日志文件和持久性消息数据
- example: ActiveMQ的示例
- lib: ActiveMQ运行所需要的lib
- webapps: ActiveMQ的web控制台和一些相关的demo
运行命令:activemq start(在activemq/bin下运行)
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')
查看activemq是否运行命令:ps -aux | grep activemq
shr 986 1.2 9.7 1281720 201936 pts/5 Sl 19:43 0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr 1501 0.0 0.0 5176 724 pts/5 S+ 20:06 0:00 grep activemq
关闭命令: activemq stop
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jreHeap sizes: current=63232k free=62218k max=932096kJVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:[/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED
ActiveMQ的默认服务端口为61616,这个可以在conf/activemq.xml配置文件中修改:
<transportConnectors><!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --><transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
##案例
在下载的apache-activemq-5.13.2-bin.tar.gz包中解压有一个jar包:activemq-all-5.13.2.jar,引入这个jar到你的项目中即可开始编写案例代码。
博主的activemq服务器地址为10.10.195.187,这个在下面代码中会有体现。
按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
- 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
- 利用factory构造JMS connection
- 启动connection
- 通过connection创建JMS session.
- 指定JMS destination.
- 创建JMS producer或者创建JMS message并提供destination.
- 创建JMS consumer或注册JMS message listener.
- 发送和接收JMS message.
- 关闭所有JMS资源,包括connection, session, producer, consumer等。
下面来看代码举例(P2P式)。
通过Java实现的基于ActiveMQ的请求提交:
package com.zzh.activemq;import java.io.Serializable;
import java.util.HashMap;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class RequestSubmit
{//消息发送者private MessageProducer producer;//一个发送或者接受消息的线程private Session session;public void init() throws Exception{//ConnectionFactory连接工厂,JMS用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");//Connection:JMS客户端到JMS Provider的连接,从构造工厂中得到连接对象Connection connection = connectionFactory.createConnection();//启动connection.start();//获取连接操作session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Destination destinatin = session.createQueue("RequestQueue");//得到消息生成(发送)者producer = session.createProducer(destinatin);//设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);}public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception{ObjectMessage message = session.createObjectMessage(requestParam);producer.send(message);session.commit();}public static void main(String[] args) throws Exception{RequestSubmit submit = new RequestSubmit();submit.init();HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();requestParam.put("朱小厮", "zzh");submit.submit(requestParam);}
}
创建Session时有两个非常重要的参数,第一个boolean类型的参数用来表示是否采用事务消息。如果是事务消息,对于的参数设置为true,此时消息的提交自动有comit处理,消息的回滚则自动由rollback处理。加入消息不是事务的,则对应的该参数设置为false,此时分为三种情况:
- Session.AUTO_ACKNOWLEDGE表示Session会自动确认所接收到的消息。
- Session.CLIENT_ACKNOWLEDGE表示由客户端程序通过调用消息的确认方法来确认所接收到的消息。
- Session.DUPS_OK_ACKNOWLEDGE使得Session将“懒惰”地确认消息,即不会立即确认消息,这样有可能导致消息重复投递。
提供Java实现的基于ActiveMQ的请求处理:
package com.zzh.activemq;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class RequestProcessor
{public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception{System.out.println("requestHandler....."+requestParam.toString());for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet()){System.out.println(entry.getKey()+":"+entry.getValue());}}public static void main(String[] args) throws Exception{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");Connection connection = connectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue("RequestQueue");//消息消费(接收)者MessageConsumer consumer = session.createConsumer(destination);RequestProcessor processor = new RequestProcessor();while(true){ObjectMessage message = (ObjectMessage) consumer.receive(1000);if(null != message){System.out.println(message);HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();processor.requestHandler(requestParam);}else{break;}}}
}
输出结果:
ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{朱小厮=zzh}
朱小厮:zzh
可以通过页面查看队列的使用情况,在浏览器中输入http://10.10.195.187:8161/admin/queues.jsp,用户名和密码都是:admin,看到以下页面:
这个是在jetty服务器下跑的,可以修改conf/jetty.xml来修改相关jetty配置。
上面的例子是关于P2P模式的,不过有个不妥之处,就是没有资源的释放。下面举一个Pub/Sub模式的。
通过JMS创建ActiveMQ的topic,并给topic发送消息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Produce;public class TopicRequest
{//消息发送者private MessageProducer producer;//一个发送或者接受消息的线程private Session session;//Connection:JMS客户端到JMS Provider的连接private Connection connection;public void init() throws Exception{//ConnectionFactory连接工厂,JMS用它创建连接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");//从构造工厂中得到连接对象connection = connectionFactory.createConnection();//启动connection.start();//获取连接操作session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("MessageTopic");producer = session.createProducer(topic);//设置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);}public void submit(String mess) throws Exception{TextMessage message = session.createTextMessage();message.setText(mess);producer.send(message);}public void close(){try{if(session != null)session.close();if(producer != null)producer.close();if(connection !=null )connection.close();}catch (JMSException e){e.printStackTrace();}}public static void main(String[] args) throws Exception{TopicRequest topicRequest = new TopicRequest();topicRequest.init();topicRequest.submit("I'm first");topicRequest.close();}
}
消息发送到对应的topic后,需要将listener注册到需要订阅的topic上,以便能够接收该topic的消息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class TopicReceive
{private MessageConsumer consumer;private Session session;public void init() throws Exception{ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.10.195.187:61616");Connection connection = connectionFactory.createConnection();connection.start();session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic("MessageTopic");consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener(){@Overridepublic void onMessage(Message message){TextMessage tm = (TextMessage) message;System.out.println(tm);try{System.out.println(tm.getText());}catch (JMSException e){e.printStackTrace();}}});}public static void main(String[] args) throws Exception{TopicReceive receive = new TopicReceive();receive.init();}
}
输出结果:
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first
参考文献
- 《大型分布式网站架构——设计与实践》陈康贤著。
- http://activemq.apache.org/
欢迎跳转到本文的原文链接:https://honeypps.com/mq/activemq-quick-start/
欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。
ActiveMQ简述相关推荐
- ActiveMQ简单使用介绍
2019独角兽企业重金招聘Python工程师标准>>> 1. ActiveMQ简述 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完 ...
- ActiveMQ相关存储介绍
ActiveMQ 的存储: ActiveMQ 在 queue 中存储 Message 时,采用先进先出顺序(FIFO)存储.同一时间一个消息被分派给单个消费者,且只有当 Message 被消费并确认时 ...
- Sping+ActiveMQ整合
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- 消息中间件(Kafka/RabbitMQ)收录集
本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能. 这里会持续收录相关知识,包括安装.部署 ...
- 消息队列之ActiveMQ安装配置
简述:ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线.ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多 ...
- ActiveMQ消费者平滑关闭
平滑关闭的思路就是让正在执行的任务线程正常执行完毕,然后再关闭JVM.在JVM关闭之前触发一个shutdown hook,jvm自带这个hook,在java启动时候就可以注册这样的hook. ##1. ...
- activemq中怎么知道推送消息是否成功_ActiveMQ安装试用示列
ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件.以Windows操作系统为例,本文简述了ActiveMQ的安装配置和 ...
- activemq消息丢失_Kafka or RabbitMQ:消息中间件选型深入分析
消息中间件选型深入分析 --从Kafka与RabbitMQ的对比来看全局 有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,实则说来话长的事情又如何长话短说 ...
- linux activemq修改端口号,linux下 activemq集群配置
1.简述:回想老王打电话讲故事案例. 2.优势:解耦,异步,横向扩展,顺序保障,安全可靠... 3.JMS(java message service),是java平台中关于面向消息中间件的API,用于 ...
最新文章
- Git 命令行的使用
- 程序世界的秘密(下)(完)
- poj1222开关问题
- roobo机器人怎么唱歌_日本“观音”机器人问世,可以陪僧人念佛经
- 网络工程中,VLAN到底有什么作用?
- 科目三中模拟灯光使用考试常见的错误 广州学车网光大国际驾校学车
- 让敏捷的回顾会议变得有趣而高效
- android学习笔记---46视频刻录的实现,视频录像器。
- rstudio r语言_如何在R中接受用户输入?
- 使用Pycharm运行TensorFlow,Virtualenv安装TensorFlow
- aref无效 lisp_Common Lisp专题4:数组
- 中山大学计算机学院离散数学,中大信科院计算机复试专业课离散数学.pdf
- Python 查询 MAC 地址相关信息
- 改变世界面貌的十个数学公式
- 2022 美亚杯 资格赛 赛后复盘 题解
- 一个RGB数据采集实例学习matlab_GUI界面设计
- Linux C聊天室的实现
- java软件测试技术栈
- 【利用VBA批量处理中望CAD的修改打印出PDF】
- 电子科技大学计算机学刘峰林,康昭 - 电子科技大学 - 计算机科学与工程学院
热门文章
- linux删除文件夹提示没找到,Win10中遇到删除文件夹提示找不到该项目的解决过程...
- pytorch教程龙曲良36-40
- Linux--网络编程
- 电脑无故弹出yyy102.html网页的解决办法(没办法,今天中招了)
- Just for fun——go实现一下观察者模式
- sed 学习笔记(未完成)
- hadoop之 YARN配置参数剖析—RM与NM相关参数
- 牛客 - Across the Firewall(最大流)
- CodeForces - 1366D Two Divisors(数论)
- 中石油训练赛 - Swapity Swap(矩阵快速幂)