文章目录

  • 2_RabbitMQ工作模式
    • 1.Work queues
    • 2.Publish/Subscribe
      • 1.工作模式
      • 2.代码
        • 1.生产者
          • 1.指定消息队列相关消息
          • 2.建立连接&绑定队列
          • 3.发送消息
          • 完整代码:
        • 2.消费者
          • 1.指定消息队列相关消息
          • 2.建立连接&绑定队列
          • 3.实现消费方法&监听消息
          • 完整代码
      • 3.小结
    • 3.Routing
      • 1.工作模式
      • 2.代码
        • 1.生产者
        • 2.消费者
      • 3.小结
    • 4.Topics
      • 1.工作模式
      • 2.代码
        • 1.生产者
        • 2.消费者
      • 3.小结
    • 5.Header&Rpc
      • header模式
        • 1)生产者
        • 2)发送邮件消费者
      • Rpc

2_RabbitMQ工作模式

前置知识:工作模式指的是某条消息该怎么分发到队列?分发策略

1.Work queues

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:

1、使用入门程序,启动多个消费者。

2、生产者发送多个消息。

结果

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息。

2.Publish/Subscribe

1.工作模式

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息

此交换机的概念类似计算机网络中的交换机,接受包然后分发下去

2.代码

案例:

用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。

1.生产者

声明exchange_routing_inform交换机。

声明两个队列并且绑定到此交换机,绑定时需要指定routingkey

发送消息时需要指定routingkey

1.指定消息队列相关消息
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");
2.建立连接&绑定队列
Connection connection = null;
Channel channel = null;//建立新连接
connection = connectionFactory.newConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
//声明队列,如果队列在mq 中没有则要创建
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明一个交换机
//参数:String exchange, String type
/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing  工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//进行交换机和队列绑定
//参数:String queue, String exchange, String routingKey
/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
3.发送消息
//发送消息
//参数:String exchange, String routingKey, BasicProperties props, byte[] body
/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/
for(int i=0;i<20;i++){//消息内容String message = "send inform message to user "+(i+1);channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);
}
完整代码:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:10**/
public class Producer02_publish {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing    工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<20;i++){//消息内容String message = "send inform message to user "+(i+1);channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2.消费者

1.指定消息队列相关消息
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";//通过连接工厂创建新的连接和mq建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
connectionFactory.setVirtualHost("/");
2.建立连接&绑定队列
Connection connection = null;
Channel channel = null;//建立新连接
connection = connectionFactory.newConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
channel = connection.createChannel();
//声明队列,如果队列在mq 中没有则要创建
//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明一个交换机
//参数:String exchange, String type
/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing  工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//进行交换机和队列绑定
//参数:String queue, String exchange, String routingKey
/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
3.实现消费方法&监听消息
//实现消费方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}
};
//监听队列
//参数:String queue, boolean autoAck, Consumer callback
/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/
channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
完整代码
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer02_subscribe_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.小结

3.Routing

1.工作模式

路由模式

1、每个消费者监听自己的队列,并且设置routingkey。

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

2.代码

1.生产者

全部代码:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 19:23**/
public class Producer03_routing {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";private static final String ROUTINGKEY_SMS="inform_sms";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform");channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform");//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*//* for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL,null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,ROUTINGKEY_SMS,null,message.getBytes());System.out.println("send to mq "+message);}*/for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send inform message to user";channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2.消费者

全部代码:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer03_routing_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";private static final String ROUTINGKEY_EMAIL="inform_email";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM,ROUTINGKEY_EMAIL);//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.小结

4.Topics

1.工作模式

2.代码

1.生产者

声明exchange_routing_inform交换机。

声明两个队列并且绑定到此交换机,绑定时需要指定routingkey

发送消息时需要指定routingkey

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 19:23**/
public class Producer04_topics {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";private static final String ROUTINGKEY_SMS="inform.#.sms.#";public static void main(String[] args) {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新连接connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成channel = connection.createChannel();//声明队列,如果队列在mq 中没有则要创建//参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);//发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send sms inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());System.out.println("send to mq "+message);}for(int i=0;i<5;i++){//发送消息的时候指定routingKeyString message = "send sms and email inform message to user";channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());System.out.println("send to mq "+message);}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接//先关闭通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}}
}

2.消费者

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Administrator* @version 1.0* @create 2018-06-17 18:22**/
public class Consumer04_topics_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";private static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static void main(String[] args) throws IOException, TimeoutException {//通过连接工厂创建新的连接和mq建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqconnectionFactory.setVirtualHost("/");//建立新连接Connection connection = connectionFactory.newConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();/*** 参数明细* 1、queue 队列名称* 2、durable 是否持久化,如果持久化,mq重启后队列还在* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);//声明一个交换机//参数:String exchange, String type/*** 参数明细:* 1、交换机的名称* 2、交换机的类型* fanout:对应的rabbitmq的工作模式是 publish/subscribe* direct:对应的Routing   工作模式* topic:对应的Topics工作模式* headers: 对应的headers工作模式*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);//进行交换机和队列绑定//参数:String queue, String exchange, String routingKey/*** 参数明细:* 1、queue 队列名称* 2、exchange 交换机名称* 3、routingKey 路由key,作用是交换机根据路由key的值将消息转发到指定的队列中,在发布订阅模式中调协为空字符串*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);//实现消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();//消息内容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//监听队列//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* 3、callback,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);}
}

3.小结

5.Header&Rpc

header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配

队列。

案例

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种

通知类型都接收的则两种通知都有效。

1)生产者

队列与交换机绑定的代码与之前不同,如下:

通知:

2)发送邮件消费者

Rpc

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将RPC方法 的结果发送到RPC响应队列

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

2_RabbitMQ工作模式_Work queues_Publish/Subscribe_Routing_Topics_HeaderRpc相关推荐

  1. ARM 寄存器 和 工作模式了解

    一. ARM 工作模式 1.   ARM7,ARM9,ARM11,处理器有 7 种工作模式:Cortex-A 多了一个监视模式(Monitor) 2.  用户模式:非特权模式,大部分任务执行在这种模式 ...

  2. esxi ntp服务器地址_NTP的工作原理以及工作模式

    NTP(Network Time Protocol,网络时间协议)是由RFC 1305定义的时间同步协议,用来在分布式时间服务器和客户端之间进行时间同步.NTP基于UDP报文进行传输,使用的UDP端口 ...

  3. oracle主备库sync模式,Oracle 探索DG备库undo工作模式

    模拟备库出现 ORA-01555 分析备库 undo 工作模式 一: 修改主库 备库 undo 表空间 1.在主库创建undo表空间(会自动同步到备库) SYS@prod>create undo ...

  4. vsftpd 的工作模式

    首先,讲下FTP的工作模式,FTP般有2个通道分别为: 控制通道:管理用户登录等,常用端口号:TCP的21:客户端主动与服务器建立连接 数据通道:用户传输数据时所要用到的,如,浏览目录,上传.下载等: ...

  5. 对称加密、工作模式和填充模式

    对称加密密钥长度分析 DES秘钥长度:8个字符 AES秘钥长度:16个字符 DES加密后密文长度是8的整数倍 AES加密后密文长度是16的整数倍 工作模式和填充模式 IOS加密,android没有解密 ...

  6. 初识LVS(二)——LVS的DR工作模式

    LVS的DR模式介绍 LVS在NAT模式下所有的请求和响应报文都需要经过director,尽管LVS工作在内核层不受套接字文件65535的数量限制,但也有可能会成为性能瓶颈(如视频网站),以下介绍LV ...

  7. Apache优化配置——工作模式

    Apache所运行的硬件环境都是对性能影响最大的因素 各个硬件指标中,对性能影响最大的是内存,其次是硬盘的速度 ●Apache的工作模式 1.prefork模式(一个 非线程型的) ⑴.主要工作方式: ...

  8. 解读ADC采样芯片(EV10AQ190A)的工作模式(四通道模式)

    上篇博文讲了EV10AQ190A这种ADC芯片的工作模式:双通道模式 我十分重视这些内容,因为这是我认识硬件工作模式的起点,当然这也只是理论上的内容,实际采样过程中也许会遇到这样那样的问题,那就需要自 ...

  9. RabbitMQ 6种工作模式

    2019独角兽企业重金招聘Python工程师标准>>> 1.Work queues 2.Publish/Subscribe 3.Routing 4.Topics 5.Header 6 ...

最新文章

  1. Gitlab用户权限管理
  2. 用Python解“分段计算居民水费”题
  3. POJ-排序-归并排序与逆序对
  4. epoll实现高并发聊天室
  5. Linux:Access time、 Modify time 、Change time 和 find 命令使用解析
  6. 最简易上手的numpy学习笔记三
  7. [转载] 杜拉拉升职记——43 偷听者
  8. 列表生成式、生成器表达式、模块导入
  9. 解析Disruptor的依赖关系
  10. 基于 CODING 的 Spring Boot 持续集成项目 1
  11. element ui的table组件在鼠标滑动时边框线消失的解决
  12. Linux消息队列讲解
  13. Docker端口映射实现网络访问
  14. 《小岛经济学》读书笔记
  15. 软路由Linux7,CentOS 7 NAT软路由
  16. Android开发笔记(一百八十六)管理SQLite的利器——应用检查器App Inspection
  17. mysql怎么限制输入男女_excel表格中如何限制只输入男女
  18. 推荐10款社群运营必备工具
  19. 集体智慧编程——垃圾邮件过滤器(贝叶斯)-Python实现
  20. vue——axios请求成功却进入catch的原因

热门文章

  1. LeetCode 1859. 将句子排序
  2. LeetCode 794. 有效的井字游戏(分类讨论)
  3. LeetCode 210. 课程表 II(拓扑排序)
  4. LeetCode 817. 链表组件
  5. python scipy库函数solve用法_如何在中使用事件scipy.integrate.solve_ivp
  6. java set hashcode_Java学习笔记_180724_HashSet_hashCode()
  7. mysql set类型 查询,MYSQL SET字段类型怎么查询
  8. i2c通信的详细讲解_【博文连载】SCCB(I2C)初始化时序介绍
  9. 小米10pro第二个摄像头下面_小米10至尊纪念版、小米10 Pro对比评测:至尊版“至尊”在哪里?...
  10. 深度学习可以与大数据分手吗?