目录

基本概念

代码与实例


基本概念

实现的就是官方给出的这个模型:

Topic exchange:将路由和某模式匹配

其中

#:匹配一个或多个

*:匹配一个

比如下面要举得这个例子

交换机设置为topic模式,生产者生成的消息的路由键值为goods.XXXX

其中XXXX,可能为add、delete、update、modify等

队列一绑定的是goods.add

队列二绑定的是goods.#

这样话,如果生产者生产一个路由键值为goods.add的消息,辣么2个队列都将会收到。

如果生产者生成一个路由键值为goods.delete的消息,辣么只有1个队列将会收到。

代码与实例

当生产者和消费者跑起来后,对应的RabbitMQ交换机如下:

可见有2个队列,一个绑定的路由键为goods.add

一个绑定的路由键为goods.#

当生产者发送的键值为goods.add时:

两个消费者都可以收到:

当生产者发送的键值为goods.delete时:

只有消费者二可以收到,消费者一和以前一样

源码如下:

Recv1.java

package topic;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[1] Recv msg : " + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

Recv2.java

package topic;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("[2] Recv msg : " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("[2] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

Send.java

package topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String msgString = "goods ... ... ...";channel.basicPublish(EXCHANGE_NAME, "goods.delete", null, msgString.getBytes());System.out.println("send msg :" + msgString);channel.close();connection.close();}
}

ConnectionUtils.java

package util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtils {public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/vhost_cff");factory.setUsername("cff");factory.setPassword("123");return factory.newConnection();}
}

源码打包下载地址:

https://github.com/fengfanchen/Java/tree/master/TopicModel

Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)相关推荐

  1. Java笔记-使用RabbitMQ的Java接口实现Routing(路由模式)

    目录 基本概念 代码与实例 基本概念 过程图如下: 主要是把交换机设置为直连的方式direct直连的方式然后把 数据 发送给交换机. 交换机再通过路由的Key值转发到队列上. 每一个客户端,都有一个队 ...

  2. Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)

    目录 基本概念 代码与实例 基本概念 模型如上: 1. 一个生产者,多个消费者: 2. 每个消费者都有自己的队列: 3. 生产者没有直接把消息发送到队列,而是发送到交换机,通过交换机转发到队列: 4. ...

  3. Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)

    目录 基本概念 代码与实例 基本概念 当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理. 主要是让消费者处理完后,回信息给RabbitMQ,然后Rabb ...

  4. Java笔记-使用RabbitMQ的Java接口实现round-robin(轮询分发)

    目录 基本概念 代码与实例 基本概念 简单队列的不足: 1. 耦合性高: 2. 如果生产者把生产队列该了,消费者也要同时改: Work Queues工作队列,模型如下: 代码与实例 程序运行截图如下: ...

  5. Java笔记-使用RabbitMQ的Java接口生产数据并消费

    目录 基本概念 代码及演示 基本概念 就是官方的这个模型: p为生产者不经过交换机,直接把数据传给消息队列,c为consumer用于消费. 这种结构在本科生的时候,经常自己写,现在用RabbitMQ来 ...

  6. Java笔记整理五(Iterator接口,泛型,常见数据结构(栈,队列,数组,链表,红黑树,集合),jdk新特性,异常,多线程,Lambda表达式)

    Java笔记整理五 1.1Iterator接口 Collection接口与Map接口主要用于存储元素,而Iterator主要用于迭代访问(即遍历)Collection中的元素,因此Iterator对象 ...

  7. (八)RabbitMQ消息队列-通过Topic主题模式分发消息

    前两章我们讲了RabbitMQ的direct模式和fanout模式,本章介绍topic主题模式的应用.如果对direct模式下通过routingkey来匹配消息的模式已经有一定了解那fanout也很好 ...

  8. 9.1-全栈Java笔记: 容器泛型—认识Collection接口

    开发和学习中需要时刻和数据打交道,如果组织这些数据是我们编程中重要的内容. 我们一般通过"容器"来容纳和管理数据.   事实上,数组就是一种容器,可以在其中放置对象或基本类型数据. ...

  9. C/C++笔记-使用RabbitMQ的C接口生产数据并消费

    目录 基本概念 代码与实例 基本概念 使用C语言接口完成官方的这个模型: 很有意思.感觉开源的东西真的好,不用自己去写C/C++服务端. p为生产者不经过交换机,直接把数据传给消息队列,c为consu ...

最新文章

  1. Linux修改密码是提示“passwd: 鉴定令牌操作错误”问题的处理办法
  2. java restful netty_Java RESTful 框架的性能比较
  3. java的队列_java实现队列
  4. 若依前后端分离版(vue)中配置页面跳转的路由
  5. 可能是目前最详细的Redis内存模型及应用解读
  6. JavaScript基本概念(下)
  7. python 爬取贝壳网小区名称_Python爬虫实战:爬取贝壳网二手房40000条数据
  8. qt添加菜单纯代码_开始玩qt,使用代码修改设计模式生成的菜单
  9. EMS设置发送连接器和接收连接器邮件大小
  10. 单元测试/集成测试/系统测试的区别
  11. [复杂网络博弈] 第二章 演化博弈动力学基础
  12. 替换class文件,重启Tomcat不生效
  13. NIO蔚来ET5/ET7电动汽车维修手册电路图用户手册技术资料
  14. 牛客多校第八场 C CDMA 线性代数:沃尔什矩阵
  15. 安卓APP自动更新功能实现
  16. XSS Overview
  17. 给你一个整数数组 nums 。 如果一组数字 (i,j) 满足 nums[i] == nums[j] 且 i < j ,就可以认为这是一组 好数对 。
  18. 我的言论05-04-06
  19. [转载] 细看名字服务中心
  20. Oracle11G数据库重演测试

热门文章

  1. windows下配置cvs服务端
  2. android开发实例-socket(一)
  3. Linux网络编程:原始套接字的魔力【续】
  4. oracle 10g rman catalog数据库版本问题
  5. 漢城博殺的日子 (一)
  6. 06CRecordView类
  7. 用线程实现动态改变图标
  8. 2020 年 7 个软件开发趋势
  9. ROS服务中存在string类型变量,如何给string类型变量赋值及取值
  10. 哈佛大学单细胞课程|笔记汇总 (二)