RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递
RabbitMQ如何保证消息投递的准确性?
生产端的可靠性投递:
1.保证消息成功发送
2.保证MQ节点成功接收
3.发送端收到MQ节点(Broker)确认应答
4.完善的消息补偿机制
BAT等大厂解决方案:
1.消息落库,对消息状态进行打标(数据信息和消息信息落库)
2.消息的延迟投递,做二次确认,回调检查( 保证MQ在高并发的场景下)
幂等性:
幂等性是什么?
https://blog.csdn.net/miachen520/article/details/91039661
借鉴数据库的乐观锁机制:在执行更新库存SQL语句时:UPDATE DB SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1
消费端-幂等性保障(在海量订单产生的业务高峰期,如何避免消息的重复消费问题):
消费端实现幂等,就意味着,消息永远不会消费多次,即时我们收到了多条一样的消息
业界主流的幂等性操作:
1.唯一性ID+指纹码 机制
*唯一ID+指纹码机制,利用数据库主键去重
*SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码
*好处:实现简单
*坏处:高并发下有数据库写入的性能瓶颈
*解决方案:跟进ID进行库分表进行算法路由
2.利用Redis的原子性去实现
Confirm确认消息:
理解Confirm消息确认机制:
消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答,生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!
如何实现Confirm确认消息?
1.在channel上开启确认模式:channel.confirmSelect()
2.在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或记录日志等后续处理。
附源码:
消费端:ConfirmListenerConsumer.java
package com.xuy.rabbitmq.confirmListener;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-11:24* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//指定消息投递模式,消息的确认模式channel.confirmSelect();String exchangeName = "confirm_exchange";String routingKey = "confirm.save";String queueName = "confirm_queue";//声明交换机和队列,然后进行绑定设置,最后指定路由keychannel.exchangeDeclare(exchangeName,"topic",true);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);//创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName,true,queueingConsumer);while(true){QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println(msg);}}
}
生产端:ConfirmListenerProducer.java
package com.xuy.rabbitmq.confirmListener;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-11:23* @fun** 添加确认回传机制*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//指定消息投递模式,消息的确认模式channel.confirmSelect();String exchangeName = "confirm_exchange";String routingKey = "confirm.save";//发送消息String msg = "Hello World";channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());//添加一个确认监听channel.addConfirmListener(new ConfirmListener() {@Override//返回成功ACKpublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("----no ack!");}@Override//返回失败ACKpublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("----ack!");}});}
}
Retrun消息机制:
Return Listener用于处理一些不可路由的消息,消息生产者通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作。
但是在某些情况下,如果我们子啊发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们要监听这种不可达的消息,就要使用Return Listener
在基础API中有一个关键的我配置项:Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。
附源码:
消费端:ReturnListenerConsumer.java
package com.xuy.rabbitmq.returnListener;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-14:44* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "return_exchange";String routingKey = "return.save";String queueName = "return_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName,true,queueingConsumer);while(true){QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println(msg);}}
}
生产端:ReturnListenerProducer.java
package com.xuy.rabbitmq.returnListener;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-14:44* @fun** basicPublish中参数Mandatory测试*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "return_exchange";String routingKey = "return.save";String routingKeyError = "abc.save";String msg = "hello World";channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties Properties,byte[] bytes) throws IOException {System.err.println("----handle return");System.err.println(replyCode);System.err.println(replyText);System.err.println(exchange);System.err.println(routingKey);System.err.println(Properties);System.err.println(new String(bytes));}});channel.basicPublish(exchange,routingKey,true,null,msg.getBytes());channel.basicPublish(exchange,routingKeyError,true,null,msg.getBytes());}
}
消费端自定义监听:
之前在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。但是使用自定义的Consumer更加的方便,解耦性更加的强。
附源码:
自定义Consumer:MyConsumer.java
package com.xuy.rabbitmq.consumerBySelf;import com.rabbitmq.client.*;
import com.rabbitmq.client.Consumer;import java.io.IOException;/*** @author aric* @create 2021-03-02-16:35* @fun*/
public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {System.err.println("------consumer message-------");System.err.println(consumerTag);System.err.println(envelope);System.err.println(properties);System.err.println(new String(bytes));}
}
消费端:ConsumerBySelfConsumer.java
package com.xuy.rabbitmq.consumerBySelf;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-16:29* @fun** 使用自定义的Consumer*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "consumer_exchange";String routingKey = "consumer.save";String queueName = "consumer_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);channel.basicConsume(queueName,true,new MyConsumer(channel));}
}
生产端:ConsumerBySelfProducer.java
package com.xuy.rabbitmq.consumerBySelf;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-16:30* @fun*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "consumer_exchange";String routingKey = "consumer.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}
消费端限流:
什么是消费端限流?
假设RabbitMQ服务器有上万条未处理的消息,随便打开一个消费者客户端,会出现下面情况:巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多数据。
解决:
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)违背确认前,不进行消费新的消息。
void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);
prefetchSize:0消费消息大小限制
prefetchCount:告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true/false是否将上面设置应用于channel或consumer
注意:prefetchSize和global这两项,rabbitMQ没有实现,暂且不研究prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。
附源码:
自定义消费者模式:MyConsumerByLimit.java
package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.*;import java.io.IOException;/*** @author aric* @create 2021-03-02-17:22* @fun*/
public class MyConsumerByLimit extends DefaultConsumer {private Channel channel;public MyConsumerByLimit(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {System.err.println("------consumer message-------");System.err.println(consumerTag);System.err.println(envelope);System.err.println(properties);System.err.println(new String(bytes));channel.basicAck(envelope.getDeliveryTag(),false);}
}
消费端:ConsumerBySelfConsumer.java
package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:00* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "qos_exchange";String routingKey = "qos.save";String queueName = "qos_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);channel.basicQos(0,1,false);//1.限流方式,第一件事情就是AutoAck设置为falsechannel.basicConsume(queueName,false,new MyConsumerByLimit(channel));}
}
生产端:ConsumerBySelfProducer.java
package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:00* @fun*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "qos_exchange";String routingKey = "qos.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}
消费端ACK与重回队列:
消费端的手工ACK和NACK
1.消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
2.如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!
消费端的重回队列
1.消费端重回队列是为了对没有处理成功的消息,把消息重新回递给Broker队列中!
2.一般我们在实际中都会关闭重回队列,也就是设置为False。
附源码:
自定义消费者模式:MyConsumerByACK.java
package com.xuy.rabbitmq.ack;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;/*** @author aric* @create 2021-03-02-17:36* @fun** 如果是第0条消息,不签收,重回队列**/
public class MyConsumerByAck extends DefaultConsumer {private Channel channel;public MyConsumerByAck(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {System.err.println("--------consumer message");System.err.println(new String(bytes));if((Integer)properties.getHeaders().get("num") == 0){channel.basicNack(envelope.getDeliveryTag(),false,true);}else{channel.basicAck(envelope.getDeliveryTag(),false);}}
}
消费端:ConsumerByACKConsumer.java
package com.xuy.rabbitmq.ack;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xuy.rabbitmq.consumerLimit.MyConsumerByLimit;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:35* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "ack_exchange";String routingKey = "ack.save";String queueName = "ack_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);//手工签收必须要关闭 AutoACK=falsechannel.basicConsume(queueName,false,new MyConsumerByAck(channel));}
}
生产端:ConsumerByACKProducer.java
package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:00* @fun*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "qos_exchange";String routingKey = "qos.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}
TTL队列/消息:
TTL:TTL是Time To Live的缩写,也就是生存时间,RabbitMQ支持消息的过期时间,在消息发送时可以进行指定,RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
注意:消息体中Producer的properties的expiration属性也可设置过期时间
死信队列:
死信队列:
DLX(Dead-Letter-Exchange):利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。
DLX也是一个正常的Exchange,和一般的Exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。
消息变为死信有以下机种情况:
1.消息被拒绝(basic.reject/basic.nack)并且requeue=false
2.消息TTL过期
3.队列达到最大长度
死信队列设置:
1.首先需要设置死信队列的exchange和queue,然后进行绑定:
Exchange:dlx.exchange
Queue:dlx.queue
RoutingKey:#
2.然后进行正常声明交换机,队列,绑定,只不过需要在队列加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange"); 这样消息在过期,requeue,队列在达到最大长度时,消息就可以i直接路由到死信队列!
附源码:
消费端:DlxConsumer.java
package com.xuy.rabbitmq.dlx;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xuy.rabbitmq.ack.MyConsumerByAck;import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-18:35* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//这就是一个普通的交换机和队列以及路由String exchangeName = "dlx_exchange";String routingKey = "dlx.save";String queueName = "dlx_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);HashMap<String, Object> agruments = new HashMap<>();agruments.put("x-dead-letter-exchange","dlx.exchange");//这个agruments属性,要设置到声明队列上channel.queueDeclare(queueName,true,false,false,agruments);channel.queueBind(queueName,exchangeName,routingKey);//要进行死信队列的声明channel.exchangeDeclare("dlx.exchange","topic",true,false,null);channel.queueDeclare("dlx.queue",true,false,false,null);channel.queueBind("dlx.queue","dlx.exchange","#");channel.basicConsume(queueName,true,new MyConsumerByAck(channel));}
}
生产端:DlxProducer.java
package com.xuy.rabbitmq.dlx;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-18:35* @fun** expiration : 10s过期 进入死信队列*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "dlx_exchange";String routingKey = "dlx.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}
RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递相关推荐
- Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失
文章目录 1. 消息可能出现丢失的情况 2. 生产者如何保证消息的可靠性投递 2.1 消息落库打标 + confirm机制 2.2 消息幂等性如何保证? 2.3 延时消息确认 3. rabbitMQ服 ...
- RabbitMQ(十):RabbitMQ 如何保证消息的可靠性
一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...
- RabbitMQ 如何保证消息的可靠性
一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...
- RabbitMQ如何保证消息的可靠性
在了解RabbitMQ消息可靠性之前,先来了解一下RabbitMQ整个消息投递的路径: producer --> exchange --> queue --> consumer Ra ...
- 【RabbitMQ】消息的可靠性投递与签收
消息的可靠性投递--Porducer Confirm 确认模式 发生在从Producer发送到Exchange时,发送成功/失败都会自动调用ConfirmCallBack回调方法. 步骤: (1)开启 ...
- 消息队列面试 - 如何保证消息的可靠性传输?
消息队列面试 - 如何保证消息的可靠性传输? 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条 ...
- IM系统中如何保证消息的可靠投递(即QoS机制)(转)
消息的可靠性,即消息的不丢失和不重复,是im系统中的一个难点.当初qq在技术上(当时叫oicq)因为以下两点原因才打败了icq: 1)qq的消息投递可靠(消息不丢失,不重复) 2)qq的垃圾消息少(它 ...
- IM即时通讯-N-如何保证消息的可靠性展示
结论先行 客户端如何在推拉结合的模式下保证消息的可靠性展示? 原则: server拉取的消息一定是连续的 原则: 端侧记录的消息的连续段有两个作用: 1. 记录消息的连续性, 即起始中间没有断层, 2 ...
- rabbitmq接收不到消息_分布式消息队列:如何保证消息的可靠性传输
rabbitmq (1)生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能. 此时可以选择用rabbitmq提供的事务功能,就是生产者发送 ...
最新文章
- Apache的服务端包含--SSI
- 震惊!ConcurrentHashMap里面也有死循环,作者留下的“彩蛋”了解一下?
- 内核管理实战之虚地址转换为物理地址
- gradle 编译java_Java的Gradle依赖关系,使用编译还是实现?
- 返回一个整数数组中最大子数组的和。
- win安装appium
- 全国中小学信息技术创新与实践大赛:软件创意编程赛道
- NodeJS + WebStorm 中文显示乱码
- android判断是华为手机,华为手机怎么辨别真假?华为手机真伪验证多种方法
- 【电脑讲解】压缩包的使用技巧
- 朗格Lange 1古董车展特别版表落谁家?法拉利 335 Sport非常合衬
- coreseek分词
- System.BadImageFormatException: 试图加载格式不正确的程序。
- java、mysql、tomcat、maven、云环境配置、VUE
- HTML绘制七巧板,canvas绘制七巧板
- 浅谈企业文化的重要性,搭档之家有话说
- 为什么机器人不会抢走你的工作?
- LambdaStream
- Java 遍历数组的常见方法
- 怎么看伦敦银实时行情走势图?
热门文章
- 关于实数理论的探讨(一)
- PostgreSQL锁机制
- 微信小程序 Hi衣橱 衣橱管理小程序
- 贵金属入门,有以下四个方法
- better comment and better habit
- 微信域名检测的最新知识
- 12306抢票爬虫selenium+Chromedriver(需手动完成支付)
- c语言 有符号数与0作比较大小,C语言中有符号数与无符号数能否进行比较运算...
- 程序包无效:“CRX_HEADER_INVALID“的解决方法(最全最细)
- 榛子树搜索算法(Hazelnut tree search algorithm,HTS)——一种高效的优化算法