1.RabbitMQ的消息发送和接受机制

所有 MQ 产品从模型抽象上来说都是一样的过程:

消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

上面是MQ的基本抽象模型,但是不同的MQ产品有有者不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念。

RabbitMQ的内部接收如下:

1、Message
消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
4、Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
5、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
6、Connection
网络连接,比如一个TCP连接。
7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
10、Broker
表示消息队列服务器实体。

2.AMQP中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列

3.Exchange类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型

1、direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

2、fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

3、topic
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,“”匹配不多不少一个单词。

4.Java发送和接收Queue的消息

4.1创建Maven工程01-rabbitmq-send-java添加Maven依赖

<dependencies>
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version>
</dependency>
</dependencies>

4.2 编写消息发送类
在01-rabbitmq-send-java项目中创建,com.xxxx.rabbitmq.queue.Send类

public class Send{public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.222.128");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码
Connection connection=null;//定义链接对象
Channel channel=null;//定义通道对象
connection=factory.newConnection();//实例化链接对象
channel=connection.createChannel();//实例化通道对象
String message ="Hello World!3";
//创建队列 ,名字为myQueue
channel.queueDeclare("myQueue", true, false, false, null);
//发送消息到指定队列
channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));
System.out.println("消息发送成功: "+message);
channel.close();
connection.close();}
}

以运行Send类观看管控台的变化

4.3 创建Maven工程01-rabbitmq-receive-java添加Maven依赖

<dependencies>
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version>
</dependency>
</dependencies>

4.4 编写消息接收类
在01-rabbitmq-receive-java项目中创建,com.xxxx.rabbitmq.queue.Receive类

public class Receive {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服务器到连接Connection conn = factory.newConnection();//获得信道final Channel channel = conn.createChannel();//声明队列channel.queueDeclare("myQueue", true, false, false, null);//消费消息boolean autoAck = true;String consumerTag = "";//接收消息//参数1 队列名称//参数2 是否自动确认消息 true表示自动确认 false表示手动确认//参数3 为消息标签 用来区分不同的消费者这里暂时为""// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)channel.basicConsume("myQueue", autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});
//            channel.close();
//            conn.close();}
}

注意:
1、Queue的消息只能被同一个消费者消费,如果没有消费监听队列那么消息会存放到队列中持久化保存,直到有消费者来消费这个消息,如果以有消费者监听队列则立即消费发送到队列中的消息
2、Queue的消息可以保证每个消息都一定能被消费

5.Java绑定Exchange发送和接收消息

AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中。

在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度。

5.1 Exchange的direct消息绑定

5.1.1 编写direct消息发送类
在01-rabbitmq-send-java项目中创建,com.xxxx.rabbitmq.direct.Send类

public class Send {public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3";String exchangeName="myExchange";
channel.queueDeclare("myQueueDirect", true, false, false, null);//指定Exchange的类型//参数1为 交换机名称//参数2为交换机类型取值为 direct、queue、topic、headers//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "direct", true);//发送消息到RabbitMQ//参数1 我们自定义的交换机名称//参数2 自定义的RoutingKey值//参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的//参数4 具体要发送的消息信息channel.basicPublish(exchangeName,"myRoutingKey",null,message.getBytes("UTF-8"));System.out.println("消息发送成功: "+message);
//        channel.close();
//        connection.close();}
}

注意:使用direct消息模式时必须要指定RoutingKey(路由键),将指定的消息绑定到指定的路由键上

5.1.2 编写direct消息接收类
在01-rabbitmq-Receive-java项目中创建,com.xxxx.rabbitmq.direct.Receive类

public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3111";
channel.queueDeclare("myQueueDirect", true, false, false, null);String exchangeName="myExchange";//指定Exchange的类型//参数1为 交换机名称//参数2为交换机类型取值为 direct、queue、topic、headers//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "direct", true);channel.queueDeclare("myQueueDirect", true, false, false, null);channel.basicConsume("myQueueDirect ", autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});
//            channel.close();
//            conn.close();}}

注意:
1、使用Exchange的direct模式时接收者的RoutingKey必须要与发送时的RoutingKey完全一致否则无法获取消息
2、接收消息时队列名也必须要发送消息时的完全一致

5.2 Exchange的fanout消息绑定

5.2.1 编写fanout消息发送类
在01-rabbitmq-send-java项目中创建,com.xxxx.rabbitmq.fanout.Send类

public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3111";String exchangeName="myExchangeFanout";//指定Exchange的类型//参数1为 交换机名称//参数2为交换机类型取值为 direct、queue、topic、headers//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "fanout", true);//接收消息//参数1 队列名称//参数2 是否自动确认消息 true表示自动确认 false表示手动确认//参数3 为消息标签 用来区分不同的消费者这列暂时为""// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)System.out.println(queueName);channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});}

注意:
fanout模式的消息需要将一个消息同时绑定到多个队列中因此这里不能创建并指定某个队列

5.2.2 编写fanout消息接收类

在01-rabbitmq-receive-java项目中创建,com.xxxx.rabbitmq.fanout.Receive类

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服务器到连接Connection conn = factory.newConnection();//获得信道final Channel channel = conn.createChannel();//声明交换器String exchangeName = "myExchangeFanout";channel.exchangeDeclare(exchangeName, "fanout", true);//声明队列String queueName = channel.queueDeclare().getQueue();String routingKey = "";//绑定队列,通过键 hola 将队列和交换器绑定起来channel.queueBind(queueName, exchangeName, routingKey);//消费消息boolean autoAck = true;String consumerTag = "";//接收消息//参数1 队列名称//参数2 是否自动确认消息 true表示自动确认 false表示手动确认//参数3 为消息标签 用来区分不同的消费者这列暂时为""// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)System.out.println(queueName);channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});}

注意:
1、使用fanout模式获取消息时不需要绑定特定的队列名称,只需使用channel.queueDeclare().getQueue();获取一个随机的队列名称,然后绑定到指定的Exchange即可获取消息。
2、这种模式中可以同时启动多个接收者只要都绑定到同一个Exchang即可让所有接收者同时接收同一个消息是一种广播的消息机制

5.3 Exchange的topic消息绑定

5.3.1编写topic消息发送类

在01-rabbitmq-send-java项目中创建,com.xxxx.rabbitmq.topic.Send类

public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!";String exchangeName="myExchangeTopic";//指定Exchange的类型//参数1为 交换机名称//参数2为交换机类型取值为 direct、queue、topic、headers//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "topic", true);//发送消息到RabbitMQ//参数1 我们自定义的交换机名称//参数2 自定义的RoutingKey值//参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的//参数4 具体要发送的消息信息channel.basicPublish(exchangeName,"test.myRoutingKey",null,message.getBytes("UTF-8"));System.out.println("消息发送成功: "+message);channel.close();connection.close();
}

注意:
1、在topic模式中必须要指定Routingkey,并且可以同时指定多层的RoutingKey,每个层次之间使用 点分隔即可 例如test.myRoutingKey

5.3.2编写topic的消息接收类

在01-rabbitmq-receive-java项目中创建,com.xxxx.rabbitmq.topic.Receive类

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服务器到连接Connection conn = factory.newConnection();//获得信道final Channel channel = conn.createChannel();//声明交换器String exchangeName = "myExchangeTopic";channel.exchangeDeclare(exchangeName, "topic", true);//声明队列String queueName = channel.queueDeclare().getQueue();String routingKey = "test.#";//绑定队列,通过键 将队列和交换器绑定起来channel.queueBind(queueName, exchangeName, routingKey);//消费消息boolean autoAck = true;String consumerTag = "";//接收消息//参数1 队列名称//参数2 是否自动确认消息 true表示自动确认 false表示手动确认//参数3 为消息标签 用来区分不同的消费者这列暂时为""// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body, "UTF-8");System.out.println("test.#----"+bodyStr);}});
}

注意:
1、Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,#表示通配任意一个单词 *表示通配任意多个单词,例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息

5.4 事务消息

事务消息与数据库的事务类似,只是MQ中的消息是要保证消息是否会全部发送成功,防止丢失消息的一种策略。

RabbitMQ有两种方式来解决这个问题:

  1. 通过AMQP提供的事务机制实现;
  2. 使用发送者确认模式实现;

5.4.1 事务使用

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:

  1. channel.txSelect()声明启动事务模式;
  2. channel.txCommint()提交事务;
  3. channel.txRollback()回滚事务;

5.4.2 编写消息发送类

在01-rabbitmq-send-java项目中创建,com.xxxx.rabbitmq.transaction.Send类

public class Send{public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.171.143");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!";
String exchangeName="myExchangeTransaction";
//指定Exchange的类型
//参数1为 交换机名称
//参数2为交换机类型取值为 direct、fanout、topic、headers
//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化
channel.exchangeDeclare(exchangeName, "direct", true);// 声明事务
channel.txSelect();//发送消息到RabbitMQ//参数1 我们自定义的交换机名称//参数2 自定义的RoutingKey值//参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的//参数4 具体要发送的消息信息channel.basicPublish(exchangeName,"myRoutingKeyTransaction",null,message.getBytes("UTF-8"));
// 提交事务channel.txCommit();System.out.println("消息发送成功: "+message);channel.close();connection.close();}
}

5.1.3编写消息接收类

在01-rabbitmq-receive-java项目中创建,com.xxxx.rabbitmq.transaction.Receive类

public class Receive{public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.171.143");//建立到代理服务器到连接Connection conn = factory.newConnection();//获得信道final Channel channel = conn.createChannel();//声明交换器String exchangeName = "myExchangeTransaction";channel.exchangeDeclare(exchangeName, "direct", true);//声明队列String queueName = channel.queueDeclare().getQueue();String routingKey = "myRoutingKeyTransaction";//绑定队列,通过键 hola 将队列和交换器绑定起来channel.queueBind(queueName, exchangeName, routingKey);//消费消息boolean autoAck = true;String consumerTag = "";//接收消息 //参数1 队列名称//参数2 是否自动确认消息 true表示自动确认 false表示手动确认//参数3 为消息标签 用来区分不同的消费者这列暂时为""// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库) channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});
channel.close();conn.close();}
}

5.5 消息的发送者确认模式

Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的,最终达到确保所有的消息全部发送成功

Confirm的三种实现方式:

方式一:channel.waitForConfirms()普通发送方确认模式;

public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3";//创建队列 ,名字为myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 开启发送方确认模式channel.confirmSelect();long time=System.currentTimeMillis();//发送消息到指定队列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.waitForConfirms();System.out.println(System.currentTimeMillis()-time);System.out.println("消息发送成功: "+message);channel.close();connection.close();}
}

方式二:channel.waitForConfirmsOrDie()批量确认模式;

public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3";//创建队列 ,名字为myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 开启发送方确认模式channel.confirmSelect();long time=System.currentTimeMillis();//发送消息到指定队列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.waitForConfirmsOrDie();System.out.println(System.currentTimeMillis()-time);System.out.println("消息发送成功: "+message);channel.close();connection.close();}
}

方式三:channel.addConfirmListener()异步监听发送方确认模式

public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3";//创建队列 ,名字为myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 开启发送方确认模式channel.confirmSelect();long time=System.currentTimeMillis();//发送消息到指定队列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("未确认消息,标识:" + deliveryTag+"----"+multiple);}public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("已确认消息,标识:"+deliveryTag+" ---多个消息:"+multiple);}});System.out.println(System.currentTimeMillis()-time);System.out.println("消息发送成功: "+message);channel.close();connection.close();}
}

5.6 消息的消费者确认模式

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
在Consumer中Confirm模式中分为手动确认和自动确认。

手动确认主要并使用以下方法:

basicAck(): 用于肯定确认,multiple参数用于多个消息确认。

basicRecover():是路由不成功的消息可以使用recovery重新发送到队列中。

basicReject():是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。

basicNack():可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true。

在01-rabbitmq-send-java项目中创建,
com.xxxx.rabbitmq.ack.Send类

public class Send {public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//实例化链接对象channel=connection.createChannel();//实例化通道对象String message ="Hello World!3111222";String exchangeName="myExchange";channel.queueDeclare("myQueueDirect", true, false, false, null);//指定Exchange的类型//参数1为 交换机名称//参数2为交换机类型取值为 direct、queue、topic、headers//参数3 为是否为持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "direct", true);//发送消息到RabbitMQ//参数1 我们自定义的交换机名称//参数2 自定义的RoutingKey值//参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的//参数4 具体要发送的消息信息channel.basicPublish(exchangeName,"myRoutingKeyDirect",null,message.getBytes("UTF-8"));System.out.println("消息发送成功: "+message);
//        channel.close();
//        connection.close();}
}

在01-rabbitmq-receive-java项目中创建,
com.xxxx.rabbitmq.ack.Receive类

public class Receive {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服务器到连接Connection conn = factory.newConnection();//获得信道final Channel channel = conn.createChannel();//声明交换器String exchangeName = "myExchange";String queueName = "myQueueDirect";channel.queueDeclare(queueName, true, false, false, null);channel.exchangeDeclare(exchangeName, "direct", true);//声明队列String routingKey = "myRoutingKeyDirect";//绑定队列,通过键 hola 将队列和交换器绑定起来channel.queueBind(queueName, exchangeName, routingKey);//消费消息boolean autoAck = false;String consumerTag = "";//接收消息//参数1 队列名称//参数2 是否自动确认消息 true表示自动确认 false表示手动确认//参数3 为消息标签 用来区分不同的消费者这列暂时为""// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)System.out.println(queueName);
//开启事务channel.txSelect();channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//获取消息数据String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);//获取当前消息的序列号long deliveryTag = envelope.getDeliveryTag();//确认消息//参数 1 用于确定确认那条消息//参数 2 false 表示确认这条消息, true表示确认小于这个值的所有消息channel.basicAck(deliveryTag, false);}});
//开始提交事务
channel.txCommit()
//回滚事务
//      channel.txRollback();
//      channel.close();
//      conn.close();}
}

注意:
1、如果开启了事务手动提交以后再开始事务,如果事务执行了回滚操作那么即使手动确认了消息那么消息也不会从队列中移除,除非使用事务执行提交以后才会移除。

RabbitMQ消息发送和接收相关推荐

  1. rabbitmq消息发送与接收stomp通道测试

    新人学习笔记,有错欢迎交流指出~~ 发现问题: 安装好Erlang.rabbitmq客户端后,启用插件rabbitmq-plugins enable rabbitmq_web_stomp后无法访问ht ...

  2. python 网络编程之Socket通信案例消息发送与接收

    背景 网络编程是python编程中的一项基本技术.本文将实现一个简单的Socket通信案例消息发送与接收 正文 在python中的socket编程的大致流程图如上所示 我们来首先编写客户端的代码: # ...

  3. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  4. 四种策略确保 RabbitMQ 消息发送可靠性!你用哪种?

    微服务可以设计成消息驱动的微服务,响应式系统也可以基于消息中间件来做,从这个角度来说,在互联网应用开发中,消息中间件真的是太重要了. 今天,以 RabbitMQ 为例,松哥来和大家聊一聊消息中间消息发 ...

  5. 使用Akka持久化——消息发送与接收

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/beliefer/article/details/53929751 前言 在<使用Akka持久化 ...

  6. RabbitMq 消息发送确认(可靠生产和推送确认)

    RabbitMq 消息发送确认(可靠生产和推送确认) 此文档只是本人在项目中碰到的一些问题而产生的个人相关总结,实际上的消息确认机制可以做得更多(比如分布式事务等,但此处不做阐述). 一.消息发送确认 ...

  7. RabbitMQ——使用Exchange中的fanout交换机实现消息发送和接收

    文章目录: 1.写在前面 2.使用fanout交换机实现消息的发送和接收 2.1 编写消息接收类(有两个) 2.2 编写消息发送类 1.写在前面 所有 MQ 产品从模型抽象上来说都是一样的过程: 消费 ...

  8. springcloudstream+rabbitmq+eureka进行消息发送和接收实例代码

    文章目录 eureka作注册中心的配置: 消息提供方: 消费者代码 注册中心.消息接受者.消息提供者分别启动: eureka作注册中心的配置: 依赖包: <dependencies>< ...

  9. 【转】DICOM医学图像处理:DIMSE消息发送与接收“大同小异”之DCMTK fo-dicom mDCM

    转自:https://my.oschina.net/zssure/blog/354816 背景: 从DICOM网络传输一文开始,相继介绍了C-ECHO.C-FIND.C-STORE.C-MOVE等DI ...

最新文章

  1. python3.8.5是python3吗_Python 升级到3.8.5
  2. swoole UDP TCP客户端
  3. 使用Java实现K-Means聚类算法
  4. 学习数字电路必须知道的几种编码
  5. Android 分享功能大全
  6. spring(java,js,html) 截图上传
  7. 一进庙会freeeim
  8. OpenState之 Mac学习 实验
  9. C语言实现常用排序算法——基数排序
  10. CentOS 7安装 MySQL 8 数据库
  11. jquery on()方法绑定多个选择器,多个事件
  12. java基础-对象-练习集锦
  13. 10个python经典小游戏(上)-五一嗨起来(动图演示+源码分享)
  14. 一位硕士毕业生三个月求职经历与经验的结晶
  15. 【计算机毕业设计】小型OA系统设计与实现Springboot
  16. Finalize、dispose、dispose(bool disposing)
  17. 指纹识别、图形识别、aliOCR 识别
  18. 百度地图绘制自定义区域
  19. CF1442D Sum 分治 背包dp
  20. Unity3D ——强大的跨平台3D游戏开发工具教程

热门文章

  1. time_t和字符串间的转化
  2. Postgres使用Copy命令将表导出成csv文件的遇到Permission Denied等错误
  3. 如何查看某oracle实例下面定义了多少jobs
  4. yii2 设置的缓存无效,返回false,不存在
  5. SearchScore
  6. 从数据库备份创建数据库
  7. 中国国家气象局天气预报信息接口
  8. Javascript模式阅读笔记 · 简介
  9. SQLITE中原子提交的实现
  10. java中方法的参数传递