因为公司项目后面需要用到mq做数据的同步,所以学习mq并在此记录,这里的是rabbitMq

mq(message queue)消息队列

官网:www.rabbitmq.com
使用消息队列的优点:1、异步可加快访问速度 (以前一个订单接口需要做下单、库存、付款、快递等相关操作,有了mq只需要给相关信息传入队列,下单、库存、付款、快递等相关操作会自动从队列中收到信息进行异步操作)2、解耦下游服务或其他服务或语言可接入3、削峰高并发访问量可分摊多个队列分摊
缺点:1、系统可用性降低(一旦mq挂了系统就宕机了)2、系统复杂性增大 (增加了mq模块需要考虑更多)

RabbitMQ的高级特性

  • 消费端限流
  • TTL 全称time to live(存活时间/过期时间) - 当消息到达存活时间后还没被消费会被丢弃 ttl+死信队列可实现延迟队列效果
  • 死信队列
  • 延迟队列
  • 消息可靠性投递
  • Consumer ACK

rabbitMq为了确保消息投递的可靠性提供了两种方式 confirm和return

rabbitmq整个消息投递的路径为
producer--->rabbitmq broker--->exchange--->queue--->consumer
1.消息从producer到exchange则会返回一个confirmCallback.
2.消息从exchange到queue投递失败则会返回一个returnCallBack.
我们将利用这两个callback控制消息的可靠性投递

Consumer ACK

ack指acknowledge,确认。表示消费者端接收到消息后的确认方式
有三种方式确认:自动确认:acknowledge="none"手动确认:acknowledge="manual"根据异常情况确认:acknowledge="auto"自动确认指,当消息一旦被消费者接收到,则自动确认收到,并将相应的message从mq的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认模式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,
则调用channel.basicNack()方法,让其自动重新发送消息。

我这里学习了前面五种

1:简单模式
2:工作队列模式
3:发布订阅模式
4:路由模式
5:主题模式

简单模式:即一条线一个发送到队列,队列发送到接收者
工作队列模式:即有一个发送者发送信息到队列,队列发给多个接收者,比如群发
发布订阅模式:这个是使用的最多的,发布者需要先发送到交换机,交换机再发送到与之绑定的队列, 然后队列在发送到与之绑定队列的接收者
路由模式:路由模式在发布订阅上增加了条件筛选,在消息到达交换机后发送队列时进行条件匹配,匹配成功才能发送给对应绑定的队列,最后再发送给接收者
主题模式:主题模式在路由模式上面进行升级,条件可进行模糊匹配,通配符规则 #可以匹配多个词 * 只能匹配一个词 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two

先安装rabbitMq,不同的环境可安装相关的版本,我这里已经安装好了

然后运行sbin下面的rabbitmq-server.bat


然后网页localhost:15672,如下页面即安装成功



然后去rabbitmq的官网


左边是下载右边是文档

文档中也会有一些代码案例,点击文档可以看到mq有七种方式

第一个是在测试的时候需要引入的包,第二个是在springboot上需要引入的包

com.rabbitmq
amqp-client
5.3.0

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

一:简单模式

我给mq的连接封装在工具类里,一些队列名放在常量类里了
工具类代码:

package com.lansi.realtynavi.test.utils;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** @Description 描述* @Date 2021/3/23 11:22* @Created by huyao*/
public class RabbitUtils {public static ConnectionFactory factory = new ConnectionFactory();static {factory.setHost("localhost");}public static Connection getConnection() throws Exception{Connection connection = null;try {//获取长连接connection  = factory.newConnection();}catch (Exception e){e.printStackTrace();}/*finally {connection.close();} */return connection;}}

常量类代码:

package com.lansi.realtynavi.test.constant;/*** @Description 描述* @Date 2021/3/23 11:01* @Created by huyao*/
public class MqConstant {public static final String MQ_HELLO_WORD = "helloWord";public static final String MQ_PUBLISH = "publish";public static final String MQ_ROUTING = "routing";public static final String MQ_TOPICS = "topics";public static final String MQ_WORK_QUEUES = "workQueues";public static final String MQ_QUEUE_BAIDU = "baidu";public static final String MQ_QUEUE_XINLANG = "xinlang";public static final String MQ_PUBLISH_JHJ = "jiaohuanji";public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";}

生产者代码

package com.lansi.realtynavi.test.helloWord;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** @Description 简单模式* @Date 2021/3/22 17:19* @Created by huyao*/
public class Producer {public static void main(String[] args) throws Exception{Channel channel = null;Connection connection = null;try {//获取长连接connection = RabbitUtils.getConnection();channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);String message = "这是我发送的第三个队列消息";//第一个参数是交换机信息   简单队列不需要交换机  第二个参数队列名称 ,第三个额外信息,第四个需要发布的信息channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());System.out.println("[x] Send ‘" + message + "’");}catch (Exception e){e.printStackTrace();}finally {channel.close();connection.close();}}}

消费者代码:

package com.lansi.realtynavi.test.helloWord;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/22 17:27* @Created by huyao*/
public class Consumer {public static void main(String[] argv) throws Exception {//连接Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//声明并创建一个队列//参数1 队列ID//参数2 是否持久化,false对应不持久化数据,mq停掉数据就会丢失//参数3 是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用//参数4 是否自动删除, false代表连接停掉后不自动删除这个队列// 其他额外的参数,nullchannel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);//从MQ服务器中获取数据//创建一个消息消费者//参数1:队列ID//参数2:代表是否自动确认收到消息,false代表手动编程来确认消息,这是mq的推荐做法//参数3:参数要传入的DefaultConsumer的实现类channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));}
}class Reciver extends DefaultConsumer {private Channel channel;//重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中用到public Reciver(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("消费者接收到的消息:"+message);System.out.println("消息的ID:"+envelope.getDeliveryTag());//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(), false);}
}

测试的时候队列需要手动去创建,不过springboot的话可以自动创建

这里已经手动创建好了
运行接收者,运行启动者


这里接收者自动接收消息

二:工作队列模式

  一个队列多个接收者

生产者代码:

package com.lansi.realtynavi.test.workQueues;import com.google.gson.Gson;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** @Description 工作队列模式* @Date 2021/3/22 17:33* @Created by huyao*/
public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);for(int i = 1; i<=20; i++){SMS sms = new SMS("乘客" + i, "123456789", "你的车票已预订成功");String message = new Gson().toJson(sms);channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());}System.out.println("发送数据成功");channel.close();connection.close();}}

封装对象代码:

package com.lansi.realtynavi.test.workQueues;/*** @Description 描述* @Date 2021/3/23 11:28* @Created by huyao*/
public class SMS {private String name;private String mobile;private String content;public SMS(String name, String mobile, String content) {this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}
}

三个接收者代码

接收者1

package com.lansi.realtynavi.test.workQueues;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/23 11:33* @Created by huyao*/
public class Consumer1 {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);//如果不写baiscQos(1) 则自动mq会将所有请求平均发送给所有消费者//baiscQos,mq不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),再从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("smsConsumer1-短信发送成功:"+message);//服务器好的话可以在这里睡眠   这里可动态配置开启和设置睡眠时间/*try {Thread.sleep(10);}catch (Exception e){e.printStackTrace();}*/channel.basicAck(envelope.getDeliveryTag(), false);}});}
}

接收者2

package com.lansi.realtynavi.test.workQueues;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/23 11:40* @Created by huyao*/
public class Consumer2 {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("smsConsumer2-短信发送成功:"+message);channel.basicAck(envelope.getDeliveryTag(), false);}});}}

接收者3

package com.lansi.realtynavi.test.workQueues;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 描述* @Date 2021/3/23 11:41* @Created by huyao*/
public class Consumer3 {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body);System.out.println("smsConsumer1-短信发送成功:"+message);channel.basicAck(envelope.getDeliveryTag(), false);}});}}

启动三个接收类,启动发送类


三个接收都拿到了数据,我学习的时候队列是以轮询的方式给三个消费者发送数据,这里出现了接收数据不均衡的情况应该是缓存没用清理,给队列删掉重新创建就好了

三:发布订阅模式

生成者代码:

这里和前面两种模式不同,发送者绑定了交换机,没用绑定队列,需要消费者绑定交换机和队列

package com.lansi.realtynavi.test.publish;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.util.Scanner;/*** @Description 发布订阅模式* @Date 2021/3/23 13:31* @Created by huyao*/
public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);String input = new Scanner(System.in).next();//第一个参数交换机名字,其他参数和之前一样channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());channel.close();connection.close();}
}

接收者1代码:

package com.lansi.realtynavi.test.publish;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消费者* @Date 2021/3/23 13:50* @Created by huyao*/
public class ConsumerXinLang {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);//队列绑定交换机//参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者新浪收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}

接收者2代码:

package com.lansi.realtynavi.test.publish;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消费者* @Date 2021/3/23 13:50* @Created by huyao*/
public class ConsumerBaiDu {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);//队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建//参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者百度收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}

启动生产者消费者,在生产者控制台输入信息:



两个消费者都接收到了


四 路由模式

路由模式发送需要携带路由key,用作接收者进行判断

生产者代码:

package com.lansi.realtynavi.test.routing;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;/*** @Description 路由模式* @Date 2021/3/23 13:31* @Created by huyao*** 交换机类型:fanout广播(发布订阅)   direct转发(路由)  topic通配符(通配模式)**/
public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);LinkedHashMap<String, String> map = new LinkedHashMap<>();map.put("test1","测试一数据");map.put("test2","测试二数据");map.put("test3","测试三数据");map.put("test4","测试四数据");map.put("test5","测试五数据");map.put("test6","测试六数据");map.put("test7","测试七数据");Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();while (iterator.hasNext()){Map.Entry<String, String> next = iterator.next();//第一个参数交换机名字,第二个参数指定rout_keychannel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());}channel.close();connection.close();}}

接收者1:

package com.lansi.realtynavi.test.routing;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消费者* @Date 2021/3/23 13:50* @Created by huyao*/
public class ConsumerBaiDu {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);//队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建//参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者百度收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}

接收者二

package com.lansi.realtynavi.test.routing;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消费者* @Date 2021/3/23 13:50* @Created by huyao*/
public class ConsumerXinLang {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);//队列绑定交换机//参数1:队列名,参数2:交换机名,参数3:路由keychannel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者新浪收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}


在这里看到百度接收者只接受test1、test2,所以只接收到了1和2的数据,新浪同理

五 主题模式

 在路由的基础上增加了通配符匹配通配符规则  #可以匹配多个词  * 只能匹配一个词

生产者代码:

package com.lansi.realtynavi.test.topics;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;/*** @Description 通配符模式* @Date 2021/3/23 13:31* @Created by huyao*** 交换机类型:fanout广播(发布订阅)   direct转发(路由)  topic通配符(通配模式)**  通配符规则  #可以匹配多个词  * 只能匹配一个词*              test.#  test.one.tow  test.one.q.wqe  /  test.*   test.one test.two*/
public class Producer {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, false, false, false, null);LinkedHashMap<String, String> map = new LinkedHashMap<>();map.put("test.one","测试一数据");map.put("test2.two.one","测试二数据");map.put("test.wqe","测试三数据");map.put("test4.com.hash.oqp","测试四数据");map.put("test5.com.code.oqp","测试五数据");map.put("test6.com.code.oqp","测试六数据");Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();while (iterator.hasNext()){Map.Entry<String, String> next = iterator.next();//第一个参数交换机名字,第二个参数指定rout_keychannel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());}channel.close();connection.close();}}

接收者1代码:

package com.lansi.realtynavi.test.topics;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消费者* @Date 2021/3/23 13:50* @Created by huyao*/
public class ConsumerBaiDu {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);//队列绑定交换机   目前交换机需要在rabbit也手动创建,在和spring整合的时候spring会自动帮我们创建//参数1:队列名,参数2:交换机名,参数3:路由key(目前用不到,路由模式通配符模式使用)channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者百度收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}

接收者2代码

package com.lansi.realtynavi.test.topics;import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;/*** @Description 消费者* @Date 2021/3/23 13:50* @Created by huyao*/
public class ConsumerXinLang {public static void main(String[] args) throws Exception{Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);//队列绑定交换机//参数1:队列名,参数2:交换机名,参数3:路由keychannel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");channel.basicQos(1);channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者新浪收到消息:"+new String(body));channel.basicAck(envelope.getDeliveryTag(), false);}});}}


最后就是springboot上整合rabbitmq

需要用到的依赖

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
然后配置rabbitmq连接
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
#发送者开启confirm确认机制
spring.rabbitmq.publisher-confirms=true
#发送者开启return确认机制
spring.rabbitmq.publisher-returns=true#开启ackspring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false

接下来一个rabbitmq的配置

package com.lansi.realtynavi.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description mq的配置* @Date 2021/3/24 14:19* @Created by huyao*/
@Configuration
public class RabbitMqConfig {//定义交换机的名字public static final String EXCHANGE_NAME = "boot_topic_exchange";public static final String QUEUE_NAME = "boot_queue";//1.声明交换机@Bean("bootExchange")public Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//2.声明队列@Bean("bootQueue")public Queue bootQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}//3.绑定@Beanpublic Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();}
}

接收者

package com.lansi.realtynavi.config;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @Description mq监听/消费者手动签收消息* @Date 2021/3/24 14:44* @Created by huyao**rabbitmq给了两种消息的可靠性  confirm和return**/
@Component
public class RabbitMqConsumer {//可监听分布式其他项目,只要mq连接的地址相同监听的队列名存在即可//消费者@RabbitListener(queues = "boot_queue")public void ListenerQueue(Message message, Channel channel) throws Exception{System.out.println("消费者接收到消息:"+new String(message.getBody()));try{//开始业务处理System.out.println("开始业务处理");//int i = 5/0;System.out.println("业务处理完成");//业务处理完成确认收到消息  , 第二个参数为true支持多消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}catch (Exception e){System.out.println("业务处理异常");//业务异常,拒收消息,请求重发    参数三为true则重回队列发送channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);}}}

这里的生产者我写的一个controller中的列子(错误示范,只能调用一次)

testTopic1 是测试mq的高级特性,这里只用到testTopic就可以

package com.lansi.realtynavi.rabbitmq;import com.lansi.realtynavi.config.RabbitMqConfig;
import com.lansi.realtynavi.dev.helloWord.HelloSender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @Description 描述* @Date 2021/3/24 13:46* @Created by huyao*/
@RestController
@RequestMapping("api/rabbitMq")
public class RabbitMqController {@Autowiredprivate HelloSender helloSender;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("helloWorld")public void hello(){helloSender.send();}@GetMapping("testTopic")public void testTopic(){rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh", "topic的mq.......");}//mq的可靠性机制,必须要在配置文件中开启@GetMapping("testTopic1")public void testTopic1(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了。。。");if(b){System.out.println("交换机确认成功!!");} else {System.out.println("交换机确认失败!!");}}});//设置交换机处理失败消息的模式,为true的时候,消息打到不了队列时,会将消息重新返回给生产者rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** @param message 消息对象* @param returnCode 错误码* @param returnText 错误信息* @param exchange 交换机* @param routingKey 路由键** */@Overridepublic void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {System.out.println("return被执行了。。。");System.out.println("message:"+new String(message.getBody()));System.out.println("错误码:"+returnCode);System.out.println("错误信息:"+returnText);System.out.println("交换机:"+exchange);System.out.println("路由键:"+routingKey);}});rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh", "topic的mq.......");}}

运行后掉对应的接口,消费者接收


这样rabbitmq就整合进springboot中了

rabbitMq工作模式特性及整合springboot相关推荐

  1. RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing

    第四章-RabbitMQ工作模式-Routing 1.模式介绍 1.1 模式 路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与 ...

  2. 3.RabbitMQ工作模式介绍

    3.RabbitMQ工作模式介绍.md 文章目录 3.RabbitMQ工作模式介绍.md 1.简单模式 1.1总结 2.Work Queues 工作队列模式 2.1 模式说明 2.2 代码编写 2.3 ...

  3. rabbitmq消息队列入门到整合springboot(篇幅较长内容详细)

    1.安装rabbitmq服务器 我们选择在linux下安装 安装的前提需要在虚拟机下安装docker docker pull rabbitmq:management(拉去镜像) docker run ...

  4. 学成在线--13.RabbitMQ工作模式

    文章目录 一.Work queues 二.Publish/subscribe 1.工作模式 2.代码 1)生产者 2)消费者 3.测试 4.思考 三.Routing 1.工作模式 2.代码 1)生产者 ...

  5. RabbitMQ工作模式Publish/Subscribe发布订阅,test测试代码

    RabbitMQ有以下几种工作模式 : 1.Work queues  工作队列 2.Publish/Subscribe 发布订阅 3.Routing      路由 4.Topics        通 ...

  6. RabbitMQ工作模式

    RabbitMQ总共有六种工作模式: 1.工作队列模式 work queues 2.发布/订阅模式 Publish/Subscribe 3.路由模式 Routing 4.通配符模式 Topics 5. ...

  7. 消息中间件的应用场景与 RabbitMQ的六种工作模式介绍

    消息中间件的应用场景与 RabbitMQ的六种工作模式介绍 消息中间件应用场景 异步处理 应用解耦 流量削峰 RabbitMQ的六种工作模式 简单模式 工作模式 发布订阅模式 路由模式 主题模式 PR ...

  8. SpringAMQP整合RabbitMQ-五种工作模式Demo

    文章目录 一.MQ基本概念 二.RabbitMQ 三.Docker部署RabbitMQ 四.SpringAMQP 五.RabbitMQ工作模式 1.Basic Queue 简单模式 2.Work qu ...

  9. RabbitMQ的交换机类型和工作模式

    RabbitMQ的交换机类型有四种 1.direct 直流交换机: 根据消息的路由键routingkey,将消息以完全匹配的方式路由到指定的队列中. 这里的匹配指的是消息本身携带的路由键和队列与交换机 ...

最新文章

  1. 巴西CC成为FreeStor全球首个用户
  2. iptables的conntrack表满了导致访问网站很慢
  3. 呼叫中心如何规划好工作习惯
  4. 我们为什么要接受教育
  5. 服务器版的mysql怎么装_WIN7服务器配置之MySQL数据库安装图解(适用于5.1,5.5的版本)...
  6. 静态路由(原理+实验)
  7. java is a_java中 is - a和 has - a的区别
  8. 从迁移到Java 7的小技巧
  9. MySQL 调优基础(三) Linux文件系统
  10. Linux samba的配置和使用
  11. 传输层两大协议:TCP与UDP详解(两者的联系与区别)
  12. python调用所有函数_python 调用函数
  13. mysql取消外键限制_mysql怎么取消外键限制(约束)?
  14. oracle全文检索 分区表,oracle全文检索
  15. lisp用entmake生产圆柱体_铝型材挤压生产金属流动与模具分析
  16. 帮助你在移动设备上生成倾斜控制(重力控制)的旋转效果jQuery插件 - lenticular.js...
  17. java+@api_java 常用API
  18. linux系统外接硬盘_如何使用外部硬盘安装linux系统?
  19. 知行EDI系统-入门篇
  20. 基于QT+ffmpeg+SDL2的流媒体播放器

热门文章

  1. Android大话设计模式 第三章----开放封闭原则---孙悟空任弼马温一职
  2. 为什么你得不到 90 度的温暖
  3. 1504.ICCVPartial Person Re-Identification 论文笔记
  4. arduinopn532模块_NFC开发板/nfc芯片标签/PN532开发板/RFID读卡器/NFC模块/Arduino
  5. LTE网络中UU与X2接口研究
  6. c语言程序设计课程总结600字,2019年春季学期《C语言程序设计II》课程总结
  7. java中mapper层作用_Java的MyBatis框架中Mapper映射配置的使用及原理解析
  8. FFmpeg 时间单位与转换
  9. linux(ubuntu)查看硬件设备命令
  10. 科技将把我们带向哪里