一、简介

最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件。本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结。RabbitMQ常用的工作模式有:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式。本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html。

二、简单队列模式(Simple Queue)

【a】模型图:只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。(单生产单消费)

上图中,“P”是我们的生产者,“C”是我们的消费者。

【b】获取MQ连接对象工具类

/*** @Description: 获取RabbitMQ的连接工具类* @Author: weixiaohuai* @Date: 2019/6/22* @Time: 21:29*/
public class MQConnecitonUtils {private static final String RABBITMQ_HOST = "127.0.0.1";private static final Integer RABBITMQ_PORT = 5672;private static final String RABBITMQ_VHOST = "/vhost";private static final String RABBITMQ_USERNAME = "wsh";private static final String RABBITMQ_PASSWORD = "wsh";public static Connection getConnection() {//定义MQ连接对象Connection connection = null;//创建MQ连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();// 设置MQ主机名称connectionFactory.setHost(RABBITMQ_HOST);// 设置MQ AMQP端口号connectionFactory.setPort(RABBITMQ_PORT);// 设置MQ 连接的virtual hostconnectionFactory.setVirtualHost(RABBITMQ_VHOST);// 设置MQ 用户名称connectionFactory.setUsername(RABBITMQ_USERNAME);// 设置MQ 用户密码connectionFactory.setPassword(RABBITMQ_PASSWORD);try {connection = connectionFactory.newConnection();} catch (IOException | TimeoutException e) {e.printStackTrace();}//返回连接对象return connection;}}

【c】生产者

/*** @Description: 消息生产者* @Author: weixiaohuai* @Date: 2019/6/22* @Time: 21:37*/
public class CustomProducer {private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";private static final String SIMPLE_QUEUE_MESSAGE = "Hello World!";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {//创建通道channel = connection.createChannel();//创建Queue队列channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);//发送消息到队列MQ_SIMPLE_QUEUE//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)channel.basicPublish("", SIMPLE_QUEUE_NAME, null, SIMPLE_QUEUE_MESSAGE.getBytes(StandardCharsets.UTF_8));} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

【d】消费者

/*** @Description: 消息消费者(新API)* @Author: weixiaohuai* @Date: 2019/6/22* @Time: 21:55*/
public class NewCustomConsumer {private static Logger logger = LoggerFactory.getLogger(NewCustomConsumer.class);private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();Channel channel;try {//创建消息通道对象channel = connection.createChannel();//声明queue队列channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("receive message: " + message);}};//监听消息队列channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);} catch (IOException e) {e.printStackTrace();}}
}

【e】运行结果

三、工作队列模式(Work Queues)

【a】模型图:多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。工作队列有轮训分发和公平分发两种模式。

下面先说说轮训分发(round-robin)方式:

【b】消息生产者:

/*** @Description: 工作队列 - 消息生产者* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 10:25* <p>* 说明:* 消费者1与消费者2处理的消息是均分的,而且消息是轮训分发的(轮训分发 round-robin)*/
public class CustomProducer {private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建Queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//发送10条消息到工作队列for (int i = 1; i <= 10; i++) {StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);//发送消息channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());}} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

【c】消息消费者1:模拟延迟操作2秒

public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();Channel channel = null;try {//创建消息通道对象channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {//模拟延迟Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, true, consumer);} catch (IOException e) {e.printStackTrace();}}
}

【d】消息消费者2:模拟延迟操作1秒

public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();Channel channel = null;try {//创建消息通道对象channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {//模拟延迟Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, true, consumer);} catch (IOException e) {e.printStackTrace();}}
}

【e】运行结果

由上面图可见,消费者1和消费者2处理的消息是均分的(消费的消息条数一样),而且消息是轮训分发的,也就是说同一个消息只能被一个消费者消费。上面的消费者1和消费者2处理消息的效率不同,但是最后接收到的消息还是一样多,如果需要让工作效率高的消费者消费更多的消息,那么可以使用公平分发,下面介绍一下工作队列的公平分发模式(能者多劳)

【a】生产者:

/*** @Description: 工作队列 - 消息生产者 (公平分发方式Fair dispatch)* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 10:25* <p>* 说明:* 1. 生产者、消费者指定:channel.basicQos(1);* 2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);* 3. 消费者必须关闭自动应答:autoAck = false;* 4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;*/
public class CustomProducer {private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建Queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者(同一时刻服务器只会发送一条消息给消费者),消费者端发送了ack后才会接收下一个消息。channel.basicQos(1);//发送10条消息到工作队列for (int i = 1; i <= 10; i++) {StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);//发送消息channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());}} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

【b】消费者1:

public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)boolean autoAck = false;//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}
}

【c】消费者2:

public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}
}

【d】运行结果:

由此可见,消费者2的效率相对较高,所以消费者2消费消息比消费者1多一些,这样就可以充分发挥消费者处理消息的能力。

【e】注意点:

  • 1. 生产者、消费者指定:channel.basicQos(1);
  • 2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);
  • 3. 消费者必须关闭自动应答:autoAck = false;
  • 4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;

四、发布-订阅模式(Publish/Subscribe)

【a】模型图:生产者将消息发送到交换器,然后交换器绑定到多个队列,监听该队列的所有消费者消费消息。

【b】生产者:

/*** @Description: 发布-订阅模式* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 15:20* <p>* 说明:可实现一条消息被多个消费者消费* <p>* a. 一个生产者,多个消费者;* b. 每一个消费者都有自己的消息队列;* c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;* d. 每个队列都需要绑定到交换机上;* e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;*/
public class CustomProducer {private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";//类型:分发private static final String PUBLISH_SUBSCRIBE_EXCHANGE_TYPE = "fanout";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建交换机对象publish_subscribe_exchange_fanoutchannel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_TYPE);//发送消息到交换机exchange上String msg = "hello world!!!";channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "", null, msg.getBytes());} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

【c】消费者1:

public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer.class);private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name01";private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}

【d】消费者2:

public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer.class);private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name02";private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}

【e】运行结果

由此可见,一条消息同时被两个消费者同时消费。

【f】交换机绑定信息

【g】注意点:

  • a. 一个生产者,多个消费者;
  • b. 每一个消费者都有自己的消息队列,分别绑定到不同的队列上;
  • c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
  • d. 每个队列都需要绑定到交换机上;
  • e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
  • f. 如果消息发送到没有队列绑定的交换器时,消息将会丢失,因为交换器没有存储消息的能力,只有队列才有存储消息的能力;

五、路由模式(Routing)

【a】模型图:生产者将消息发送到direct交换器,它会把消息路由到那些binding key与routing key完全匹配的Queue中,这样就能实现消费者有选择性地去消费消息。

【b】生产者:

/*** @Description: routing路由模式* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 15:20* <p>* 说明:生产者发送消息的时候指定routing key,然后消费者绑定队列的时候也指定一些binding key,只有binding key与routing key一致的消费者才能接收到此消息*/
public class CustomProducer {private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";//交换机类型:directprivate static final String EXCHANGE_TYPE = "direct";private static final String EXCHANGE_ROUTE_KEY = "info";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建交换机对象channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);//发送消息到交换机exchange上String msg = "hello world!!!";//指定routing key为infochannel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

【c】消费者1:

public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String QUEUE_NAME = "routing_direct_queue_name";private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";//binding keyprivate static final String EXCHANGE_ROUTE_KEY = "error";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}

【d】消费者2:

public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String QUEUE_NAME = "routing_direct_queue_name02";private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";//binding keyprivate static final String EXCHANGE_ROUTE_KEY01 = "error";private static final String EXCHANGE_ROUTE_KEY02 = "info";private static final String EXCHANGE_ROUTE_KEY03 = "warning";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY01);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY02);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY03);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}

【e】运行结果:

因为生产者发布消息的时候指定了routing key为info, 消费者绑定队列的时候指定的binding key 为error,显然消费者1接收不到此消息,因为消费者2绑定队列的时候指定了binding key为error、info、warning,所以消费者2能够成功接收该消息进行消费。

【f】交换机绑定信息

六、主题(Topic)模式

【a】模型图:类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。

【b】生产者:

/*** @Description: topic主题模式* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 15:20* <p>** 说明:* #: 代表一个或者多个* *: 代表一个** 举例:* 比如发送消息的时候指定了routing key为news.insert,* 如果消费者指定binding key 为news.* 或者news.#都能接收到该消息;**/
public class CustomProducer {private static final String EXCHANGE_NAME = "exchange_topic";//交换机类型:topic 类似正则匹配模式private static final String EXCHANGE_TYPE = "topic";//指定routing keyprivate static final String EXCHANGE_ROUTE_KEY = "news.insert";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建交换机对象channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);//发送消息到交换机exchange上String msg = "hello world!!!";channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

【c】消费者1:

public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String QUEUE_NAME = "topic_queue_name1";private static final String EXCHANGE_NAME = "exchange_topic";//binding keyprivate static final String EXCHANGE_ROUTE_KEY = "news.insert";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}

【d】消费者2:

public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String QUEUE_NAME = "topic_queue_name2";private static final String EXCHANGE_NAME = "exchange_topic";//binding keyprivate static final String EXCHANGE_ROUTE_KEY = "news.#";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}

【e】运行结果:

生产者发送消息绑定的routing key 为news.insert;消费者1监听的队列和交换器binding key 为news.insert;消费者2监听的队列和交换器bindingkey为news.#,很显然,两个消费者都将接收到该消息。

 

【f】交换机绑定信息

RabbitMQ五种工作模式学习总结相关推荐

  1. RabbitMQ五种工作模式

    RabbitMQ五种工作模式 1.简单队列 一个生产者对应一个消费者!! 2.work 模式 一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!! 轮询分发就是将消息队列中的消息,依次 ...

  2. 基于SpringCloud开发rabbitmq五种工作模式实现

    工作模式 1. RabbitMQ消息模型 2. SpringAMQP 2.1. Basic Queue 简单队列模型 2.1.1.消息发送 2.1.2.消息接收 2.1.3.测试 2.2. WorkQ ...

  3. 快速入门RabbitMQ(详细)第二篇:RabbitMQ五种工作模式的使用及总结

    4. RabbitMQ工作模式 4.1. Work queues工作队列模式 Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息.应用场景 ...

  4. RabbitMQ介绍以及五种工作模式

    早期出现认证系统类似的提供认证服务; 出现了系统间的通信;并发的高需求 每个前端系统与认证系统的通信强耦合 传递消息,获取返回结果的过程,如果出现网络波动,整个传递数据,计算返回结果的流程重走一遍;需 ...

  5. rabbitmq几种工作模式_RabbitMQ的六种工作模式总结

    精品推荐 国内稀缺优秀Java全栈课程-Vue+SpringBoot通讯录系统全新发布! 作者:侧身左睡 https://www.cnblogs.com/xyfer1018/p/11581511.ht ...

  6. RabbitMQ入门(简介、搭建环境、五种工作模式)介绍

    1. RabbitMQ介绍 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消 ...

  7. 消息队列(四)——rabbitMQ四种工作模式

    RabbitMQ工作模式 模式总结: 1.简单模式helloworld 一个生产者,一个消费者,不需要交换机(使用默认交换机) 2.工作队列模式Work Queue 一个生产者,多个消费者(竞争关系) ...

  8. RabbitMQ 6种工作模式

    2019独角兽企业重金招聘Python工程师标准>>> 1.Work queues 2.Publish/Subscribe 3.Routing 4.Topics 5.Header 6 ...

  9. rabbitmq几种工作模式_RabbitMQ六种队列模式-简单队列模式

    在官网的教程中,描述了如上六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列.也称为点对点模式. 工作模式:一个消息生产者,一个交换器,一个消息队列,多个 ...

  10. RabbitMQ七种工作模式实现测试代码

    所有工作模式依赖都相同 <dependencies><!--RabbitMQ的客户端依赖--><dependency><groupId>com.rabb ...

最新文章

  1. 深入学习QWidget-1
  2. c保留小数点后三位数没有则为0_哪位老师整理的,这么齐全?赶紧存下为孩子期末考试助力...
  3. windows server 2012 FTP 服务器 / 虚拟目录
  4. html标签table的使用,HTML标签之table
  5. Jackson动态处理返回字段
  6. DES和AES加密:指定键的大小对于此算法无效
  7. 蓝桥杯-算法提高-凶手 断案
  8. GlobalSign 团队与世界领先的认证机构(CAs)合作 共同加强网络安全
  9. python通过函数类属性_函数作为类属性的赋值如何成为Python中的一个方法?
  10. Linux与shell编程之一: Linux基础知识总结
  11. oracle添加字段sql_如何用SQL语句添加和修改字段?
  12. [转载]MySQL各类SQL语句的加锁机制
  13. 11年艺术学习“转投”数学,他出版首本TensorFlow中文教材,成为蚂蚁金服技术大军一员...
  14. 十问数据库:问来路,问现在,问未来
  15. 使用 hugegraph-studio 插入电影数据并查询
  16. CSS3实现图片超立体3D效果
  17. 7-24 藏尾诗 (20分)
  18. 西门子smart200能用C语言吗,【项目详解】200SMART+V20在收卷机械上的应用
  19. 请更换备份电池 pos机_POS机的常见问题及处理方法
  20. SQL2008系统账户:Local system/Network service/Local Service 区别

热门文章

  1. 高中计算机网络培训心得体会,高中老师信息技术培训心得体会
  2. 自动驾驶 9-4: 改进的 EKF - 错误状态扩展卡尔曼滤波器 An Improved EKF - The Error State Extended Kalman Filter
  3. 容器技术Docker K8s 11 容器服务Kubernetes版ACK详解-集群查看及管理
  4. swift5的下标Subscripts 花式玩法
  5. 算法:Design Circular Deque(设计一个双端队列)
  6. 轻量级神经网络ShuffleNet
  7. 怎样打开android设备,在app中打开android设备的存储
  8. linux变量接收命令返回值,linux shell自定义函数(定义、返回值、变量作用域)介绍...
  9. 实验板FPGA型号在哪里看_【VE】一文看懂乙烯基树脂发展史!
  10. 【推荐】数据科学中的非数学特征提取方法