ActiveMQ 注意用5.11版本,5.12与spring有冲突

ActiveMQTest:

测试类示例

package com.igeek.test.jedis;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;import javax.jms.*;
import java.io.IOException;public class ActiveMQTest {/*** PTP(点对点)*///生产者public void producer_queue() throws JMSException {
//        第一步:创建 ConnectionFactory 对象,需要指定服务端 ip 及端口号。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.8.101:61616");//192.168.8.101:8161
//        第二步:使用 ConnectionFactory 对象创建一个 Connection 对象。Connection connection = connectionFactory.createConnection();
//        第三步:开启连接,调用 Connection 对象的 start 方法。connection.start();
//        第四步:使用 Connection 对象创建一个 Session 对象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//参数:1.是否使用分布式事务(false 不使用) 2.activemq的响应类型(自动应答)
//        第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Queue对象。Queue queue = session.createQueue("test_queue");//参数:给队列设置名称
//        第六步:使用 Session 对象创建一个 Producer 对象。MessageProducer producer = session.createProducer(queue);
//        第七步:创建一个 Message 对象,创建一个 TextMessage 对象。TextMessage message = new ActiveMQTextMessage();message.setText("hello queue...message03");
//        第八步:使用 Producer 对象发送消息。producer.send(message);
//        第九步:关闭资源。session.close();connection.close();}//消费者public void consumer_queue() throws JMSException, IOException {//        第一步:创建 ConnectionFactory 对象,需要指定服务端 ip 及端口号。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.8.101:61616");
//        第二步:使用 ConnectionFactory 对象创建一个 Connection 对象。Connection connection = connectionFactory.createConnection();
//        第三步:开启连接,调用 Connection 对象的 start 方法。connection.start();
//        第四步:使用 Connection 对象创建一个 Session 对象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//参数:1.是否使用分布式事务(false 不使用) 2.activemq的响应类型(自动应答)
//        第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Queue对象。Queue queue = session.createQueue("test_queue");//参数:给队列设置名称
//        第六步:使用 Session 对象创建一个 Consumer 对象。MessageConsumer cosumer = session.createConsumer(queue);//        第七步:使用 Consumer 对象消费消息。cosumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {TextMessage _message = (TextMessage) message;try {System.out.println("消费者接收到消息..."+_message.getText());} catch (JMSException e) {e.printStackTrace();}}});System.in.read();//        第八步:关闭资源。session.close();connection.close();}/*** Topic(发布/订阅)*///生产者public void producer_topic() throws JMSException {
//        第一步:创建 ConnectionFactory 对象,需要指定服务端 ip 及端口号。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.8.101:61616");
//        第二步:使用 ConnectionFactory 对象创建一个 Connection 对象。Connection connection = connectionFactory.createConnection();
//        第三步:开启连接,调用 Connection 对象的 start 方法。connection.start();
//        第四步:使用 Connection 对象创建一个 Session 对象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//参数:1.是否使用分布式事务(false 不使用) 2.activemq的响应类型(自动应答)
//        第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Topic对象。Topic topic = session.createTopic("test-topic");//参数:给队列设置名称
//        第六步:使用 Session 对象创建一个 Producer 对象。MessageProducer producer = session.createProducer(topic);
//        第七步:创建一个 Message 对象,创建一个 TextMessage 对象。TextMessage message = new ActiveMQTextMessage();message.setText("hello topic...message03");
//        第八步:使用 Producer 对象发送消息。producer.send(message);
//        第九步:关闭资源。session.close();connection.close();}//消费者public void consumer_topic() throws JMSException, IOException {//        第一步:创建 ConnectionFactory 对象,需要指定服务端 ip 及端口号。ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.8.101:61616");
//        第二步:使用 ConnectionFactory 对象创建一个 Connection 对象。Connection connection = connectionFactory.createConnection();
//        第三步:开启连接,调用 Connection 对象的 start 方法。connection.start();
//        第四步:使用 Connection 对象创建一个 Session 对象。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//参数:1.是否使用分布式事务(false 不使用) 2.activemq的响应类型(自动应答)
//        第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Queue对象。Topic topic = session.createTopic("test-topic");//参数:给队列设置名称
//        第六步:使用 Session 对象创建一个 Consumer 对象。MessageConsumer cosumer = session.createConsumer(topic);//        第七步:使用 Consumer 对象消费消息。cosumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {TextMessage _message = (TextMessage) message;try {System.out.println("消费者接收到消息..."+_message.getText());} catch (JMSException e) {e.printStackTrace();}}});System.in.read();//        第八步:关闭资源。session.close();connection.close();}/*** Spring与ActiveMQ整合测试*///@Testpublic void test_spring_activeMq(){ApplicationContext ac = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");JmsTemplate jmsTemplate = ac.getBean(JmsTemplate.class);Destination destination = (Destination) ac.getBean("topicDestination");jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = new ActiveMQTextMessage();message.setText("hello...");return message;}});//参数:1.目的地 2.消息创建器}}

与Spring整合的配置文件:

发送消息:

applicationContext-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsdhttp://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd "><!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --><bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://192.168.8.101:61616" /></bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --><bean id="connectionFactory"class="org.springframework.jms.connection.SingleConnectionFactory"><!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory--><property name="targetConnectionFactory" ref="targetConnectionFactory" /></bean><!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --><property name="connectionFactory" ref="connectionFactory" /></bean><!--新增商品 --><bean id="addItemDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="ITEM_ADD" /></bean><!--这个是队列目的地,点对点的 --><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>spring-queue</value></constructor-arg></bean><!--这个是主题目的地,一对多的 --><bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="topic" /></bean></beans>

ItemServiceImpl 业务层应用

package com.igeek.service.impl;import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.igeek.mapper.TbItemMapper;
import com.igeek.pojo.TbItem;
import com.igeek.pojo.TbItemDesc;
import com.igeek.service.ItemDescService;
import com.igeek.service.ItemService;
import com.igeek.util.DataGridResult;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;import javax.jms.*;
import java.util.Date;
import java.util.List;
@Service
public class ItemServiceImpl implements ItemService {@Autowiredprivate TbItemMapper tbItemMapper;@Autowiredprivate ItemDescService itemDescService;@Autowiredprivate JmsTemplate jmsTemplate;@Autowired@Qualifier("addItemDestination")private Topic topic;@Overridepublic List<TbItem> selectItems() {return tbItemMapper.selectAll();}@Overridepublic DataGridResult selectItemsByPage(Long page, Long rows) {DataGridResult result = new DataGridResult();//设置页码 以及 每页条数PageHelper.startPage(page.intValue(),rows.intValue());//查询所有List<TbItem> tbItems = tbItemMapper.selectAll();//真正的查询操作/*查询对象:pageInfo.getList() 每页的数据pageInfo.getPages() 总页数...*/PageInfo<TbItem> pageInfo = new PageInfo<>(tbItems);//result.setRows(rows);result.setTotal(pageInfo.getTotal());result.setRows(pageInfo.getList());return result;}//事务传播行为@Overridepublic int addItem(TbItem item,String desc) {//开启事务tbItemMapper.insert(item);//封装商品描述对象TbItemDesc itemDesc = new TbItemDesc();itemDesc.setItemId(item.getId());itemDesc.setItemDesc(desc);itemDesc.setCreated(new Date());itemDesc.setUpdated(itemDesc.getCreated());//int i = 10/0;//保存描述对象itemDescService.addItemDesc(itemDesc);//发送消息jmsTemplate.send(topic, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {TextMessage message = new ActiveMQTextMessage();message.setText(item.getId()+"");//发送消息(消息内容:商品id)return message;}});return 1;}@Overridepublic void updateItem(TbItem item, String desc) {//设置更新时间item.setUpdated(new Date());//设置不需要更新的字段值为nullitem.setCreated(null);item.setStatus(null);tbItemMapper.updateByPrimaryKeySelective(item);//status->0 create->null  Selective:选择性更新,如果为null不更新TbItemDesc itemDesc = new TbItemDesc();itemDesc.setItemId(item.getId());itemDesc.setItemDesc(desc);itemDescService.updateItemDesc(itemDesc);}@Overridepublic TbItem selectItemById(Long id) {//加缓存  ITEM_CACHE->(id+"INFO"=内容)return tbItemMapper.selectByPrimaryKey(id);}
}

接收消息:

创建监听器:

package com.igeek.search.listener;import com.igeek.search.service.SearchItemService;
import org.apache.solr.client.solrj.SolrServerException;
import org.springframework.beans.factory.annotation.Autowired;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import java.io.IOException;/*** ActiveMQ消息监听器(商品添加)*/
public class ItemAddMessageListener implements MessageListener {@Autowiredprivate SearchItemService searchItemService;@Overridepublic void onMessage(Message message) {TextMessage _message = (TextMessage)message;try {String text = _message.getText();//商品ID//根据商品ID获取商品信息 同步索引库Thread.sleep(200);//确保数据保存到数据库searchItemService.addItem(Long.parseLong(text));} catch (JMSException e) {e.printStackTrace();} catch (SolrServerException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}

配置监听器:

applicationContext-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsdhttp://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd "><!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --><bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://192.168.8.101:61616" /></bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --><bean id="connectionFactory"class="org.springframework.jms.connection.SingleConnectionFactory"><!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory--><property name="targetConnectionFactory" ref="targetConnectionFactory" /></bean><!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --><property name="connectionFactory" ref="connectionFactory" /></bean><!--新增商品 --><bean id="addItemDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="ITEM_ADD" /></bean><!--配置消息监听器--><bean id="myMessageListener"class="com.igeek.search.listener.ItemAddMessageListener"/><!-- 消息监听容器 --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="addItemDestination" /><property name="messageListener" ref="myMessageListener" /></bean></beans>

ActiveMQ代码示例相关推荐

  1. java爬虫代码示例_那些让你代码思维和能力有较大的提升Java源码

    来源:www.cnblogs.com/jiagou/p/9270070.html 对于学习J2EE的框架有很大的帮助,代码里使用了各种设计模式.事件机制.Java8语法.代码量也很小,web服务使用N ...

  2. 用户自定义协议client/server代码示例

    用户自定义协议client/server代码示例 代码参考链接:https://github.com/sogou/workflow message.h message.cc server.cc cli ...

  3. 2021年大数据Flink(二十六):​​​​​​​State代码示例

    目录 State代码示例 Keyed State 官网代码示例 需求: 编码步骤 代码示例 Operator State 官网代码示例 需求: 编码步骤: 代码示例 State代码示例 Keyed S ...

  4. TensorFlow常用操作:代码示例

    1,定义矩阵代码示例: import tensorflow as tftf.zeros([3,4]) #定义3行4列元素均为0的矩阵tensor=tf.constant([1,2,3,4])#定义一维 ...

  5. TensorFlow基本计算单元:代码示例

    1,代码示例: import tensorflow as tf a = 3 #创建变量 w = tf.Variable([[0.6,1.2]])#创建行向量 x = tf.Variable([[2.1 ...

  6. php mms,PHP代码示例_PHP账号余额查询接口 | 微米-中国领先的短信彩信接口平台服务商...

    PHP余额查询接口代码示例 请求 $ch = curl_init(); curl_setopt($ch, CURLOPT_URL, "http://api.weimi.cc/2/accoun ...

  7. java结束全部操作代码_Java创建与结束线程代码示例

    这篇文章主要介绍了Java创建与结束线程代码示例,小编觉得挺不错的,这里分享给大家,供需要的朋友参考. 本文讲述了在Java中如何创建和结束线程的最基本方法,只针对于Java初学者.一些高级知识如线程 ...

  8. doc python 颜色_Python wordcloud.ImageColorGenerator方法代码示例

    本文整理汇总了Python中wordcloud.ImageColorGenerator方法的典型用法代码示例.如果您正苦于以下问题:Python wordcloud.ImageColorGenerat ...

  9. 机器学习简单代码示例

    机器学习简单代码示例 //在gcc-4.7.2下编译通过. //命令行:g++ -Wall -ansi -O2 test.cpp -o test #include <iostream> u ...

最新文章

  1. 剖析nodejs的事件循环
  2. 两道相似题——water-easyJavabeans
  3. C++模拟键盘操作窗口入门
  4. 基于web的工作流设计器(多比图形控件)
  5. 大一新生计算机课word知识,大学新生计算机基础分层考试结果探析与启发.doc
  6. python提取文本中的字符串到新的txt_Python实现jieba对文本分词并写入新的文本文件,然后提取出文本中的关键词...
  7. 利用域名(host碰撞)碰撞实现从任何地方发起中间人攻击(理论篇)
  8. WebGL编程指南理论分析之物体层次模型(局部运动)
  9. Python中定时任务框架APScheduler的快速入门指南
  10. 【计算机网络原理】各层的数据传输
  11. 解构领域驱动设计--思维导图
  12. 以风景为主题的html,以风景为主题的英语作文
  13. web前端程序员到底值多少钱?
  14. 贪心入门+10道例题+解析代码
  15. 【无标题】26-时尚精品服饰网店响应式网页模板
  16. el-input 密码输入框 显示隐藏优化
  17. CVPR 2019 ViLBERT: Pretraining Task-Agnostic Visiolinguistic Representations for Vision-and-Language
  18. python导出dxf图_利用Dxfwrite/ezdxf操作CAD文件!
  19. 【每天一个没用的干货】海康摄像头rtsp流 不登录即播放
  20. 致Java开发者:济南java工资待遇

热门文章

  1. [译]Ocelot - Headers Transformation
  2. MySQL 报错记录
  3. C# ITextSharp pdf 自动打印
  4. Linux之进程通信20160720
  5. 初学C#和MVC的一些心得,弯路,总结,还有教训(4)--Cache 关于创建多个缓存实例
  6. Oracle 数据库升级
  7. oracle数据恢复
  8. kibana报错Request Timeout after 30000ms故障解决
  9. yum安装:zabbix-web-4.2.8-1.el7.noarch: [Errno 256] No more mirrors to try
  10. Python获取照片信息