Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)
目录
基本概念
代码与实例
基本概念
实现的就是官方给出的这个模型:
Topic exchange:将路由和某模式匹配
其中
#:匹配一个或多个
*:匹配一个
比如下面要举得这个例子
交换机设置为topic模式,生产者生成的消息的路由键值为goods.XXXX
其中XXXX,可能为add、delete、update、modify等
队列一绑定的是goods.add
队列二绑定的是goods.#
这样话,如果生产者生产一个路由键值为goods.add的消息,辣么2个队列都将会收到。
如果生产者生成一个路由键值为goods.delete的消息,辣么只有1个队列将会收到。
代码与实例
当生产者和消费者跑起来后,对应的RabbitMQ交换机如下:
可见有2个队列,一个绑定的路由键为goods.add
一个绑定的路由键为goods.#
当生产者发送的键值为goods.add时:
两个消费者都可以收到:
当生产者发送的键值为goods.delete时:
只有消费者二可以收到,消费者一和以前一样
源码如下:
Recv1.java
package topic;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Recv msg : " + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
Recv2.java
package topic;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[2] Recv msg : " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[2] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
Send.java
package topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String msgString = "goods ... ... ...";channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msgString.getBytes());System.out.println("send msg :" + msgString);channel.close();connection.close();}
}
ConnectionUtils.java
package util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/vhost_cff");factory.setUsername("cff");factory.setPassword("123");return factory.newConnection();}
}
源码打包下载地址:
https://github.com/fengfanchen/Java/tree/master/TopicModel
Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)相关推荐
- Java笔记-使用RabbitMQ的Java接口实现Routing(路由模式)
目录 基本概念 代码与实例 基本概念 过程图如下: 主要是把交换机设置为直连的方式direct直连的方式然后把 数据 发送给交换机. 交换机再通过路由的Key值转发到队列上. 每一个客户端,都有一个队 ...
- Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)
目录 基本概念 代码与实例 基本概念 模型如上: 1. 一个生产者,多个消费者: 2. 每个消费者都有自己的队列: 3. 生产者没有直接把消息发送到队列,而是发送到交换机,通过交换机转发到队列: 4. ...
- Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)
目录 基本概念 代码与实例 基本概念 当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理. 主要是让消费者处理完后,回信息给RabbitMQ,然后Rabb ...
- Java笔记-使用RabbitMQ的Java接口实现round-robin(轮询分发)
目录 基本概念 代码与实例 基本概念 简单队列的不足: 1. 耦合性高: 2. 如果生产者把生产队列该了,消费者也要同时改: Work Queues工作队列,模型如下: 代码与实例 程序运行截图如下: ...
- Java笔记-使用RabbitMQ的Java接口生产数据并消费
目录 基本概念 代码及演示 基本概念 就是官方的这个模型: p为生产者不经过交换机,直接把数据传给消息队列,c为consumer用于消费. 这种结构在本科生的时候,经常自己写,现在用RabbitMQ来 ...
- Java笔记整理五(Iterator接口,泛型,常见数据结构(栈,队列,数组,链表,红黑树,集合),jdk新特性,异常,多线程,Lambda表达式)
Java笔记整理五 1.1Iterator接口 Collection接口与Map接口主要用于存储元素,而Iterator主要用于迭代访问(即遍历)Collection中的元素,因此Iterator对象 ...
- (八)RabbitMQ消息队列-通过Topic主题模式分发消息
前两章我们讲了RabbitMQ的direct模式和fanout模式,本章介绍topic主题模式的应用.如果对direct模式下通过routingkey来匹配消息的模式已经有一定了解那fanout也很好 ...
- 9.1-全栈Java笔记: 容器泛型—认识Collection接口
开发和学习中需要时刻和数据打交道,如果组织这些数据是我们编程中重要的内容. 我们一般通过"容器"来容纳和管理数据. 事实上,数组就是一种容器,可以在其中放置对象或基本类型数据. ...
- C/C++笔记-使用RabbitMQ的C接口生产数据并消费
目录 基本概念 代码与实例 基本概念 使用C语言接口完成官方的这个模型: 很有意思.感觉开源的东西真的好,不用自己去写C/C++服务端. p为生产者不经过交换机,直接把数据传给消息队列,c为consu ...
最新文章
- Linux修改密码是提示“passwd: 鉴定令牌操作错误”问题的处理办法
- java restful netty_Java RESTful 框架的性能比较
- java的队列_java实现队列
- 若依前后端分离版(vue)中配置页面跳转的路由
- 可能是目前最详细的Redis内存模型及应用解读
- JavaScript基本概念(下)
- python 爬取贝壳网小区名称_Python爬虫实战:爬取贝壳网二手房40000条数据
- qt添加菜单纯代码_开始玩qt,使用代码修改设计模式生成的菜单
- EMS设置发送连接器和接收连接器邮件大小
- 单元测试/集成测试/系统测试的区别
- [复杂网络博弈] 第二章 演化博弈动力学基础
- 替换class文件,重启Tomcat不生效
- NIO蔚来ET5/ET7电动汽车维修手册电路图用户手册技术资料
- 牛客多校第八场 C CDMA 线性代数:沃尔什矩阵
- 安卓APP自动更新功能实现
- XSS Overview
- 给你一个整数数组 nums 。 如果一组数字 (i,j) 满足 nums[i] == nums[j] 且 i < j ,就可以认为这是一组 好数对 。
- 我的言论05-04-06
- [转载] 细看名字服务中心
- Oracle11G数据库重演测试