高清思维导图已同步Git:https://github.com/SoWhat1412/xmindfile,关注公众号sowhat1412获取海量资源

1. 消息队列解决了什么问题

消息中间件是目前比较流行的一个中间件,其中RabbitMQ更是占有一定的市场份额,主要用来做异步处理、应用解耦、流量削峰、日志处理等等方面。

1. 异步处理

一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。

  1. 串行到依次执行,问题是用户注册后就可以使用了,没必要等验证码跟邮件。
  2. 注册成功后,邮件跟验证码用并行等方式执行,问题是邮件跟验证码是非重要的任务,系统注册还要等这俩完成么?
  3. 基于异步MQ的处理,用户注册成功后直接把信息异步发送到MQ中,然后邮件系统跟验证码系统主动去拉取数据。

2. 应用解耦

比如我们有一个订单系统,还要一个库存系统,用户下订单了就要调用下库存系统来处理,直接调用到话库存系统出现问题咋办呢?

3. 流量削峰

举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间搞密度访问绝对不可以起码要加入一个MQ。

4. 日志处理

用户通过WebUI访问发送请求到时候后端如何接受跟处理呢一般?

2. RabbitMQ 安装跟配置

官网:https://www.rabbitmq.com/download.html
开发语言:https://www.erlang.org/
正式到安装跟允许需要Erlang跟RabbitMQ俩版本之间相互兼容!我这里图省事直接用Docker 拉取镜像了。
下载:

开启:
管理页面 默认账号:guest 默认密码:guest 。Docker启动时候可以指定账号密码对外端口以及

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

启动:

用户添加:

vitrual hosts 相当于mysql中的DB。创建一个virtual hosts,一般以/ 开头。

对用户进行授权,点击/vhost_mmr,

至于WebUI多点点即可了解。

3. 实战

RabbitMQ 官网支持任务模式:https://www.rabbitmq.com/getstarted.html
创建Maven项目导入必要依赖:

    <dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.0.2</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.10</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.5</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency></dependencies>

0. 获取MQ连接

package com.sowhat.mq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtils {/*** 连接器* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/vhost_mmr");factory.setUsername("user_mmr");factory.setPassword("sowhat");Connection connection = factory.newConnection();return connection;}
}

1. 简单队列


P:Producer 消息的生产者
中间:Queue消息队列
C:Consumer 消息的消费者

package com.sowhat.mq.simple;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String QUEUE_NAME = "test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 获取一个连接Connection connection = ConnectionUtils.getConnection();// 从连接获取一个通道Channel channel = connection.createChannel();// 创建队列声明AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "hello Simple";// exchange,队列,参数,消息字节体channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("--send msg:" + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.simple;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者获取消息*/
public class Recv {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {newApi();oldApi();}private static void newApi() throws IOException, TimeoutException {// 创建连接Connection connection = ConnectionUtils.getConnection();// 创建频道Channel channel = connection.createChannel();// 队列声明  队列名,是否持久化,是否独占模式,无消息后是否自动删除,消息携带参数channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);// 定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Override  // 事件模型,消息来了会触发该函数public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("---new api recv:" + s);}};// 监听队列channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);}// 老方法 消费者 MQ 在3。4以下 用次方法,private static void oldApi() throws IOException, TimeoutException, InterruptedException {// 创建连接Connection connection = ConnectionUtils.getConnection();// 创建频道Channel channel = connection.createChannel();// 定义队列消费者QueueingConsumer consumer = new QueueingConsumer(channel);//监听队列channel.basicConsume(Send.QUEUE_NAME, true, consumer);while (true) {// 发货体QueueingConsumer.Delivery delivery = consumer.nextDelivery();byte[] body = delivery.getBody();String s = new String(body);System.out.println("---Recv:" + s);}}
}

右上角有可以设置页面刷新频率,然后可以在UI界面直接手动消费掉,如下图:

简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。

2. WorkQueue 工作队列

Simple队列中只能一一对应的生产消费,实际开发中生产者发消息很简单,而消费者要跟业务结合,消费者接受到消息后要处理从而会耗时。可能会出现队列中出现消息积压。所以如果多个消费者可以加速消费。

1. round robin 轮询分发

代码编程一个生产者两个消费者:

package com.sowhat.mq.work;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取 channelChannel channel = connection.createChannel();// 声明队列AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i <50 ; i++) {String msg = "hello-" + i;System.out.println("WQ send " + msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");}}};boolean autoAck = true;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000 );} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");}}};boolean autoAck = true;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}

现象:
消费者1 跟消费者2 处理的数据量完全一样的个数:
消费者1:处理偶数
消费者2:处理奇数
这种方式叫轮询分发(round-robin)结果就是不管两个消费者谁忙,数据总是你一个我一个,MQ 给两个消费发数据的时候是不知道消费者性能的,默认就是雨露均沾。此时 autoAck = true。

2. 公平分发 fair dipatch

如果要实现公平分发,要让消费者消费完毕一条数据后就告知MQ,再让MQ发数据即可。自动应答要关闭!

package com.sowhat.mq.work;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取 channelChannel channel = connection.createChannel();// s声明队列AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只发送一个消息// 从而限制一次性发送给消费者到消息不得超过1个。int perfetchCount = 1;channel.basicQos(perfetchCount);for (int i = 0; i <50 ; i++) {String msg = "hello-" + i;System.out.println("WQ send " + msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection = ConnectionUtils.getConnection();// 获取通道final Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}

结果:实现了公平分发,消费者2 是消费者1消费数量的2倍。

3. publish/subscribe 发布订阅模式

类似公众号的订阅跟发布,无需指定routingKey:

解读:

  1. 一个生产者多个消费者
  2. 每一个消费者都有一个自己的队列
  3. 生产者没有把消息直接发送到队列而是发送到了交换机转化器(exchange)
  4. 每一个队列都要绑定到交换机上。
  5. 生产者发送的消息经过交换机到达队列,从而实现一个消息被多个消费者消费。

生产者:

package com.sowhat.mq.ps;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分发= fanout// 发送消息String msg = "hello ps ";channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());System.out.println("Send:" + msg);channel.close();connection.close();}
}

消息哪儿去了?丢失了,在RabbitMQ中只有队列有存储能力,因为这个时候队列还没有绑定到交换机 所以消息丢失了
消费者:

package com.sowhat.mq.ps;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  QUEUE_NAME = "test_queue_fanout_email";public static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();// 队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机转发器channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.ps;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String  QUEUE_NAME = "test_queue_fanout_sms";public static final String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();// 队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机转发器channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}


同时还可以自己手动的添加一个队列监控到该exchange

4. routing 路由选择 通配符模式

Exchange(交换机,转发器):一方面接受生产者消息,另一方面是向队列推送消息
匿名转发用 “” 表示,比如前面到简单队列跟WorkQueue。
fanout:不处理路由键。不需要指定routingKey,我们只需要把队列绑定到交换机, 消息就会被发送到所有到队列中
direct:处理路由键,需要指定routingKey,此时生产者发送数据到时候会指定key,任务队列也会指定key,只有key一样消息才会被传送到队列中。如下图

package com.sowhat.mq.routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();// exchangechannel.exchangeDeclare(EXCHANGE_NAME,"direct");String msg = "hello info!";// 可以指定类型String routingKey = "info";channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());System.out.println("Send : " + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.routing;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  EXCHANGE_NAME = "test_exchange_direct";public static final String QUEUE_NAME = "test_queue_direct_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicQos(1);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.routing;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String EXCHANGE_NAME = "test_exchange_direct";public static final String QUEUE_NAME = "test_queue_direct_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 绑定种类似 Keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(), false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

WebUI:

缺点:路由key必须要明确,无法实现规则性模糊匹配。

5. Topics 主题

将路由键跟某个模式匹配,# 表示匹配 >=1个字符, *表示匹配一个。生产者会带routingKey,但是消费者的MQ会带模糊routingKey。

商品:发布、删除、修改、查询。

package com.sowhat.mq.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();// exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");String msg = "商品!";// 可以指定类型String routingKey = "goods.find";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());System.out.println("Send : " + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.topic;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  EXCHANGE_NAME = "test_exchange_topic";public static final String QUEUE_NAME = "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicQos(1);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【1】:" + s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.topic;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String EXCHANGE_NAME = "test_exchange_topic";public static final String QUEUE_NAME = "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 此乃重点channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {@Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("【2】:" + s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(), false);}}};// 自动应答boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

6. MQ的持久化跟非持久化

因为消息在内存中,如果MQ挂了那么消息也丢失了,所以应该考虑MQ的持久化。MQ是支持持久化的,

// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);/*** Declare a queue* @see com.rabbitmq.client.AMQP.Queue.Declare* @see com.rabbitmq.client.AMQP.Queue.DeclareOk* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

boolean durable就是表明是否可以持久化,如果我们将程序中的durable = false改为true是不可以的!因为我们已经定义过的test_work_queue,这个queue已声明为未持久化的。
结论:MQ 不允许修改一个已经存在的队列参数。

7. 消费者端手动跟自动确认消息

        // 自动应答boolean autoAck = false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);

当MQ发送数据个消费者后,消费者要对收到对信息应答给MQ。

如果autoAck = true 表示自动确认模式,一旦MQ把消息分发给消费者就会把消息从内存中删除。如果消费者收到消息但是还没有消费完而MQ中数据已删除则会导致丢失了正在处理对消息。

如果autoAck = false表示手动确认模式,如果有个消费者挂了,MQ因为没有收到回执信息可以把该信息再发送给其他对消费者。

MQ支持消息应答(Message acknowledgement),消费者发送一个消息应答告诉MQ这个消息已经被消费了,MQ才从内存中删除。消息应答模式默认为 false

8. RabbitMQ生产者端消息确认机制(事务 + confirm)

在RabbitMQ中我们可以通过持久化来解决MQ服务器异常的数据丢失问题,但是生产者如何确保数据发送到MQ了?默认情况下生产者也是不知道的。如何解决 呢?

1. AMQP事务

第一种方式AMQP实现了事务机制,类似mysql的事务机制。
txSelect:用户将当前channel设置为transition模式。
txCommit:用于提交事务。
txRollback:用于回滚事务。

以上都是对生产者对操作。

package com.sowhat.mq.tx;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxSend {public static final String QUEUE_NAME = "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "hello tx message";try {//开启事务模式channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());int x = 1 / 0;// 提交事务channel.txCommit();} catch (IOException e) {// 回滚channel.txRollback();System.out.println("send message rollback");} finally {channel.close();connection.close();}}
}
---
package com.sowhat.mq.tx;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxRecv {public static final String QUEUE_NAME = "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv[tx] msg:" + new String(body, "utf-8"));}});channel.close();connection.close();}
}

缺点就是大量对请求尝试然后失败然后回滚,会降低MQ的吞吐量。

2. Confirm模式。

生产者端confirm实现原理
生产者将信道设置为confirm模式,一旦信道进入了confirm模式,所以该信道上发布的信息都会被派一个唯一的ID(从1开始),一旦消息被投递到所有的匹配队列后,Broker就回发送一个确认给生产者(包含消息唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息跟队列是可持久化的,那么确认消息会在消息写入到磁盘后才发出。broker回传给生产者到确认消息中deliver-tag域包含了确认消息到序列号,此外broker也可以设置basic.ack的multiple域,表示这个序列号之前所以信息都已经得到处理。

Confirm模式最大的好处在于是异步的。第一条消息发送后不用一直等待回复后才发第二条消息。

开启confirm模式:channel.confimSelect()
编程模式:

1. 普通的发送一个消息后就 waitForConfirms()
package com.sowhat.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send1 {public static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将channel模式设置为 confirm模式,注意设置这个不能设置为事务模式。channel.confirmSelect();String msg = "hello confirm message";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());if (!channel.waitForConfirms()) {System.out.println("消息发送失败");} else {System.out.println("消息发送OK");}channel.close();connection.close();}
}
---
package com.sowhat.confirm;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {public static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv[tx] msg:" + new String(body, "utf-8"));}});}
}
2. 批量的发一批数据 waitForConfirms()
package com.sowhat.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send2 {public static final String QUEUE_NAME = "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将channel模式设置为 confirm模式,注意设置这个不能设置为事务模式。channel.confirmSelect();String msg = "hello confirm message";// 批量发送for (int i = 0; i < 10; i++) {channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}// 确认if (!channel.waitForConfirms()) {System.out.println("消息发送失败");} else {System.out.println("消息发送OK");}channel.close();connection.close();}
}
---
接受信息跟上面一样
3. 异步confirm模式,提供一个回调方法。

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(包含当前发出消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉响应的一条(multiple=false)或多条(multiple=true)记录,从运行效率来看,unconfirm集合最好采用有序集合SortedSet存储结构。
![在这里插入图片描述](https://img-blog.csdnimg.cn/2020071921400852.png#pic_ enter)

package com.sowhat.mq.confirm;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class Send3 {public static final String QUEUE_NAME = "test_queue_confirm3";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//生产者调用confirmSelectchannel.confirmSelect();// 存放未确认消息final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());// 添加监听通道channel.addConfirmListener(new ConfirmListener() {// 回执有问题的public void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("--handleNack---multiple");confirmSet.headSet(deliveryTag + 1).clear();} else {System.out.println("--handleNack-- multiple false");confirmSet.remove(deliveryTag);}}// 没有问题的handleAckpublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("--handleAck---multiple");confirmSet.headSet(deliveryTag + 1).clear();} else {System.out.println("--handleAck--multiple false");confirmSet.remove(deliveryTag);}}});// 一般情况下是先开启 消费者,指定好 exchange跟routingkey,如果生产者等routingkey 就会触发这个return 方法channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---- handle return----");System.out.println("replyCode:" + replyCode );System.out.println("replyText:" +replyText );System.out.println("exchange:" + exchange);System.out.println("routingKey:" + routingKey);System.out.println("properties:" + properties);System.out.println("body:" + new String(body));}});String msgStr = "sssss";while(true){long nextPublishSeqNo = channel.getNextPublishSeqNo();channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());confirmSet.add(nextPublishSeqNo);Thread.sleep(1000);}}
}

总结:AMQP模式相对来说没Confirm模式性能好些,推荐使用后者。

9. RabbitMQ延迟队列 跟死信

淘宝订单付款,验证码等限时类型服务。

        Map<String,Object> headers =  new HashMap<String,Object>();headers.put("my1","111");headers.put("my2","222");AMQP.BasicProperties build = new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();

死信的处理:

10. SpringBoot Tpoic Demo

需求图:

新建SpringBoot 项目添加如下依赖:

       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
1. 生产者

application.yml

spring:rabbitmq:host: 127.0.0.1username: adminpassword: admin

测试用例:

package com.sowhat.mqpublisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class MqpublisherApplicationTests {@Autowiredprivate AmqpTemplate amqpTemplate;@Testvoid userInfo() {/*** exchange,routingKey,message*/this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");}
}
2. 消费者

application.xml

spring:rabbitmq:host: 127.0.0.1username: adminpassword: admin# 自定义配置
mq:config:exchange_name: log.topic# 配置队列名称queue_name:info: log.infoerror: log.errorlogs: log.logs

三个不同的消费者:

package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @QueueBinding value属性:用于绑定一个队列。@Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回* type = ExchangeTypes.TOPIC 指定交换器类型。默认的direct交换器*/
@Service
public class ErrorReceiverService {/*** 把一个方法跟一个队列进行绑定,收到消息后绑定给msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${mq.config.queue_name.error}"),exchange = @Exchange(value = "${mq.config.exchange_name}", type = ExchangeTypes.TOPIC),key = "*.log.error"))public void process(String msg) {System.out.println(msg + " Logs...........");}
}
---
package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @QueueBinding value属性:用于绑定一个队列。* @Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回*/
@Service
public class InfoReceiverService {/*** 添加一个能够处理消息的方法*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="${mq.config.queue_name.info}"),exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),key = "*.log.info"))public void process(String msg){System.out.println(msg+" Info...........");}
}
--
package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @QueueBinding value属性:用于绑定一个队列。* @Queue去查找一个名字为value属性中的值得队列,如果没有则创建,如果有则返回*/
@Service
public class LogsReceiverService {/*** 添加一个能够处理消息的方法*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value ="${mq.config.queue_name.logs}"),exchange = @Exchange(value = "${mq.config.exchange_name}",type = ExchangeTypes.TOPIC),key = "*.log.*"))public void process(String msg){System.out.println(msg+" Error...........");}
}

详细安装跟代码看参考下载:

总结

如果需要指定模式一般是在消费者端设置,灵活性调节。

模式 生产者Queue 生产者exchange 生产者routingKey 消费者exchange 消费者queue routingKey
Simple(简单模式少用) 指定 不指定 不指定 不指定 指定 不指定
WorkQueue(多个消费者少用) 指定 不指定 不指定 不指定 指定 不指定
fanout(publish/subscribe模式) 不指定 指定 不指定 指定 指定 不指定
direct(路由模式) 不指定 指定 指定 指定 指定 消费者routingKey精确指定多个
topic(主题模糊匹配) 不指定 指定 指定 指定 指定 消费者routingKey可以进行模糊匹配

参考

SpringBoot整合RabbitMQ
RabbitMQ安装跟SpringBoot整合demo
RabbitMQ极速入门
RabbitMQdemo

RabbitMQ极速入门相关推荐

  1. 慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结

    慕课网<RabbitMQ消息中间件极速入门与实战>学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网.@慕课网:https://www.imooc.com 教学源 ...

  2. RabbitMQ 从入门到精通 (一)

    目录 1. 初识RabbitMQ 2. AMQP 3.RabbitMQ的极速入门 4. Exchange(交换机)详解 4.1 Direct Exchange 4.2 Topic Exchange 4 ...

  3. RabbitMQ(一):RabbitMQ快速入门

    RabbitMQ是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用.作为一名合格的开发者,有必要对RabbitMQ有所了解,本文是RabbitMQ快速入门文章,主要内容包括Rab ...

  4. 九十分钟极速入门Linux——Linux Guide for Developments 学习笔记

    转载自: 九十分钟极速入门Linux--Linux Guide for Developments 学习笔记 http://mp.weixin.qq.com/s?__biz=MzAwNTMxMzg1MA ...

  5. PyTorch 深度学习: 60 分钟极速入门

    PyTorch 深度学习: 60 分钟极速入门 2019年年初,ApacheCN组织志愿者翻译了PyTorch1.2版本中文文档(github地址),同时也获得了PyTorch官方授权,我相信已经有许 ...

  6. Knative 初体验:CICD 极速入门

    Knative 社区很早就在讨论用 Tekton 替换 Build 模块的相关事宜.Knative Build 官方已经正式说明不再建议使用 Knative Build 了. 如果你知道 Knativ ...

  7. 60 分钟极速入门 PyTorch

    2017 年初,Facebook 在机器学习和科学计算工具 Torch 的基础上,针对 Python 语言发布了一个全新的机器学习工具包 PyTorch. 因其在灵活性.易用性.速度方面的优秀表现,经 ...

  8. rabbitmq详细入门文档+springboot结合使用

    在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景: 商品的原始数据保存在数据库中,增删改查都在数据库中完成. 搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化 ...

  9. 24小时学会python_零基础python24小时极速入门学习

    本篇文章是 Python 入门必须掌握的知识点,24小时极速入门,是针对那些没有任何编程经验,从零开始学习 Python 的同学 想要快速学会python的6个关键点. 分别是:数据类型. 条件判断. ...

  10. RabbitMQ的入门程序test测试代码

    RabbitMQ有以下几种工作模式 : 1.Work queues  工作队列 2.Publish/Subscribe 发布订阅 3.Routing      路由 4.Topics        通 ...

最新文章

  1. [ Nowcoder Contest 165 #D ] 合法括号序列
  2. TCP/IP学习笔记(五)TCP的保活定时器
  3. 【渝粤教育】国家开放大学2019年春季 776员工招聘与管理 参考试题
  4. Python列表基础
  5. 八皇后问题 (信息学奥赛一本通-T1213)
  6. react学习预备知识_在10分钟内学习React基础知识
  7. Google Android 向华为“闭源”!华为手机迎来至暗时刻!
  8. python怎么在运行中查看执行状态_python,_python程序运行时 查看对象状态,python - phpStudy...
  9. 本特利振动前置器330180-51-00
  10. linux还原防火墙设置,Linux防火墙设置教程
  11. python黑底白字的车牌_Python+OpenCV实现车牌字符分割和识别
  12. 阿里云调用api配置access_key
  13. 微信小程序中好看的按钮样式(渐变色)、view的点击变色效果、按钮漂亮的圆角边框、解决hover设置失效
  14. apache和nginx对比
  15. Python面向对象练习题汇总
  16. AndroidStudio导入arr包
  17. A WebView method was called on thread 'JavaBridge'. All WebView methods must be called
  18. n级阶梯,每次走一步或两步,问最多有多少种走法
  19. 家居安防与监控全国产化电子元件推荐方案
  20. Button按钮的用法

热门文章

  1. NUC搭建Centos8服务器
  2. h3c路由器msr2600-10,msr3600-28调试记录
  3. 解决连接远程服务器MySQL“ACCESS DENIED FOR USER‘ROOT‘@‘IP地址‘“问题
  4. 广电物联网大赛正式开启
  5. Android实现网页动态背景“五彩蛛网”
  6. JavaScript 获取当前是周几
  7. Simon Phipps是开源计划总裁
  8. 迷茫时代的明白人——书摘
  9. 我的大学十年 -- 林锐
  10. shopnc route.php,shopnc自动结算的问题