
mq.xml//rabbitmq config
public class AmqpConfig {@Value("${spring.rabbitmq.host}")private String address;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(address);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(true); //必须要设置、消息发送确认return connectionFactory;}/***  常用spring为singleton单例模式,此处mq消息需将其改为非单例模式*/@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//必须是prototype类型public RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}}//消息发送
public class SystemMqMessageSender {private static final Logger logger = LoggerFactory.getLogger(SystemMqMessageSender.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@Value("${send.exchange.name}")private String exchangeSystem;@Value("${send.queue.name}")private String queueSystem;@Resourceprivate RabbitAdmin rabbitAdmin;public void sendMessage(EventModel eventModel) {String message = JsonUtils.json(eventModel);logger.info("发送消息:{}", message);rabbitTemplate.convertAndSend(exchangeSystem, queueSystem, message);}//声明持久化队列,并绑定到exchange上@Beanpublic Binding bindingExchangeSystem() {Queue queue = QueueBuilder.durable(queueSystem).build();//队列持久化rabbitAdmin.declareQueue(queue);//声明队列DirectExchange exchange = (DirectExchange) ExchangeBuilder.directExchange(exchangeSystem).build();rabbitAdmin.declareExchange(exchange);//创建路由Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();//绑定路由rabbitAdmin.declareBinding(binding);return binding;}}//消息接收
@RabbitListener(queues = "${listen.queue.name.system}")
public class SystemMessageListener extends BaseListener implements EventModelConsumer,InitializingBean {private static final Logger logger = LoggerFactory.getLogger(SystemMessageListener.class);@Value("${listen.queue.name.system}")private String queueName;@RabbitHandlerpublic void process(String message) {//监听消息logger.info("接收到消息:{}", message);processMessage(message, queueName);}public void processMessage(String content, String queueName) {//业务处理}


Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Manual message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It's time to set this flag to false and send a proper acknowledgment from the worker, once we're done with a task.







