转载自 https://www.cnblogs.com/tohxyblog/p/7256554.html

一、开启rabbitMQ服务,导入MQ jar包和gson jar包(MQ默认的是jackson,但是效率不如Gson,所以我们用gson)

二、发送端配置,在spring配置文件中配置

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"><!-- 连接服务配置 如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码  guest默认不允许远程登录-->  <rabbit:connection-factory id="connectionFactory"  host="localhost" username="guest" password="guest" port="5672"  virtual-host="/" channel-cache-size="5" />  <!-- 配置爱admin,自动根据配置文件生成交换器和队列,无需手动配置 --><rabbit:admin connection-factory="connectionFactory" />  <!-- queue 队列声明 -->  <rabbit:queue  durable="true"  auto-delete="false" exclusive="false" name="spring.queue.tag" />  <!-- exchange queue binging key 绑定 -->  <rabbit:direct-exchange name="spring.queue.exchange"  durable="true" auto-delete="false">  <rabbit:bindings>  <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />  </rabbit:bindings>  </rabbit:direct-exchange>  <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->  <bean id="jsonMessageConverter"    class="sendMQ.Gson2JsonMessageConverter" />  <!-- spring template声明 -->  <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"  routing-key="spring.queue.tag.key"  connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> 

发送端代码:GSON配置

package sendMQ;import java.io.IOException;
import java.io.UnsupportedEncodingException;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;import com.google.gson.Gson;public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter{private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);  private static  ClassMapper classMapper =  new DefaultClassMapper();  private static Gson gson = new Gson();  public Gson2JsonMessageConverter() {  super();  }  @Override  protected Message createMessage(Object object,  MessageProperties messageProperties) {  byte[] bytes = null;  try {  String jsonString = gson.toJson(object);  bytes = jsonString.getBytes(getDefaultCharset());  }  catch (IOException e) {  throw new MessageConversionException(  "Failed to convert Message content", e);  }  messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);  messageProperties.setContentEncoding(getDefaultCharset());  if (bytes != null) {  messageProperties.setContentLength(bytes.length);  }  classMapper.fromClass(object.getClass(),messageProperties);  return new Message(bytes, messageProperties);  }  @Override  public Object fromMessage(Message message)  throws MessageConversionException {  Object content = null;  MessageProperties properties = message.getMessageProperties();  if (properties != null) {  String contentType = properties.getContentType();  if (contentType != null && contentType.contains("json")) {  String encoding = properties.getContentEncoding();  if (encoding == null) {  encoding = getDefaultCharset();  }  try {  Class<?> targetClass = getClassMapper().toClass(  message.getMessageProperties());  content = convertBytesToObject(message.getBody(),  encoding, targetClass);  }  catch (IOException e) {  throw new MessageConversionException(  "Failed to convert Message content", e);  }  }  else {  log.warn("Could not convert incoming message with content-type ["  + contentType + "]");  }  }  if (content == null) {  content = message.getBody();  }  return content;  }  private Object convertBytesToObject(byte[] body, String encoding,  Class<?> clazz) throws UnsupportedEncodingException {  String contentAsString = new String(body, encoding);  return gson.fromJson(contentAsString, clazz);  }
}

发送类接口:

public interface MQProducer {/*** 发送消息到指定队列* @param queueKey* @param object*/public void sendDataToQueue(String queueKey, Object object);
}

实现类:test是测试用的。

package sendMQ;import java.util.HashMap;
import java.util.Map;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/spring-common.xml"})@Component
public class MQProducerImpl implements MQProducer {@Autowiredprivate  AmqpTemplate amqpTemplate;@Overridepublic void sendDataToQueue(String queueKey, Object object) {System.out.println("--"+amqpTemplate);try {amqpTemplate.convertAndSend(object);System.out.println("------------消息发送成功");} catch (Exception e) {System.out.println(e);}}@Testpublic  void test() {  Map<String,Object> msg = new HashMap<>();msg.put("data","hello,456");while(true){amqpTemplate.convertAndSend(msg); try {Thread.sleep(2000);} catch (InterruptedException e) {// TODO 自动生成的 catch 块e.printStackTrace();}}}  }

接收端配置:

  <!-- 连接服务配置  -->  <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"  password="guest" port="5672" virtual-host="/"  channel-cache-size="5" />  <rabbit:admin connection-factory="connectionFactory"/>  <!-- queue 队列声明-->  <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>  <!-- exchange queue binging key 绑定 -->  <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">  <rabbit:bindings>  <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>  </rabbit:bindings>  </rabbit:direct-exchange>  <bean id="receiveMessageListener"  class="receiveMQ.QueueListenter" />  <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->  <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >  <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />  </rabbit:listener-container>  

接收端代码:

package receiveMQ;import java.io.UnsupportedEncodingException;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;public class QueueListenter implements MessageListener{@Overridepublic void onMessage(Message msg) {try {System.out.print("-------------------"+new String(msg.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {// TODO 自动生成的 catch 块e.printStackTrace();}}}

接收端测试启动:

package receiveMQ;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ConsumerMain {public static void main(String[] args) {  new ClassPathXmlApplicationContext("spring-common.xml");    }
}

上面代码均有注释,应该不难看懂,复制即可使用,实现了MQ的简单功能。

说明:可以配置多个接收端,spring默认的是负载均衡机制,每个接收端接收一条的来,这些扩展功能待后面有时间再讲解

rabbitMQ教程 spring整合rabbitMQ代码实例相关推荐

  1. rabittmq java spring_消息队列 RabbitMQ 与 Spring 整合使用的实例代码

    一.什么是 RabbitMQ RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消 ...

  2. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  3. Spring整合RabbitMQ(包含生产者和消费者)

    生产者 创建一个MAVEN项目spring-exchange-producer作为消息队列的生产者 导入相关的依赖坐标 <dependencies><!-- https://mvnr ...

  4. spring 整合 RabbitMQ 3.9.11

    spring 整合 RabbitMQ 3.9.11 rabbitmq 3.9.11 创建工程 不管用什么办法创建出一个空的工程出来,或者在已有工程里面创建一个新的module,下面只说创建新modul ...

  5. activiti自己定义流程之Spring整合activiti-modeler5.16实例(四):部署流程定义

    注:(1)环境搭建:activiti自己定义流程之Spring整合activiti-modeler5.16实例(一):环境搭建         (2)创建流程模型:activiti自己定义流程之Spr ...

  6. activiti自定义流程之Spring整合activiti-modeler5.16实例(四):部署流程定义

    注:(1)环境搭建:activiti自定义流程之Spring整合activiti-modeler5.16实例(一):环境搭建         (2)创建流程模型:activiti自定义流程之Sprin ...

  7. Java代码制作ie浏览器_[Java教程]判断IE浏览器代码实例

    [Java教程]判断IE浏览器代码实例 0 2015-08-12 01:00:19 判断IE浏览器代码实例: 由于当下浏览器类型众多,并且对同一段代码的解读有时候各有不同,所以要根据浏览器的类型来执行 ...

  8. RabbitMQ学习总结(7)——Spring整合RabbitMQ实例

    2019独角兽企业重金招聘Python工程师标准>>> 1.RabbitMQ简介 RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消 ...

  9. rabbitmq实战_RabbitMQ实战(四) - RabbitMQ amp; Spring整合开发

    0 相关源码 1 你将学到 RabbitMQ 整合 Spring AMQP实战 RabbitMQ 整合 Spring Boot实战 RabbitMQ 整合 Spring Cloud实战 2 Sprin ...

最新文章

  1. 重磅!“全脑介观神经联接图谱”大科学计划中国工作组成立!
  2. ERP成分简介--听觉感觉反应
  3. java实现异步调用实例
  4. 乡村要振兴,快递先进村?
  5. 通过调用API函数实现的无边框窗体的拖拽,比判断坐标更快捷
  6. Spring中的后置处理器BeanPostProcessor讲解
  7. ASP.NET Core分布式项目实战(客户端集成IdentityServer)--学习笔记
  8. python小学生课本剧_二年级上学期课本剧
  9. Linux服务器iops性能测试-fio
  10. 如何解决ajax跨域问题
  11. 阻塞非阻塞,同步异步四种I/O方式
  12. 三菱st语言编程实例_C语言编程实例39
  13. c#中的一些容易混淆的概念
  14. 山西省计算机商务学校地址,山西计算机等级考试报名地点
  15. html怎么制作壁纸,CSS3制作皮卡丘动画壁纸的示例
  16. 富爸爸实现财务自由七步骤
  17. 永久代,方法区 和 元空间之间的关系
  18. IntelliJ IDEA远程debug调试
  19. 两分钟解决IntelliJ IDEA中文乱码问题
  20. mysql联合查询注入防护,SQL注入之BypassWaf

热门文章

  1. delphi调用windows api
  2. 初级教程之---delphi调试
  3. 对现有的所能找到的DDOS代码(攻击模块)做出一次分析----GET篇
  4. 标准模板库(STL)学习指南之set集合
  5. mupdf-android-viewer 设计与实现浅析
  6. 第04讲: 基础探究,Session 与 Cookies
  7. Linux网络子系统
  8. SpringSecurity + JWT,从入门到精通!
  9. 为什么直播时要用CDN?
  10. 若只让我推荐一名LiveVideoStackCon上海的讲师,就是他