RabbitMQ五种工作模式学习总结
一、简介
最近,在看一些消息中间件的内容,之前都没有好好学习一下消息中间件。本文将对RabbitMQ中五种常用的工作模式做一个简单的介绍和总结。RabbitMQ常用的工作模式有:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式。本文参照RabbitMQ官网示例总结,详细可以到官网查看:https://www.rabbitmq.com/getstarted.html。
二、简单队列模式(Simple Queue)
【a】模型图:只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。(单生产单消费)
上图中,“P”是我们的生产者,“C”是我们的消费者。
【b】获取MQ连接对象工具类
/*** @Description: 获取RabbitMQ的连接工具类* @Author: weixiaohuai* @Date: 2019/6/22* @Time: 21:29*/
public class MQConnecitonUtils {private static final String RABBITMQ_HOST = "127.0.0.1";private static final Integer RABBITMQ_PORT = 5672;private static final String RABBITMQ_VHOST = "/vhost";private static final String RABBITMQ_USERNAME = "wsh";private static final String RABBITMQ_PASSWORD = "wsh";public static Connection getConnection() {//定义MQ连接对象Connection connection = null;//创建MQ连接工厂对象ConnectionFactory connectionFactory = new ConnectionFactory();// 设置MQ主机名称connectionFactory.setHost(RABBITMQ_HOST);// 设置MQ AMQP端口号connectionFactory.setPort(RABBITMQ_PORT);// 设置MQ 连接的virtual hostconnectionFactory.setVirtualHost(RABBITMQ_VHOST);// 设置MQ 用户名称connectionFactory.setUsername(RABBITMQ_USERNAME);// 设置MQ 用户密码connectionFactory.setPassword(RABBITMQ_PASSWORD);try {connection = connectionFactory.newConnection();} catch (IOException | TimeoutException e) {e.printStackTrace();}//返回连接对象return connection;}}
【c】生产者
/*** @Description: 消息生产者* @Author: weixiaohuai* @Date: 2019/6/22* @Time: 21:37*/
public class CustomProducer {private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";private static final String SIMPLE_QUEUE_MESSAGE = "Hello World!";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {//创建通道channel = connection.createChannel();//创建Queue队列channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);//发送消息到队列MQ_SIMPLE_QUEUE//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)channel.basicPublish("", SIMPLE_QUEUE_NAME, null, SIMPLE_QUEUE_MESSAGE.getBytes(StandardCharsets.UTF_8));} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
【d】消费者
/*** @Description: 消息消费者(新API)* @Author: weixiaohuai* @Date: 2019/6/22* @Time: 21:55*/
public class NewCustomConsumer {private static Logger logger = LoggerFactory.getLogger(NewCustomConsumer.class);private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();Channel channel;try {//创建消息通道对象channel = connection.createChannel();//声明queue队列channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("receive message: " + message);}};//监听消息队列channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);} catch (IOException e) {e.printStackTrace();}}
}
【e】运行结果
三、工作队列模式(Work Queues)
【a】模型图:多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。工作队列有轮训分发和公平分发两种模式。
下面先说说轮训分发(round-robin)方式:
【b】消息生产者:
/*** @Description: 工作队列 - 消息生产者* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 10:25* <p>* 说明:* 消费者1与消费者2处理的消息是均分的,而且消息是轮训分发的(轮训分发 round-robin)*/
public class CustomProducer {private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建Queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//发送10条消息到工作队列for (int i = 1; i <= 10; i++) {StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);//发送消息channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());}} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
【c】消息消费者1:模拟延迟操作2秒
public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();Channel channel = null;try {//创建消息通道对象channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {//模拟延迟Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, true, consumer);} catch (IOException e) {e.printStackTrace();}}
}
【d】消息消费者2:模拟延迟操作1秒
public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();Channel channel = null;try {//创建消息通道对象channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {//模拟延迟Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}};//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, true, consumer);} catch (IOException e) {e.printStackTrace();}}
}
【e】运行结果
由上面图可见,消费者1和消费者2处理的消息是均分的(消费的消息条数一样),而且消息是轮训分发的,也就是说同一个消息只能被一个消费者消费。上面的消费者1和消费者2处理消息的效率不同,但是最后接收到的消息还是一样多,如果需要让工作效率高的消费者消费更多的消息,那么可以使用公平分发,下面介绍一下工作队列的公平分发模式(能者多劳)。
【a】生产者:
/*** @Description: 工作队列 - 消息生产者 (公平分发方式Fair dispatch)* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 10:25* <p>* 说明:* 1. 生产者、消费者指定:channel.basicQos(1);* 2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);* 3. 消费者必须关闭自动应答:autoAck = false;* 4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;*/
public class CustomProducer {private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";private static final String WORK_QUEUE_MESSAGE = "hello world!! ------> ";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建Queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者(同一时刻服务器只会发送一条消息给消费者),消费者端发送了ack后才会接收下一个消息。channel.basicQos(1);//发送10条消息到工作队列for (int i = 1; i <= 10; i++) {StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append(i);//发送消息channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());}} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
【b】消费者1:
public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)boolean autoAck = false;//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}
}
【c】消费者2:
public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//声明queue队列channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}
}
【d】运行结果:
由此可见,消费者2的效率相对较高,所以消费者2消费消息比消费者1多一些,这样就可以充分发挥消费者处理消息的能力。
【e】注意点:
- 1. 生产者、消费者指定:channel.basicQos(1);
- 2. 消费者消费完消息自动发送确认消息:channel.basicAck(envelope.getDeliveryTag(), false);
- 3. 消费者必须关闭自动应答:autoAck = false;
- 4. 一般消费者如果处理消息的时间较短(效率较高),那么它处理的消息会比较多一些;
四、发布-订阅模式(Publish/Subscribe)
【a】模型图:生产者将消息发送到交换器,然后交换器绑定到多个队列,监听该队列的所有消费者消费消息。
【b】生产者:
/*** @Description: 发布-订阅模式* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 15:20* <p>* 说明:可实现一条消息被多个消费者消费* <p>* a. 一个生产者,多个消费者;* b. 每一个消费者都有自己的消息队列;* c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;* d. 每个队列都需要绑定到交换机上;* e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;*/
public class CustomProducer {private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";//类型:分发private static final String PUBLISH_SUBSCRIBE_EXCHANGE_TYPE = "fanout";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建交换机对象publish_subscribe_exchange_fanoutchannel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_TYPE);//发送消息到交换机exchange上String msg = "hello world!!!";channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "", null, msg.getBytes());} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
【c】消费者1:
public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer.class);private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name01";private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}
【d】消费者2:
public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer.class);private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name02";private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}
【e】运行结果
由此可见,一条消息同时被两个消费者同时消费。
【f】交换机绑定信息
【g】注意点:
- a. 一个生产者,多个消费者;
- b. 每一个消费者都有自己的消息队列,分别绑定到不同的队列上;
- c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
- d. 每个队列都需要绑定到交换机上;
- e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
- f. 如果消息发送到没有队列绑定的交换器时,消息将会丢失,因为交换器没有存储消息的能力,只有队列才有存储消息的能力;
五、路由模式(Routing)
【a】模型图:生产者将消息发送到direct交换器,它会把消息路由到那些binding key与routing key完全匹配的Queue中,这样就能实现消费者有选择性地去消费消息。
【b】生产者:
/*** @Description: routing路由模式* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 15:20* <p>* 说明:生产者发送消息的时候指定routing key,然后消费者绑定队列的时候也指定一些binding key,只有binding key与routing key一致的消费者才能接收到此消息*/
public class CustomProducer {private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";//交换机类型:directprivate static final String EXCHANGE_TYPE = "direct";private static final String EXCHANGE_ROUTE_KEY = "info";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建交换机对象channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);//发送消息到交换机exchange上String msg = "hello world!!!";//指定routing key为infochannel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
【c】消费者1:
public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String QUEUE_NAME = "routing_direct_queue_name";private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";//binding keyprivate static final String EXCHANGE_ROUTE_KEY = "error";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}
【d】消费者2:
public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String QUEUE_NAME = "routing_direct_queue_name02";private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";//binding keyprivate static final String EXCHANGE_ROUTE_KEY01 = "error";private static final String EXCHANGE_ROUTE_KEY02 = "info";private static final String EXCHANGE_ROUTE_KEY03 = "warning";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY01);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY02);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY03);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}
【e】运行结果:
因为生产者发布消息的时候指定了routing key为info, 消费者绑定队列的时候指定的binding key 为error,显然消费者1接收不到此消息,因为消费者2绑定队列的时候指定了binding key为error、info、warning,所以消费者2能够成功接收该消息进行消费。
【f】交换机绑定信息
六、主题(Topic)模式
【a】模型图:类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。
【b】生产者:
/*** @Description: topic主题模式* @Author: weixiaohuai* @Date: 2019/6/23* @Time: 15:20* <p>** 说明:* #: 代表一个或者多个* *: 代表一个** 举例:* 比如发送消息的时候指定了routing key为news.insert,* 如果消费者指定binding key 为news.* 或者news.#都能接收到该消息;**/
public class CustomProducer {private static final String EXCHANGE_NAME = "exchange_topic";//交换机类型:topic 类似正则匹配模式private static final String EXCHANGE_TYPE = "topic";//指定routing keyprivate static final String EXCHANGE_ROUTE_KEY = "news.insert";public static void main(String[] args) {//获取MQ连接Connection connection = MQConnecitonUtils.getConnection();//从连接中获取Channel通道对象Channel channel = null;try {channel = connection.createChannel();//创建交换机对象channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);//发送消息到交换机exchange上String msg = "hello world!!!";channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}if (null != connection) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
【c】消费者1:
public class CustomConsumer01 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer01.class);private static final String QUEUE_NAME = "topic_queue_name1";private static final String EXCHANGE_NAME = "exchange_topic";//binding keyprivate static final String EXCHANGE_ROUTE_KEY = "news.insert";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer01】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}
【d】消费者2:
public class CustomConsumer02 {private static Logger logger = LoggerFactory.getLogger(CustomConsumer02.class);private static final String QUEUE_NAME = "topic_queue_name2";private static final String EXCHANGE_NAME = "exchange_topic";//binding keyprivate static final String EXCHANGE_ROUTE_KEY = "news.#";public static void main(String[] args) {//获取MQ连接对象Connection connection = MQConnecitonUtils.getConnection();try {//创建消息通道对象final Channel channel = connection.createChannel();//创建队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//将队列绑定到交换机上,并且指定routing_keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);channel.basicQos(1);//创建消费者对象DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//消息消费者获取消息String message = new String(body, StandardCharsets.UTF_8);logger.info("【CustomConsumer02】receive message: " + message);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {//消费完一条消息需要自动发送确认消息给MQchannel.basicAck(envelope.getDeliveryTag(), false);}}};//使用公平分发必须关闭自动应答boolean autoAck = false;//监听消息队列channel.basicConsume(QUEUE_NAME, autoAck, consumer);} catch (IOException e) {e.printStackTrace();}}}
【e】运行结果:
生产者发送消息绑定的routing key 为news.insert;消费者1监听的队列和交换器binding key 为news.insert;消费者2监听的队列和交换器bindingkey为news.#,很显然,两个消费者都将接收到该消息。
【f】交换机绑定信息
RabbitMQ五种工作模式学习总结相关推荐
- RabbitMQ五种工作模式
RabbitMQ五种工作模式 1.简单队列 一个生产者对应一个消费者!! 2.work 模式 一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!! 轮询分发就是将消息队列中的消息,依次 ...
- 基于SpringCloud开发rabbitmq五种工作模式实现
工作模式 1. RabbitMQ消息模型 2. SpringAMQP 2.1. Basic Queue 简单队列模型 2.1.1.消息发送 2.1.2.消息接收 2.1.3.测试 2.2. WorkQ ...
- 快速入门RabbitMQ(详细)第二篇:RabbitMQ五种工作模式的使用及总结
4. RabbitMQ工作模式 4.1. Work queues工作队列模式 Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息.应用场景 ...
- RabbitMQ介绍以及五种工作模式
早期出现认证系统类似的提供认证服务; 出现了系统间的通信;并发的高需求 每个前端系统与认证系统的通信强耦合 传递消息,获取返回结果的过程,如果出现网络波动,整个传递数据,计算返回结果的流程重走一遍;需 ...
- rabbitmq几种工作模式_RabbitMQ的六种工作模式总结
精品推荐 国内稀缺优秀Java全栈课程-Vue+SpringBoot通讯录系统全新发布! 作者:侧身左睡 https://www.cnblogs.com/xyfer1018/p/11581511.ht ...
- RabbitMQ入门(简介、搭建环境、五种工作模式)介绍
1. RabbitMQ介绍 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消 ...
- 消息队列(四)——rabbitMQ四种工作模式
RabbitMQ工作模式 模式总结: 1.简单模式helloworld 一个生产者,一个消费者,不需要交换机(使用默认交换机) 2.工作队列模式Work Queue 一个生产者,多个消费者(竞争关系) ...
- RabbitMQ 6种工作模式
2019独角兽企业重金招聘Python工程师标准>>> 1.Work queues 2.Publish/Subscribe 3.Routing 4.Topics 5.Header 6 ...
- rabbitmq几种工作模式_RabbitMQ六种队列模式-简单队列模式
在官网的教程中,描述了如上六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列.也称为点对点模式. 工作模式:一个消息生产者,一个交换器,一个消息队列,多个 ...
- RabbitMQ七种工作模式实现测试代码
所有工作模式依赖都相同 <dependencies><!--RabbitMQ的客户端依赖--><dependency><groupId>com.rabb ...
最新文章
- 深入学习QWidget-1
- c保留小数点后三位数没有则为0_哪位老师整理的,这么齐全?赶紧存下为孩子期末考试助力...
- windows server 2012 FTP 服务器 / 虚拟目录
- html标签table的使用,HTML标签之table
- Jackson动态处理返回字段
- DES和AES加密:指定键的大小对于此算法无效
- 蓝桥杯-算法提高-凶手 断案
- GlobalSign 团队与世界领先的认证机构(CAs)合作 共同加强网络安全
- python通过函数类属性_函数作为类属性的赋值如何成为Python中的一个方法?
- Linux与shell编程之一: Linux基础知识总结
- oracle添加字段sql_如何用SQL语句添加和修改字段?
- [转载]MySQL各类SQL语句的加锁机制
- 11年艺术学习“转投”数学,他出版首本TensorFlow中文教材,成为蚂蚁金服技术大军一员...
- 十问数据库:问来路,问现在,问未来
- 使用 hugegraph-studio 插入电影数据并查询
- CSS3实现图片超立体3D效果
- 7-24 藏尾诗 (20分)
- 西门子smart200能用C语言吗,【项目详解】200SMART+V20在收卷机械上的应用
- 请更换备份电池 pos机_POS机的常见问题及处理方法
- SQL2008系统账户:Local system/Network service/Local Service 区别
热门文章
- 高中计算机网络培训心得体会,高中老师信息技术培训心得体会
- 自动驾驶 9-4: 改进的 EKF - 错误状态扩展卡尔曼滤波器 An Improved EKF - The Error State Extended Kalman Filter
- 容器技术Docker K8s 11 容器服务Kubernetes版ACK详解-集群查看及管理
- swift5的下标Subscripts 花式玩法
- 算法:Design Circular Deque(设计一个双端队列)
- 轻量级神经网络ShuffleNet
- 怎样打开android设备,在app中打开android设备的存储
- linux变量接收命令返回值,linux shell自定义函数(定义、返回值、变量作用域)介绍...
- 实验板FPGA型号在哪里看_【VE】一文看懂乙烯基树脂发展史!
- 【推荐】数据科学中的非数学特征提取方法