RabbitMQ如何保证消息投递的准确性?

生产端的可靠性投递:

1.保证消息成功发送

2.保证MQ节点成功接收

3.发送端收到MQ节点(Broker)确认应答

4.完善的消息补偿机制

BAT等大厂解决方案:

1.消息落库,对消息状态进行打标(数据信息和消息信息落库)

2.消息的延迟投递,做二次确认,回调检查(  保证MQ在高并发的场景下)

幂等性:

幂等性是什么?

https://blog.csdn.net/miachen520/article/details/91039661

借鉴数据库的乐观锁机制:在执行更新库存SQL语句时:UPDATE DB SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1

消费端-幂等性保障(在海量订单产生的业务高峰期,如何避免消息的重复消费问题):

消费端实现幂等,就意味着,消息永远不会消费多次,即时我们收到了多条一样的消息

业界主流的幂等性操作:

1.唯一性ID+指纹码 机制

*唯一ID+指纹码机制,利用数据库主键去重

*SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指纹码

*好处:实现简单

*坏处:高并发下有数据库写入的性能瓶颈

*解决方案:跟进ID进行库分表进行算法路由

2.利用Redis的原子性去实现

Confirm确认消息:

理解Confirm消息确认机制:

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答,生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!

如何实现Confirm确认消息?

1.在channel上开启确认模式:channel.confirmSelect()

2.在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送,或记录日志等后续处理。

附源码:

消费端:ConfirmListenerConsumer.java

package com.xuy.rabbitmq.confirmListener;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-11:24* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//指定消息投递模式,消息的确认模式channel.confirmSelect();String exchangeName = "confirm_exchange";String routingKey = "confirm.save";String queueName = "confirm_queue";//声明交换机和队列,然后进行绑定设置,最后指定路由keychannel.exchangeDeclare(exchangeName,"topic",true);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);//创建消费者QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName,true,queueingConsumer);while(true){QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println(msg);}}
}

生产端:ConfirmListenerProducer.java

package com.xuy.rabbitmq.confirmListener;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-11:23* @fun** 添加确认回传机制*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//指定消息投递模式,消息的确认模式channel.confirmSelect();String exchangeName = "confirm_exchange";String routingKey = "confirm.save";//发送消息String msg = "Hello World";channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());//添加一个确认监听channel.addConfirmListener(new ConfirmListener() {@Override//返回成功ACKpublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.err.println("----no ack!");}@Override//返回失败ACKpublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.err.println("----ack!");}});}
}

Retrun消息机制:

Return Listener用于处理一些不可路由的消息,消息生产者通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后消费者监听队列,进行消费处理操作。

但是在某些情况下,如果我们子啊发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们要监听这种不可达的消息,就要使用Return Listener

在基础API中有一个关键的我配置项:Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

附源码:

消费端:ReturnListenerConsumer.java

package com.xuy.rabbitmq.returnListener;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-14:44* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "return_exchange";String routingKey = "return.save";String queueName = "return_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName,true,queueingConsumer);while(true){QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println(msg);}}
}

生产端:ReturnListenerProducer.java

package com.xuy.rabbitmq.returnListener;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-14:44* @fun** basicPublish中参数Mandatory测试*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "return_exchange";String routingKey = "return.save";String routingKeyError = "abc.save";String msg = "hello World";channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties Properties,byte[] bytes) throws IOException {System.err.println("----handle return");System.err.println(replyCode);System.err.println(replyText);System.err.println(exchange);System.err.println(routingKey);System.err.println(Properties);System.err.println(new String(bytes));}});channel.basicPublish(exchange,routingKey,true,null,msg.getBytes());channel.basicPublish(exchange,routingKeyError,true,null,msg.getBytes());}
}

消费端自定义监听:

之前在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。但是使用自定义的Consumer更加的方便,解耦性更加的强。

附源码:

自定义Consumer:MyConsumer.java

package com.xuy.rabbitmq.consumerBySelf;import com.rabbitmq.client.*;
import com.rabbitmq.client.Consumer;import java.io.IOException;/*** @author aric* @create 2021-03-02-16:35* @fun*/
public class MyConsumer extends DefaultConsumer {public MyConsumer(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {System.err.println("------consumer message-------");System.err.println(consumerTag);System.err.println(envelope);System.err.println(properties);System.err.println(new String(bytes));}
}

消费端:ConsumerBySelfConsumer.java

package com.xuy.rabbitmq.consumerBySelf;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-16:29* @fun** 使用自定义的Consumer*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "consumer_exchange";String routingKey = "consumer.save";String queueName = "consumer_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);channel.basicConsume(queueName,true,new MyConsumer(channel));}
}

生产端:ConsumerBySelfProducer.java

package com.xuy.rabbitmq.consumerBySelf;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-16:30* @fun*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "consumer_exchange";String routingKey = "consumer.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

消费端限流:

什么是消费端限流?

假设RabbitMQ服务器有上万条未处理的消息,随便打开一个消费者客户端,会出现下面情况:巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多数据。

解决:

RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)违背确认前,不进行消费新的消息。

void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);

prefetchSize:0消费消息大小限制

prefetchCount:告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack

global:true/false是否将上面设置应用于channel或consumer

注意:prefetchSize和global这两项,rabbitMQ没有实现,暂且不研究prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。

附源码:

自定义消费者模式:MyConsumerByLimit.java

package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.*;import java.io.IOException;/*** @author aric* @create 2021-03-02-17:22* @fun*/
public class MyConsumerByLimit extends DefaultConsumer {private Channel channel;public MyConsumerByLimit(Channel channel) {super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {System.err.println("------consumer message-------");System.err.println(consumerTag);System.err.println(envelope);System.err.println(properties);System.err.println(new String(bytes));channel.basicAck(envelope.getDeliveryTag(),false);}
}

消费端:ConsumerBySelfConsumer.java

package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:00* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "qos_exchange";String routingKey = "qos.save";String queueName = "qos_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);channel.basicQos(0,1,false);//1.限流方式,第一件事情就是AutoAck设置为falsechannel.basicConsume(queueName,false,new MyConsumerByLimit(channel));}
}

生产端:ConsumerBySelfProducer.java

package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:00* @fun*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "qos_exchange";String routingKey = "qos.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

消费端ACK与重回队列:

消费端的手工ACK和NACK

1.消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!

2.如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!

消费端的重回队列

1.消费端重回队列是为了对没有处理成功的消息,把消息重新回递给Broker队列中!

2.一般我们在实际中都会关闭重回队列,也就是设置为False。

附源码:

自定义消费者模式:MyConsumerByACK.java

package com.xuy.rabbitmq.ack;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;import java.io.IOException;/*** @author aric* @create 2021-03-02-17:36* @fun** 如果是第0条消息,不签收,重回队列**/
public class MyConsumerByAck extends DefaultConsumer {private Channel channel;public MyConsumerByAck(Channel channel) {super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {System.err.println("--------consumer message");System.err.println(new String(bytes));if((Integer)properties.getHeaders().get("num") == 0){channel.basicNack(envelope.getDeliveryTag(),false,true);}else{channel.basicAck(envelope.getDeliveryTag(),false);}}
}

消费端:ConsumerByACKConsumer.java

package com.xuy.rabbitmq.ack;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xuy.rabbitmq.consumerLimit.MyConsumerByLimit;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:35* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "ack_exchange";String routingKey = "ack.save";String queueName = "ack_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);//手工签收必须要关闭  AutoACK=falsechannel.basicConsume(queueName,false,new MyConsumerByAck(channel));}
}

生产端:ConsumerByACKProducer.java

package com.xuy.rabbitmq.consumerLimit;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-17:00* @fun*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "qos_exchange";String routingKey = "qos.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

TTL队列/消息:

TTL:TTL是Time To Live的缩写,也就是生存时间,RabbitMQ支持消息的过期时间,在消息发送时可以进行指定,RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除

注意:消息体中Producer的properties的expiration属性也可设置过期时间

死信队列:

死信队列:

DLX(Dead-Letter-Exchange):利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

消息变为死信有以下机种情况:

1.消息被拒绝(basic.reject/basic.nack)并且requeue=false

2.消息TTL过期

3.队列达到最大长度

死信队列设置:

1.首先需要设置死信队列的exchange和queue,然后进行绑定:

Exchange:dlx.exchange

Queue:dlx.queue

RoutingKey:#

2.然后进行正常声明交换机,队列,绑定,只不过需要在队列加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange"); 这样消息在过期,requeue,队列在达到最大长度时,消息就可以i直接路由到死信队列!

附源码:

消费端:DlxConsumer.java

package com.xuy.rabbitmq.dlx;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.xuy.rabbitmq.ack.MyConsumerByAck;import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-18:35* @fun*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//这就是一个普通的交换机和队列以及路由String exchangeName = "dlx_exchange";String routingKey = "dlx.save";String queueName = "dlx_queue";channel.exchangeDeclare(exchangeName,"topic",true,false,null);HashMap<String, Object> agruments = new HashMap<>();agruments.put("x-dead-letter-exchange","dlx.exchange");//这个agruments属性,要设置到声明队列上channel.queueDeclare(queueName,true,false,false,agruments);channel.queueBind(queueName,exchangeName,routingKey);//要进行死信队列的声明channel.exchangeDeclare("dlx.exchange","topic",true,false,null);channel.queueDeclare("dlx.queue",true,false,false,null);channel.queueBind("dlx.queue","dlx.exchange","#");channel.basicConsume(queueName,true,new MyConsumerByAck(channel));}
}

生产端:DlxProducer.java

package com.xuy.rabbitmq.dlx;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author aric* @create 2021-03-02-18:35* @fun** expiration : 10s过期 进入死信队列*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "dlx_exchange";String routingKey = "dlx.save";String msg = "hello World";for(int i = 0 ; i < 5;i++) {AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}

RabbitMQ消息中间件(二) RabbitMQ如何保证消息的可靠性投递相关推荐

  1. Rabbitmq专题:rabbitMQ如何保证消息的可靠性投递?如何防止消息丢失

    文章目录 1. 消息可能出现丢失的情况 2. 生产者如何保证消息的可靠性投递 2.1 消息落库打标 + confirm机制 2.2 消息幂等性如何保证? 2.3 延时消息确认 3. rabbitMQ服 ...

  2. RabbitMQ(十):RabbitMQ 如何保证消息的可靠性

    一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...

  3. RabbitMQ 如何保证消息的可靠性

    一条消费成功被消费经历了生产者->MQ->消费者,因此在这三个步骤中都有可能造成消息丢失. 一 消息生产者没有把消息成功发送到MQ 1.1 事务机制 AMQP协议提供了事务机制,在投递消息 ...

  4. RabbitMQ如何保证消息的可靠性

    在了解RabbitMQ消息可靠性之前,先来了解一下RabbitMQ整个消息投递的路径: producer --> exchange --> queue --> consumer Ra ...

  5. 【RabbitMQ】消息的可靠性投递与签收

    消息的可靠性投递--Porducer Confirm 确认模式 发生在从Producer发送到Exchange时,发送成功/失败都会自动调用ConfirmCallBack回调方法. 步骤: (1)开启 ...

  6. 消息队列面试 - 如何保证消息的可靠性传输?

    消息队列面试 - 如何保证消息的可靠性传输? 面试题 如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题? 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条 ...

  7. IM系统中如何保证消息的可靠投递(即QoS机制)(转)

    消息的可靠性,即消息的不丢失和不重复,是im系统中的一个难点.当初qq在技术上(当时叫oicq)因为以下两点原因才打败了icq: 1)qq的消息投递可靠(消息不丢失,不重复) 2)qq的垃圾消息少(它 ...

  8. IM即时通讯-N-如何保证消息的可靠性展示

    结论先行 客户端如何在推拉结合的模式下保证消息的可靠性展示? 原则: server拉取的消息一定是连续的 原则: 端侧记录的消息的连续段有两个作用: 1. 记录消息的连续性, 即起始中间没有断层, 2 ...

  9. rabbitmq接收不到消息_分布式消息队列:如何保证消息的可靠性传输

    rabbitmq (1)生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能. 此时可以选择用rabbitmq提供的事务功能,就是生产者发送 ...

最新文章

  1. Apache的服务端包含--SSI
  2. 震惊!ConcurrentHashMap里面也有死循环,作者留下的“彩蛋”了解一下?
  3. 内核管理实战之虚地址转换为物理地址
  4. gradle 编译java_Java的Gradle依赖关系,使用编译还是实现?
  5. 返回一个整数数组中最大子数组的和。
  6. win安装appium
  7. 全国中小学信息技术创新与实践大赛:软件创意编程赛道
  8. NodeJS + WebStorm 中文显示乱码
  9. android判断是华为手机,华为手机怎么辨别真假?华为手机真伪验证多种方法
  10. 【电脑讲解】压缩包的使用技巧
  11. 朗格Lange 1古董车展特别版表落谁家?法拉利 335 Sport非常合衬
  12. coreseek分词
  13. System.BadImageFormatException: 试图加载格式不正确的程序。
  14. java、mysql、tomcat、maven、云环境配置、VUE
  15. HTML绘制七巧板,canvas绘制七巧板
  16. 浅谈企业文化的重要性,搭档之家有话说
  17. 为什么机器人不会抢走你的工作?
  18. LambdaStream
  19. Java 遍历数组的常见方法
  20. 怎么看伦敦银实时行情走势图?

热门文章

  1. 关于实数理论的探讨(一)
  2. PostgreSQL锁机制
  3. 微信小程序 Hi衣橱 衣橱管理小程序
  4. 贵金属入门,有以下四个方法
  5. better comment and better habit
  6. 微信域名检测的最新知识
  7. 12306抢票爬虫selenium+Chromedriver(需手动完成支付)
  8. c语言 有符号数与0作比较大小,C语言中有符号数与无符号数能否进行比较运算...
  9. 程序包无效:“CRX_HEADER_INVALID“的解决方法(最全最细)
  10. 榛子树搜索算法(Hazelnut tree search algorithm,HTS)——一种高效的优化算法