转:http://wosyingjun.iteye.com/blog/2314681

ActiveMQ的简单使用

ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。

相关文章:
范例项目: http://wosyingjun.iteye.com/blog/2312553 
ActiveMQ集群高可用方案:http://wosyingjun.iteye.com/blog/2314683

ActiveMQ组成:

ActiveMQ接发送消息流程图:

一. ActiveMQ的安装和配置

1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
https://activemq.apache.org/download.html
2、解压安装
tar -zxvf apache-activemq-5.13.4-bin.tar.gz
3、配置(这里采用默认配置,无需修改)
vim /usr/lical/activemq-1/conf/activemq.xml
4、启动
cd /usr/local/activemq-1/bin
./activemq start
5、打开管理界面(管理界面可以查看并管理所有队列及消息)
http://192.168.1.100:8161

二. Spring结合ActiveMQ使用

1、pom文件引入依赖
Xml代码  
  1. <!--active mq start-->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-core</artifactId>
  5. <version>5.7.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.activemq</groupId>
  9. <artifactId>activemq-pool</artifactId>
  10. <version>5.13.3</version>
  11. </dependency>
  12. <!--active mq end-->
2、spring-mq配置文件
Xml代码  
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="
  5. http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans.xsd">
  7. <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
  8. <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  9. <!-- ActiveMQ服务地址 -->
  10. <property name="brokerURL" value="${mq.brokerURL}"/>
  11. <property name="userName" value="${mq.userName}"></property>
  12. <property name="password" value="${mq.password}"></property>
  13. <!-- 这里定义重试策略,注意:只有持久化的才会重试-->
  14. <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/>
  15. </bean>
  16. <!--这里设置各个消息队列的重发机制-->
  17. <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
  18. <property name="redeliveryPolicyEntries">
  19. <list>
  20. <ref bean="bizRedeliveryPolicy"/>
  21. <ref bean="mailRedeliveryPolicy"/>
  22. </list>
  23. </property>
  24. </bean>
  25. <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  26. <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)-->
  27. <property name="maximumRedeliveries" value="3"/>
  28. <property name="redeliveryDelay" value="1000"/>
  29. <property name="backOffMultiplier" value="2"/>
  30. <property name="useExponentialBackOff" value="true"/>
  31. <property name="destination" ref="bizQueue"/>
  32. </bean>
  33. <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  34. <!--重发次数 延时、延时系数、延时指数开关-->
  35. <property name="maximumRedeliveries" value="2"/>
  36. <property name="redeliveryDelay" value="5000"/>
  37. <property name="destination" ref="mailQueue"/>
  38. </bean>
  39. <!--
  40. ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
  41. 可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。
  42. 要依赖于 activemq-pool包
  43. -->
  44. <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
  45. <property name="connectionFactory" ref="targetConnectionFactory"/>
  46. <property name="maxConnections" value="${mq.pool.maxConnections}"/>
  47. </bean>
  48. <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
  49. <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  50. <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
  51. <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
  52. <property name="reconnectOnException" value="true"/>
  53. </bean>
  54. <!-- 队列目的地-->
  55. <bean id="bizQueue" class="org.apache.activemq.command.ActiveMQQueue">
  56. <constructor-arg index="0" value="${biz.queueName}"/>
  57. </bean>
  58. <bean id="mailQueue" class="org.apache.activemq.command.ActiveMQQueue">
  59. <constructor-arg index="0" value="${mail.queueName}"/>
  60. </bean>
  61. <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
  62. <!-- 队列模板 这里配置2个,一个用于分布式业务,一个用于发送邮件-->
  63. <bean id="bizMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  64. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  65. <property name="connectionFactory" ref="connectionFactory"/>
  66. <property name="defaultDestination" ref="bizQueue"/>
  67. <!-- 使 deliveryMode, priority, timeToLive设置生效-->
  68. <property name="explicitQosEnabled" value="true" />
  69. <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失-->
  70. <property name="deliveryPersistent" value="true"/>
  71. <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
  72. <property name="sessionTransacted" value="false"/>
  73. </bean>
  74. <bean id="mailMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  75. <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
  76. <property name="connectionFactory" ref="connectionFactory"/>
  77. <property name="defaultDestination" ref="mailQueue"/>
  78. <!-- 使 deliveryMode, priority, timeToLive设置生效-->
  79. <property name="explicitQosEnabled" value="true" />
  80. <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失-->
  81. <property name="deliveryPersistent" value="true"/>
  82. <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
  83. <property name="sessionTransacted" value="true"/>
  84. </bean>
  85. <!-- 消息监听实现方法一 -->
  86. <bean id="bizListener" class="com.yingjun.ssm.mq.listener.TransactionBizMessageListener"/>
  87. <bean id="mailListener" class="com.yingjun.ssm.mq.listener.MailMessageListener"/>
  88. <!-- 消息接收监听器用于异步接收消息-->
  89. <bean id="bizContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  90. <property name="connectionFactory" ref="connectionFactory"/>
  91. <property name="destination" ref="bizQueue"/>
  92. <property name="messageListener" ref="bizListener"/>
  93. <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
  94. <property name="sessionTransacted" value="true"/>
  95. <property name="concurrentConsumers" value="1"/>
  96. </bean>
  97. <bean id="mailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  98. <property name="connectionFactory" ref="connectionFactory"/>
  99. <property name="destination" ref="mailQueue"/>
  100. <property name="messageListener" ref="mailListener"/>
  101. <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
  102. <property name="sessionTransacted" value="true"/>
  103. <property name="concurrentConsumers" value="1"/>
  104. </bean>
  105. </beans>
3、重试机制以及死信的配置
Xml代码  
  1. <!--这里设置各个消息队列的重发机制-->
  2. <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
  3. <property name="redeliveryPolicyEntries">
  4. <list>
  5. <ref bean="bizRedeliveryPolicy"/>
  6. <ref bean="mailRedeliveryPolicy"/>
  7. </list>
  8. </property>
  9. </bean>
  10. <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  11. <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)-->
  12. <property name="maximumRedeliveries" value="3"/>
  13. <property name="redeliveryDelay" value="1000"/>
  14. <property name="backOffMultiplier" value="2"/>
  15. <property name="useExponentialBackOff" value="true"/>
  16. <property name="destination" ref="bizQueue"/>
  17. </bean>
  18. <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  19. <!--重发次数 延时、延时系数、延时指数开关-->
  20. <property name="maximumRedeliveries" value="2"/>
  21. <property name="redeliveryDelay" value="5000"/>
  22. <property name="destination" ref="mailQueue"/>
  23. </bean>
4、发送端代码
Java代码  
  1. package com.yingjun.ssm.biz;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.yingjun.ssm.common.model.BizOperator;
  4. import org.junit.Test;
  5. import org.junit.runner.RunWith;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.jms.core.JmsTemplate;
  10. import org.springframework.jms.core.MessageCreator;
  11. import org.springframework.test.context.ContextConfiguration;
  12. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
  13. import javax.jms.JMSException;
  14. import javax.jms.Message;
  15. import javax.jms.Session;
  16. /**
  17. * @author yingjun
  18. */
  19. @RunWith(SpringJUnit4ClassRunner.class)
  20. @ContextConfiguration("classpath:application.xml")
  21. public class Application {
  22. private final Logger log = LoggerFactory.getLogger(Application.class);
  23. @Autowired
  24. private JmsTemplate bizMqJmsTemplate;
  25. @Test
  26. public void mailSend() throws Exception {
  27. bizMqJmsTemplate.setSessionTransacted(true);
  28. for (int i = 0; i < 1; i++) {
  29. log.info("==>send message" + i);
  30. bizMqJmsTemplate.send(new MessageCreator() {
  31. @Override
  32. public Message createMessage(Session session) throws JMSException {
  33. log.info("getTransacted:" + session.getTransacted());
  34. BizOperator operator = new BizOperator("testDistributedTransaction", 1001);
  35. return session.createTextMessage(JSONObject.toJSONString(operator));
  36. }
  37. });
  38. log.info("==>finish send message"+ i);
  39. }
  40. while (true) {
  41. }
  42. }
  43. }
5、接受端代码
Java代码  
  1. package com.yingjun.ssm.mq.listener;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.yingjun.ssm.common.model.BizOperator;
  4. import com.yingjun.ssm.mq.biz.TransactionBizService;
  5. import org.apache.activemq.command.ActiveMQTextMessage;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.jms.listener.SessionAwareMessageListener;
  10. import org.springframework.stereotype.Component;
  11. import javax.jms.JMSException;
  12. import javax.jms.Message;
  13. import javax.jms.Session;
  14. /**
  15. *
  16. * @author yingjun
  17. */
  18. @Component
  19. public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> {
  20. private static final Logger log = LoggerFactory.getLogger(TransactionBizMessageListener.class);
  21. private final String transactionBiz = "testDistributedTransaction";
  22. @Autowired
  23. private TransactionBizService transactionBizService;
  24. /**
  25. * @param message
  26. * @param session
  27. */
  28. public void onMessage(Message message, Session session) throws JMSException{
  29. //这里建议不要try catch,让异常抛出,通过redeliveryPolicy去重试,达到重试次数进入死信DLQ(Dead Letter Queue)
  30. ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
  31. String ms = ms = msg.getText();
  32. log.info("==>receive message:" + ms);
  33. // 转换成相应的对象
  34. BizOperator operator = JSONObject.parseObject(ms, BizOperator.class);
  35. if (operator != null && transactionBiz.equals(operator.getOperator())) {
  36. transactionBizService.addScoreBySyn(100);
  37. //throw new RuntimeException("test redeliveryPolicy");
  38. } else {
  39. log.info("==>message:" + ms + " no about operator!");
  40. }
  41. }
  42. }

ActiveMQ的简单使用相关推荐

  1. Activemq MQTT 简单消息推送示例

    Activemq MQTT 简单消息推送示例 简介     简单使用 MQTT 连接 Activemq 进行消息推送的示例代码 编写详情 环境准备     使用docker启动Activemq,查看M ...

  2. Activemq Jms 简单示例

    Activemq Jms 简单示例 简介     简单的 Activemp JMS 示例代码 activemq 运行     简单使用docker启动一个: docker run -dit --nam ...

  3. ActiveMQ此例简单介绍基于docker的activemq安装与集群搭建

    ActiveMQ拓展连接 此例简单介绍基于Docker的activemq安装与集群搭建 一 :安装 1.获取activemq镜像 docker pull webcenter/activemq 2.启动 ...

  4. ActiveMQ的简单实现

    在学习JMS过程中,网上找了很多资料,选定ActiveMQ做为JMS的组件开发,于是实现了一个简单的例子. 第一步:下载ActiveMQ,测试实例采用的是apache-activemq-5.6.0,启 ...

  5. ActiveMQ的简单例子应用

    ActiveMQ是一种消息中间件,它实现了JMS规范,提供点对点和订阅-发布两种模式.下面介绍下ActiveMQ的使用: 一.环境的搭建 首先我们需要下载ActiveMQ的安装包,下载地址http:/ ...

  6. 遇见JMS[1] —— activeMQ的简单使用

    1.JMS Java Message Service,提供API,供两个应用程序或者分布式应用之间异步通信,以传送消息. 2.相关概念 提供者:实现JMS规范的消息中间件服务器 客户端:发送或接收消息 ...

  7. ActiveMQ简单介绍+简单实例

    本文出自:http://www.open-open.com/lib/view/open1388994166156.html 1. JMS基本概念      JMS(Java Message Servi ...

  8. ActiveMQ在C#中的应用

    ActiveMQ是个好东东,不必多说.ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等.由于我在windows下开发GUI,比较 ...

  9. ActiveMQ入门教程(三) - ActiveMQ P2P版的HelloWorld

    为什么80%的码农都做不了架构师?>>>    在这篇博客,我们来写一个ActiveMQ版的HelloWorld. 其实,要想写程序的话,还是要先了解一下JMS规范里的东西. 可以参 ...

最新文章

  1. Deepfit: 通过神经网络加权最小二乘法进行3D表面拟合
  2. Arduino可穿戴开发入门教程(大学霸内部资料)
  3. 推荐一套开源通用后台管理系统(附源码)
  4. airflow零基础入门
  5. 华为软件研发面试题1
  6. 最常见的10种Java异常问题!
  7. 一分钟读懂java的super关键字
  8. 求数组三项之和最接近某个目标数字
  9. dns服务器的配置与管理
  10. java语言编程三角形图形_编程题:编写程序输入三角形的3条边长,计算并输出... 求助一道JAVA编程题:编写一个类似记事本的图形用......
  11. 比特币白皮书 Bitcoin: A Peer-to-Peer Electronic Cash System
  12. QT中foreach的使用
  13. 解决办法:configure: error: You requested SRTP (requires libsrtp) but not found...die
  14. 应用时间序列案例-基于R语言
  15. java候选码计算的替换法_数据库闭包和候选码求解方法
  16. 吴恩达OpenAI最新课程:prompt-engineering-for-developers读书笔记
  17. java毕业设计瓷砖仓库管理mybatis+源码+调试部署+系统+数据库+lw
  18. nginx整合fastdfs出现2个master进程没有worker进程的原因以及解决方法
  19. 博士生毕业后进高校当老师,直接是副教授吗
  20. JavaScript学习第十三天

热门文章

  1. 基于51单片机的心形流水灯
  2. 基于windows7的usb多点触控设备
  3. Ubuntu安装KVM虚拟机
  4. 【技能教学】如何通过FFMPEG编码推RTSP视频直播流到EasyDarwin开源平台时叠加时间水印?
  5. laravel+容联.云通讯 实现手机短信验证用户注册
  6. Acwing第 70 场周赛【未完结】
  7. java大数据培训专业课程与教学模式的介绍
  8. 【闲谈】应聘时要问HR的7个问题
  9. php 高德地图创建标注,使用高德地图API生成带标的专属地图
  10. 标记网购ThinkPad过程