这里写了一个简单的springboot的demo来处理RabbitMq的发布订阅

添加pom依赖

     <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>

创建连接管理类

package com.my.tool;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** Create by hadoop*/
public class ConnectionUtil {public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{//1、定义连接工厂ConnectionFactory factory = new ConnectionFactory();//2、设置服务器地址factory.setHost(host);//3、设置端口factory.setPort(port);//4、设置虚拟主机、用户名、密码factory.setVirtualHost(vHost);factory.setUsername(userName);factory.setPassword(passWord);//5、通过连接工厂获取连接Connection connection = factory.newConnection();return connection;}
}

创建订阅发布者

package com.my.tool;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.concurrent.TimeoutException;@Component
public class Publish {private final static String EXCHANGE_NAME = "test_exchange_fanout";Channel channel = null;Connection connection = null;@PostConstructvoid init() throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection("127.0.0.1",5672,"/","guest","guest");//从连接中获取一个通道channel = connection.createChannel();//声明交换机(分发:发布/订阅模式channel.exchangeDeclare(EXCHANGE_NAME, "fanout");}@PreDestroyvoid uninit() throws IOException, TimeoutException {channel.close();connection.close();}public  void  SendData(String message) throws IOException {channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));}
}

创建订阅者需要两个类来提供支撑

package com.my.tool;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.TimeoutException;@Component
public class subscribe {//交换机名称private final static String EXCHANGE_NAME = "test_exchange_fanout";//队列名称private static final String QUEUE_NAME    = "test_queue_email";Channel channel = null;Connection connection = null;MyConsumer myConsumer = null;@PostConstructvoid init() throws Exception {//获取连接Connection connection = ConnectionUtil.getConnection("127.0.0.1",5672,"/","guest","guest");//从连接中获取一个通道channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);myConsumer = new MyConsumer(channel);//设置应答boolean autoAck = true;//监听队列channel.basicConsume(QUEUE_NAME, autoAck, myConsumer);}@PreDestroyvoid uninit() throws IOException, TimeoutException {channel.close();connection.close();}
}
package com.my.tool;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("-----------consume message----------");System.out.println("consumerTag: " + consumerTag);System.out.println("envelope: " + envelope);System.out.println("properties: " + properties);System.out.println("body: " + new String(body));}
}

效果如下:

RabbitMq之发布订阅模式相关推荐

  1. RabbitMQ 之发布订阅模式

    publish/subscribe 发布订阅模式中,生产者不再直接与队列绑定,而是将数据发送至交换机Exchange 交换机Exchange用于将数据按某种规则送入与之绑定的队列,进而供消费者使用. ...

  2. RabbitMQ六种队列模式-发布订阅模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  3. 【转】RabbitMQ六种队列模式-3.发布订阅模式

    前言 RabbitMQ六种队列模式-简单队列 RabbitMQ六种队列模式-工作队列 RabbitMQ六种队列模式-发布订阅 [本文] RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列 ...

  4. RabbitMQ入门学习系列(四) 发布订阅模式

    什么时发布订阅模式 把消息发送给多个订阅者.也就是有多个消费端都完整的接收生产者的消息 换句话说 把消息广播给多个消费者 消息模型的核心 RabbitMQ不发送消息给队列,生产者也不知道消息发送到队列 ...

  5. RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

    在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者.本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式. 为了阐述这个 ...

  6. RabbitMQ发布/订阅模式(Publish/Subscribe)

    工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有 ...

  7. RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器

    文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...

  8. RabbitMQ:发布订阅模式

    ✨ RabbitMQ:发布订阅模式 1.订阅模式基本介绍 2.交换机 3.发布订阅模式 3.1基本介绍 3.2生产者 3.3消费者 3.4测试

  9. 观察者模式VS发布-订阅模式

    前言 观察者模式的大名,想必各位看官早已有所耳闻.从我们现实生活来说,微信公众号订阅.医院挂号叫号等都属于它的实际应用.在程序世界中,它是一种用于将代码解耦的设计模式,如果你想掌握并理解这种设计模式, ...

最新文章

  1. java端模拟http的get、post请求(转)
  2. Android中的基础控件CheckBox、RadioButton、ToggleButton、SeekBar
  3. 无限网络无限连接掉网的解决
  4. R语言观察日志(part15)--R的缺点
  5. 获取html滚动条位置,pc和移动端获取滚动条的位置
  6. Ubuntu 16.04 安装 cuda 7.5.run BEGIN failed--compilation aborted at ./cuda-installer.pl line 5
  7. mysql创建视图sql_SQL视图介绍-如何在SQL和MySQL中创建视图
  8. npm 报错 Module build failed: Error: No PostCSS Config found in:
  9. Excel 使用ODBC 连接mysql 5.0
  10. 如此多的深度学习框架,为什么我选择PyTorch?
  11. python *args 和 **kwargs
  12. mysql命令导出表结构和数据_mysql命令导入\导出表结构或数据
  13. grafana快速搭建数据平台
  14. AM调制解调matlab实验报告,基于MATLAB的AM调制解调系统仿真报告
  15. 方正飞鸿:OA系统先进性如何在工作流中体现
  16. 免费Bootstrap后台管理模板
  17. 马云:关于P2P,不要你觉得,我要我觉得。---------王艾老哥
  18. Zcash中的description
  19. 自定义设置电脑屏保(.scr文件)
  20. Discounted Cumulative Gain(DCG)

热门文章

  1. ORACLE数据库设置编码
  2. 今天学得有点多——end用法
  3. Web 前端开发框架收集
  4. InfoWorld 2020 年公布最佳开源软件
  5. hp 服务器可以安装win7系统安装失败,如何解决win7hp1213打印机安装失败的问题
  6. 察看无限网络linux,linux 无线网络调试
  7. php mysql 设置字符_php mysql字符集设置方法
  8. php 函数频率,这是一些使用频率比较高的php函数……
  9. 洛谷P1088 火星人__(作业)
  10. bzoj 1409 Password