​写在前面:全文12000多字,从为什么需要用消息队列,到rabbitMQ安装使用,如何使用JavaAPI生产消费消息,以及使用消息队列带来的一些常见问题。绝对很适合新手入门学习。

为什么需要消息队列

  1. 异步处理

  2. 削峰限流

    秒杀活动,一般会因为流量过大,导致应用挂掉。加入消息队列可控制活动人数,缓解短时间的高流量。

  3. 应用解耦

    双十一购物节,订单系统需要通知库存系统,传统做法是订单系统直接调用库存系统的接口,库存系统出现故障时订单就会失败。可在订单系统和库存系统中间加一个MQ,达到应用解耦的需求。

    A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃......

  4. 日志处理

消息队列有哪些

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,不推荐用这个了;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司,会去用 RocketMQ,确实很不错(阿里出品),但社区可能有突然黄掉的风险,对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

AMQP协议

1、AMQP 是什么

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息网络协议

2、AMQP模型

3、工作过程

发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

rabbitMQ基本概念

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),用Erlang语言编写的

producer&Consumer

producer指的是消息生产者,consumer消息的消费者。

Queue

消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。

  1. 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失

  2. 设置为临时队列,queue中的数据在系统重启之后就会丢失

  3. 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除

Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:

  1. Direct 直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue

  2. fanout 广播式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue。

  3. topic 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)

  4. headers 消息体的header匹配(ignore)

Binding

所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。

virtual host

在rabbitmq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

通信过程

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:

  1. P1生产消息,发送给服务器端的Exchange

  2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1

  3. Queue1收到消息,将消息发送给订阅者C1

  4. C1收到消息,发送ACK给队列确认收到消息

  5. Queue1收到ACK,删除队列中缓存的此条消息

rabbitMQ安装

1、安装文件准备

rabbitMQ下载地址:Downloading and Installing RabbitMQ — RabbitMQ

进入对应的系统的下载页面:

点击GitHub下载,可选择之前的版本

rabbit是erlang开发,需要安装erlang环境。

erlang下载地址:Downloads - Erlang/OTP

查看对应版本支持的erlang supported version of Erlang,例如我这下载的是3.7.28,可以看到erlang最低版本是21.3,最高时22.x,最好选择安装22.x安装。

下载好安装文件。

2、傻瓜式安装

1、安装erlang,双击opt_win64_22.2.exe,一直下一步。

2、安装rabbitMQ,双击rabbitmq-server-3.7.28.exe,一直下一步。

启动RabbitMQ Service,访问不了web控制台界面,找到如下路径下面的文件,删除,然后重新安装一下rabbitMQ

激活 RabbitMQ's Management Plugin

使用RabbitMQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态。

到RabbitMQ Server安装目录下面的sbin目录:RabbitMQ Server\rabbitmq_server-3.6.5\sbin

执行命令:

rabbitmq-plugins.bat enable rabbitmq_management

然后再访问http://127.0.0.1:15672/即可打开web控制页面

可使用guest/guest登录。

rabbitMQ管理界面的使用

1、添加用户

2、添加Virtual Hosts(虚拟服务器)

一般以/开头

3、Virtual Hosts授权给用户

Java连接rabbitMQ

Java连接rabbitMQ需要用到的包:

 <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.5</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency></dependencies>

1、简单队列

P:消息的生产者-->队列-->消费者

连接rabbitmq

public class ConnectionUtils {public static Connection getConnections() throws IOException{//定义一个连接工厂ConnectionFactory factory=new ConnectionFactory();//设置服务器ַfactory.setHost("127.0.0.1");//    AMQP  5672factory.setPort(5672);//vhostfactory.setVirtualHost("/vhost_mmr");//用户名factory.setUsername("huyanglin");//密码factory.setPassword("huyanglin");Connection newConnection = factory.newConnection();return newConnection;}
}

生产者

public class Send {private static final String QUEUE_NAME = "test_simple_queue";
​public static void main(String[] args) throws IOException {//获取连接Connection connection = ConnectionUtils.getConnection();//从连接中获取通道Channel channel = connection.createChannel();//声明队列//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)//queue:队列名字//durable:是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失//exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException://autoDelete:是否自动删除队列,当最后一个消费者断开连接之后队列是否自动被删除,//args:相关参数,目前一般为nilchannel.queueDeclare(QUEUE_NAME,false,false,false,null);
​String msg = "hello simple !";
​//发送消息//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)//exchange 交换机//routingKey 路由键//props 基本属性设置//body 消息体,真正需要发送的消息channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
​System.out.println("send message :" +msg);
​channel.close();connection.close();}
}

消费者

public class Recv {private static final String QUEUE_NAME = "test_simple_queue";public static void main(String[] args) throws IOException {//获取连接Connection connection = ConnectionUtils.getConnection();
​//从连接中创建通道Channel channel = connection.createChannel();
​//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);
​DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"utf-8");System.out.println("取数据:"+msg);}};
​//监听队列//basicConsume(String queue, boolean autoAck, Consumer callback)//autoAck 是否自动确认channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}
}

简单队列的不足

  1. 耦合性高,生产者一一对应消费者(不能满足多个消费者消费队列中的消息)

  2. 队列名变更,得同时变更

2、work queues 工作队列

1 轮询分发

为什么会出现工作队列

simple队列 是一一对应的,实际开发中,生产者发送消息是毫不费力的,而消费者一般是与业务结合的,消费者接收到消息后就需要处理,可能需要花费时间。这时候队列就会挤压着很多消息。

生产者

public class Send {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, InterruptedException {
​//获取连接Connection connection = ConnectionUtils.getConnection();//创建通道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);
​int count = 50;for (int i = 0; i < count; i++) {String msg = "work-queue-"+i;channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());System.out.println("send message :"+msg);Thread.sleep(100);}//关闭连接channel.close();connection.close();}
}

消费者1

public class Recv1 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException {
​//获取连接Connection connection = ConnectionUtils.getConnection();//创建通道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);
​//定义一个消费者DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"utf-8");System.out.println("consumer[1] recv message :"+msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("consumer[1] done");}}};boolean autoAck = true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

消费者2

public class Recv2 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException {
​//获取连接Connection connection = ConnectionUtils.getConnection();//创建通道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);
​//定义一个消费者DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"utf-8");System.out.println("consumer[2] recv message :"+msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("consumer[2] done");}}};boolean autoAck = true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

现象:消费者1和消费者2处理的数据消息是一样多的,

消费者1都是偶数

消费者2都是奇数

这种方式叫 轮询分发(round-robin)结果是 不管谁忙或者谁闲,任务消息都是一边一个轮询发

2 公平分发

生产者

//在消费者返回确认消息之前,只分发一个消息
int prefecthCount = 1;
channel.basicQos(prefecthCount);

消费者

1.确认每次只收到一个消息

2.每次处理完消息要返回确认信息

3.自动应答 关闭

现象:消费者1处理得比消费者2多(能者多劳)

  • 消息应答 与 消息持久化

boolean autoAck = false;//自动应答=false

channel.basicConsume(QUEUE_NAME, autoAck, consumer);

boolean autoAck = true;

自动确认模式,一旦rabbitmq将消息分发给消费者,就会从内存中删除

这种情况,如果杀死正在执行任务的消费者,则会丢失正在处理的消息

boolean autoAck = false;

手动模式 ,如果有一个消费者挂掉,就会交付给其他消费者。rabbitMQ支出消息应答,消费者处理完消息后,给abbitmq发送确认消息,rabbitmq收到后就会删除内存中的消息

消息应答默认是打开的==>false

消息的持久化

boolean durable=false;//持久化

channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

我们将程序中的 boolean durable=false 直接改成true是不可以的,因为test_work_queue队列已经定义成未持久化的队列,rabbitmq不允许重新定义已经存在的队列。

3、发布订阅模式

X:交换机、转发器

解读:

1.一个生产者,多个消费者

2.每一个消费者都有自己的队列

3.生产者没有直接将消息发送到队列。而是发送到了交换机

4.每个队列都要绑定到交换机上

5.生产者发送的消息,经过交换机,到达队列,就能实现一个消息被多个消费者消费

生产者

public class Send {private static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建通道Channel channel = connection.createChannel();//声明交换机,fanout:广播式交换器channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String msg = "hello ps!";channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());System.out.println("send :"+msg);channel.close();connection.close();}
}

这时候消息哪去了??

消息丢失了!因为交换机没有存储能力,在rabbitMQ里面,只有队列有存储能力

消费者1

public class Recv1 {private static final String EXCHANGE_NAME = "test_exchange_fanout";private static final String QUEUE_NAME = "test_queue_fanout_email";public static void main(String[] args) throws IOException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建通道final Channel channel =  connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//绑定队列到交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");//每次只接收一个消息channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"utf-8");System.out.println("consumer[1] recv message: "+msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("consumer[1] done");channel.basicAck(envelope.getDeliveryTag(),false);}}};channel.basicConsume(QUEUE_NAME,false,consumer);}
}

消费者2

public class Recv2 {private static final String EXCHANGE_NAME = "test_exchange_fanout";private static final String QUEUE_NAME = "test_queue_fanout_sms";public static void main(String[] args) throws IOException {//获取连接Connection connection = ConnectionUtils.getConnection();//创建通道final Channel channel =  connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//绑定队列到交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");//每次只接收一个消息channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body,"utf-8");System.out.println("consumer[2] recv message: "+msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("consumer[2] done");channel.basicAck(envelope.getDeliveryTag(),false);}}};channel.basicConsume(QUEUE_NAME,false,consumer);}
}

Exchange(交换机 转发器)

一方面是接收生产者的消息,一方面将消息推送到各个消费者的队列

Fanout(不处理路由键)

Direct (处理路由键)

4、路由模式

模型

生产者

public class Send {private static final String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//直接交换器,完全匹配路由channel.exchangeDeclare(EXCHANGE_NAME, "direct");String msg = "hello direct!";String routingKey = "info";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.concat(routingKey).getBytes());System.out.println("send message:" + msg.concat(routingKey));routingKey = "error";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.concat(routingKey).getBytes());System.out.println("send message:" + msg.concat(routingKey));channel.close();connection.close();}
}

消费者1

public class Recv1 {private static final String EXCHANGE_NAME = "test_exchange_direct";private static final String QUEUE_NAME = "test_queue_direct_1";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("consumer[1] recv message: " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("consumer[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

消费者2

public class Recv2 {private static final String EXCHANGE_NAME = "test_exchange_direct";private static final String QUEUE_NAME = "test_queue_direct_2";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("consumer[2] recv message: " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("consumer[2] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

5、主题模式(topic)

将路由与某模式进行匹配

#-------匹配一个或者多个

*--------匹配一个

生产者

public class Send {private static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String msg = "hello topic!";String routingKey = "goods.del";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.concat(routingKey).getBytes());System.out.println("send message:" + msg.concat(routingKey));routingKey = "goods.add";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.concat(routingKey).getBytes());System.out.println("send message:" + msg.concat(routingKey));channel.close();connection.close();}
}

消费者1

public class Recv1 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("consumer[1] recv message: " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("consumer[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

消费者2

public class Recv2 {private static final String EXCHANGE_NAME = "test_exchange_topic";private static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.*");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("consumer[2] recv message: " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("consumer[2] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

6、rabbitMQ的消息确认机制(事物+confirm)

在rabbitmq中,我们可以通过持久化数据,解决rabbitmq的服务器异常 的数据丢失问题

问题:生产者将消息发送出去后,是否到达rabbitmq服务器?默认的情况下是不知道的

两种方式解决:

1.AMQP 实现了事物机制

2.confirm 模式

AMQP 事物机制

txSelect 用户将当前channel设置成transation模式

txCommit 用于提交事物

txRollback 回滚事物

生产者

public class TxSend {private static final String QUEUE_NAME = "test_tx";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();String msg = "hello tx!";channel.queueDeclare(QUEUE_NAME, false, false, false, null);try {channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());int qq = 1 / 0;channel.txCommit();System.out.println("send message commit");} catch (Exception e) {channel.txRollback();System.out.println("send message rollback");}System.out.println("send message:" + msg);channel.close();connection.close();}
}

消费者

public class TxRecv {private static final String QUEUE_NAME = "test_tx";public static void main(String[] args) throws IOException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);channel.basicConsume(QUEUE_NAME, true,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("recv message: " + msg);}});}
}

这种模式比较耗时,降低了rabbitmq的吞吐量

confirm模式

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

confirm模式最大的好处是 异步

rabbitmq如果服务器异常或者崩溃,就会发送一个nack消息

开启confirm模式

channel.confirmSelect();

编程模式

1.普通 发一条 waitForConfirm()

2.批量 发一批 waitForConfirms()

3.异步confirm模式 提供一个回调方法

confirm单条

public class Send {private static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//生产者调用confirmSelect,将channel设置为confirm模式channel.confirmSelect();String msg = "hello confirm1!";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());if (!channel.waitForConfirms()) {System.out.println("message send failed");} else {System.out.println("message send ok");}channel.close();connection.close();}
}

confirm多条

public class Send2 {private static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//生产者调用confirmSelect,将channel设置为confirm模式channel.confirmSelect();String msg = "hello confirm1!";//批量发送for (int i = 0; i < 10; i++) {channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}if (!channel.waitForConfirms()) {System.out.println("message send failed");} else {System.out.println("message send ok");}channel.close();connection.close();}
}

异步confirm模式

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。

public class Send3 {private static final String QUEUE_NAME = "confirm_test_1";public static void main(String[] args) throws IOException, InterruptedException {Connection connections = ConnectionUtils.getConnection();Channel channel = connections.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 生产者调用confirmSelect 将channel设置为confirm模式channel.confirmSelect();// 存放未确认的消息标识final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());// 添加通道监听channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliverTag, boolean mutiple) throws IOException {if (mutiple) {System.out.println("---handleAck-------mutiple----");confirmSet.headSet(deliverTag + 1).clear();} else {System.out.println("---handleAck-------mutiple---false");confirmSet.remove(deliverTag);}}public void handleNack(long deliverTag, boolean mutiple) throws IOException {if (mutiple) {System.out.println("---handleNack-------mutiple----");confirmSet.headSet(deliverTag + 1).clear();} else {System.out.println("---handleNack-------mutiple---false");confirmSet.remove(deliverTag);}}});String msg = "hello confirm msg";while (true) {long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());confirmSet.add(seqNo);if(seqNo == 100){break;}}}
}

7、Spring集合rabbitmq

配置

<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" ><description>rabbitmq 连接服务配置</description><!-- 1.定义rabbitMQ连接工厂 --><rabbit:connection-factory id="connectionFactory"host="127.0.0.1"username="rabbit"password="rabbit"port="5672"virtual-host="/rabbit"/><!-- 2.定义rabbit模板,指定连接工厂以及定义exchange --><rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" /><!-- MQ的管理 --><rabbit:admin connection-factory="connectionFactory"/><rabbit:queue id="test_queue_key" name="test_queue_key" /><!--durable:是否持久化exclusive: 仅创建者可以使用的私有队列,断开后自动删除auto_delete: 当所有消费客户端连接断开后,是否自动删除队列--><rabbit:fanout-exchange name="amqpExchange" ><rabbit:bindings><rabbit:binding queue="test_queue_key"/></rabbit:bindings></rabbit:fanout-exchange><!--队列监听--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"><rabbit:listener queues="test_queue_key" ref="queueListenter" method="listen"/></rabbit:listener-container><!--消费者--><bean id="queueListenter" class="com.rabbitmq.spring.MyConsumer" /></beans>

生产者

public class SpringMain {public static void main(String[] args) throws InterruptedException {AbstractApplicationContext context = new ClassPathXmlApplicationContext("classpath:application-mq.xml");//rabbit模板RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);//发送消息rabbitTemplate.convertAndSend("hello world!");//休眠1秒Thread.sleep(1000);//容器销毁context.destroy();}

消费者

public class MyConsumer {public void listen(String foo){System.out.println("消费者:"+foo);}
}

8、代码示例

https://gitee.com/luojinjiang/javaproject/tree/master/myrabbitmq

RabbitMQ如何解决各种情况下丢数据的问题

1.生产者丢数据

生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。然而缺点就是吞吐量下降了。因此,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦
消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

下面演示一下confirm模式:

//测试确认后回调
@Service
public class HelloSender1 implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send() {String context = "你好现在是 " + new Date() +"";System.out.println("HelloSender发送内容 : " + context);this.rabbitTemplate.setConfirmCallback(this);//exchange,queue 都正确,confirm被回调, ack=true//this.rabbitTemplate.convertAndSend("exchange","topic.message", context);//exchange 错误,queue 正确,confirm被回调, ack=false//this.rabbitTemplate.convertAndSend("fasss","topic.message", context);//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE//this.rabbitTemplate.convertAndSend("exchange","", context);//exchange 错误,queue 错误,confirm被回调, ack=falsethis.rabbitTemplate.convertAndSend("fasss","fass", context);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);}}

2.消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘
之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢
失(整个集群都挂掉)
/*** 第二个参数:queue的持久化是通过durable=true来实现的。* 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:    1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;    2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;    3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。* 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。* @param* @return* @Author zxj*/@Beanpublic Queue queue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl", 25000);//25秒自动删除Queue queue = new Queue("topic.messages", true, false, true, arguments);return queue;}
MessageProperties properties=new MessageProperties();properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化设置properties.setExpiration("2018-12-15 23:23:23");//设置到期时间Message message=new Message("hello".getBytes(),properties);this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

3.消费者丢数据

启用手动确认模式可以解决这个问题
①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让
消息不断重试。
②手动确认模式
③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
指定Acknowledge的模式:
spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示该监听器手动应答消息
针对手动确认模式,有以下特点:
1.使用手动应答消息,有一点需要特别注意,那就是不能忘记应答消息,因为对于RabbitMQ来说处理消息没有超时,只要不应答消息,他就会认为仍在正常处理消息,导致消息队列出现阻塞,影响业务执行。
2.如果消费者来不及处理就死掉时,没有响应ack时,会项目启动后会重复发送一条信息给其他消费者;
3.可以选择丢弃消息,这其实也是一种应答,如下,这样就不会再次收到这条消息。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
4.如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的
5.如果消费者没有设置手动应答模式,并且设置了重试,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。

重试机制:

spring.rabbitmq.listener.simple.retry.max-attempts=5  最大重试次数
spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)

如果设置了重试模式,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。

当出现异常时,我们需要把这个消息回滚到消息队列,有两种方式:

//ack返回false,并重新回到队列,api里面解释得很清楚

//ack返回false,并重新回到队列,api里面解释得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

经过开发中的实际测试,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。对于消息回滚到消息队列,我们希望比较理想的方式时出现异常的消息到达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行,因此我们采取的解决方案是,将消息进行应答,这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案。

//手动进行应答channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//重新发送消息到队尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));

如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。解决这个问题可以采取两种方案:

1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,直接丢弃该消息方案。

2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。

RabbitMQ如何保证高可用

RabbitMQ有两种集群模式,普通集群和镜像集群

普通集群

即在多个服务器上部署多个MQ实例, 每台机器一个实例. 创建的每一个queue,只会存在一个MQ实例上. 但是每一个实例都会同步queue的元数据(即queue的标识信息). 当在进行消费的时候, 就算 连接到了其他的MQ实例上, 其也会根据内部的queue的元数据,从该queue所在实例上拉取数据过来.

这种方式只是一个简单的集群,并没有考虑高可用. 并且性能开销巨大.容易造成单实例的性能瓶颈. 并且如果真正有数据的那个queue的实例宕机了. 那么其他的实例就无法进行数据的拉取.

这种方式只是通过集群部署的方式提高了消息的吞吐量,但是并没有考虑到高可用.

镜像集群模式

这种模式才是高可用模式. 与普通集群模式的主要区别在于. 无论queue的元数据还是queue中的消息都会同时存在与多个实例上.

要开启镜像集群模式,需要在后台新增镜像集群模式策略. 即要求数据同步到所有的节点.也可以指定同步到指定数量的节点.

这种方式的好处就在于, 任何一个服务宕机了,都不会影响整个集群数据的完整性, 因为其他服务中都有queue的完整数据, 当进行消息消费的时候,连接其他的服务器节点一样也能获取到数据.

缺点:

  1. 性能开销非常大,因为要同步消息到对应的节点,这个会造成网络之间的数据量的频繁交互,对于网络带宽的消耗和压力都是比较重的

  2. 没有扩展可言,rabbitMQ是集群,不是分布式的,所以当某个Queue负载过重,我们并不能通过新增节点来缓解压力,因为所以节点上的数据都是相同的,这样就没办法进行扩展了

好了,本次rabbitMQ篇的学习就到这里了。相关的示例代码已上传码云,仓库地址可关注"良辰"公众号,回复"rabbitMQ"获取

学无止境,关注我,我们一起进步。如果觉得文章还可以,点个赞呗,谢谢~我们下期见。

干货!消息队列RabbitMQ入门教程相关推荐

  1. 消息队列RabbitMQ入门与PHP实战

    消息队列介绍以及消息队列应用场景 RabbitMQ 说明 MQ(Message Queue) 即消息队列,是应用间的通信方式,消息发送后可立即返回,由消息系统来确保消息的可靠传递."消息队列 ...

  2. 消息队列RabbitMQ入门与5种模式详解

    1.RabbitMQ概述 简介: MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法: RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言 ...

  3. 【转载】消息队列RabbitMQ入门介绍

    (一)基本概念 RabbitMQ 是流行的开源消息队列系统,用 erlang 语言开发.我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持.RabbitMQ 是 AMQP(高级消息队列协议)的标准实现 ...

  4. linux发布微软消息队列,消息队列RabbitMQ入门与5种模式详解

    1.RabbitMQ概述 简介: MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法: RabbitMQ是开源的,实现了AMQP协议的,采用Erlang(面向并发编程语言 ...

  5. RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)

    RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等) 1. RabbitMQ简介及AMQP协议 开源的消息代理和队列 ...

  6. 初识消息队列/RabbitMQ详解

    欢迎大家阅读<朝夕Net社区技术专刊> 我们致力于.NetCore的推广和落地,为更好的帮助大家学习,方便分享干货,特创此刊!很高兴你能成为忠实读者,文末福利不要错过哦! 今天来给大家分享 ...

  7. 消息队列RabbitMQ的使用

    最近在学习spring cloud微服务,当学习到spring cloud bus的时候,涉及到了消息队列,需要学习RabbitMQ. 一.消息队列 1.1介绍消息队列 消息队列,即MQ,Messag ...

  8. 消息队列RabbitMQ之初学者

    文章目录 消息队列 什么是消息队列 生产者和消费者 AMQP和JMS AMQP和JMS的区别 常见的MQ产品 RabbitMQ Erlang语言 RabbitMQ下载 什么是消息队列RabbitMQ? ...

  9. Spring Boot 消息队列 RocketMQ 入门

    转载自  芋道 Spring Boot 消息队列 RocketMQ 入门 摘要: 原创出处 http://www.iocoder.cn/Spring-Boot/RocketMQ/ 「芋道源码」欢迎转载 ...

最新文章

  1. 知方可补不足~开发人员可以自己定义VS文件模版
  2. 海外净利润低?海尔智家H股上市有望看齐国内!
  3. python flask与django的区别_真正搞明白Python中Django和Flask框架的区别
  4. DebootstrapChroot
  5. 完成DI 依赖注入功能
  6. mysql 数据库引擎介绍_MYSQL 数据库引擎介绍
  7. SPA (单页应用程序)
  8. 放大镜_医用手术放大镜
  9. android网络框架
  10. Tensorflow 实战 Google 深度学习框架(第2版)---- 10.2.2节 P272 代码
  11. javaSE(java基础库)私人学习笔记
  12. Selenium3 Java自动化测试完整教程
  13. 常见后端数据存储问题解决方案
  14. ps4正在连接ea服务器,ps4极品飞车19连不上ea服务器 | 手游网游页游攻略大全
  15. Page migration
  16. 讯飞语音集成(语音转文字,文字转语音)
  17. 海思3559AV100 HiSysLink 之 IPCMSG
  18. python 导入自己写的类
  19. iOS开发---本地通知(UILocalNotification)
  20. leetcode_Hamming Distance

热门文章

  1. 优酷土豆合并数据一览
  2. MTK 平台sensor arch 介绍-kernel
  3. Mysql有效的删除重复数据保留一条
  4. 高校房地产管理系统可以管理高校哪类房产?
  5. 我的世界服务器显示最大,我的世界最大的服务器是哪个
  6. 【程序员必修数学课】-基础思想篇-数学归纳法-如何用数学归纳提高代码效率
  7. Unity3D对象池的设计
  8. 基础知识复习,html、css、js
  9. 三种方式,教你优雅的替换if-else语句!
  10. 洛谷-P1706 全排列问题