queue(队列):存储消息,消费者从队列中消费消息,durable表示队列是否持久化,持久化队列在rabbitmq服务重启后还在;

exchange(交换):生产者将消息生产给交换机,再由交换机路由到绑定队列,AMQP规范将“默认Exchange(direct)”定义为没有名称。并且所有队列都使用其名称作为绑定值自动绑定到该默认Exchange(即直接Exchange),所以也可以设置routingKey为队列名称发送消息到指定队列。交换机也可以设置持久化,交换机有以下几种类型:

  • fanout:所有发送到该Exchange的消息都会路由到所有与它绑定的Queue中;
  • direct:把消息路由到那些binding key与routing key完全匹配的Queue中;
  • topic:将消息路由到binding key与routing key相匹配的Queue中,但这里为模糊匹配;
  • headers:headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

binding(绑定):连接exchange和queue;

bindingKey:绑定exchange和queue的句点号“. ”分隔的字符串;

routingKey:发送消息时指定的路由关键字,也是句点号“. ”分隔的字符串,通过它来匹配bindingKey从而发送到指定队列;

autoAck:设置队列是否自动应答,默认为true,消费者应答表示该消费者消费消息并处理完成,rabbitmq会删除该消息,如果一个消费者挂掉而没有应答,rabbitmq会将任务交付给另一个消费者去处理。如果设置为false,而消费者处理完消息后没有手动应答,rabbitmq认为消息处理失败,而不会删除消息,可通过basicAck进行手动应答,

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)

Delivery Tag: 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认。

如果消费者处理消息中抛出异常并且没有捕获,程序没有主动应答或自动应答,rabbitmq认为该消息没有被正确处理,而不会删出该消息,因此消费者会一直消费该条消息,并持续抛出异常,

SimpleMessageListenerContainer: 最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务;

DefaultMessageListenerContainer:这个消息监听器容器建立在SimpleMessageListenerContainer容器之上,添加了对事务的支持;

ListenerContainer:这是功能最强大的消息监听器,与DefaultMessageListenerContainer相同,它支持事务,但是它还允许动态地管理JMS会话。

rabbitmq两种监听器消费消息的方法:

1、@RabbitListener注解的方式

@Component
public class OutBoundListener {@RabbitListener(queues = "MSG_OUTBOUND_APP8075", containerFactory = "rlcFactory")public void process(Message message) {log.info("接收消息:{}", message.toString());}
}

rlcFactory是在DefaultRabbitConfig中配置的SimpleRabbitListenerContainerFactory

package com.spring.component.rabbitmq.config;import java.io.IOException;
import java.util.concurrent.TimeoutException;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.spring.component.rabbitmq.Properties;/*** 默认rabbitmq配置类* * @author liushihua* @since 2018年04月11日*/
@Configuration
public class DefaultRabbitConfig extends RabbitConfig {@Autowiredprivate Properties rabbitProperties;@Bean("connectionFactory")public ConnectionFactory connectionFactory() {if (rabbitProperties.validQueue()) {return this.instanceConnectionFactory(rabbitProperties.getHost(),Integer.parseInt(rabbitProperties.getPort()), rabbitProperties.getUsername(),rabbitProperties.getPassword(), rabbitProperties.getVirtualHost());} else {log.info("没有配置默认mq参数,将不能使用默认mq");return new CachingConnectionFactory();}}@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {return this.instanceRabbitTemplate(connectionFactory);}@Bean("rlcFactory")public SimpleRabbitListenerContainerFactory rlcFactory(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = this.instanceFactory(connectionFactory);return factory;}@Bean("topicExchange")public TopicExchange topicExchange(@Qualifier("connectionFactory") CachingConnectionFactory connectionFactory) {if ("true".equals(rabbitProperties.getExchange())) {try {this.topicExchangeDeclare(connectionFactory);log.info("exchangeDeclare成功:TOPIC.EXCHANGE");} catch (IOException e) {log.error("exchangeDeclare失败:" + e.getMessage());} catch (TimeoutException e) {log.error("exchangeDeclare失败, 访问超时:" + e.getMessage());}}return new TopicExchange(Properties.TOPIC_EXCHANGE);}
}
package com.spring.component.rabbitmq.config;import java.io.IOException;
import java.util.concurrent.TimeoutException;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.spring.component.utils.CollectionUtil;/*** 自定义rabbitmq配置* * @author liushihua* @since 2018年04月10日*/
public class RabbitConfig {protected final Logger log = LoggerFactory.getLogger(this.getClass());public CachingConnectionFactory instanceConnectionFactory(String host, int port, String username,String password, String virtualHost) {CachingConnectionFactory factory = new CachingConnectionFactory();factory.setHost(host);factory.setPort(port);factory.setUsername(username);factory.setPassword(password);if (CollectionUtil.isNotBlank(virtualHost)) {factory.setVirtualHost(virtualHost);}return factory;}public RabbitTemplate instanceRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 为RabbitTemplate指定发送消息是的转换器rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}/*** 配置RabbitListenerContainer的工厂类,可配置在@RabbitListener注解中的containerFactory属性* * @param connectionFactory* @return*/public SimpleRabbitListenerContainerFactory instanceFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 指定了我们接受消息的时候,以 JSON 传输的消息可以转换成对应的类型传入到方法中factory.setMessageConverter(new Jackson2JsonMessageConverter());// 设置为手动应答// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 为每个listener配置并发的消费者个数的。// factory.setConcurrentConsumers(5);// 最大个数// factory.setMaxConcurrentConsumers(10);return factory;}/*** 配置rabbitTemplate的交换机,该模板默认发送消息到TOPIC.EXCHANGE交换机* * @param connectionFactory* @throws IOException* @throws TimeoutException*/public void topicExchangeDeclare(CachingConnectionFactory connectionFactory)throws IOException, TimeoutException {Connection connection = connectionFactory.getRabbitConnectionFactory().newConnection();Channel channel = connection.createChannel();// 交换机名称,交换机类型,是否持久化channel.exchangeDeclare("TOPIC.EXCHANGE", "topic", true);}
}
package com.spring.component.rabbitmq;import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;import javax.annotation.Resource;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.spring.component.utils.CollectionUtil;
import com.spring.component.utils.StringUtil;/*** 绑定队列和交换机* @author liushihua* @since 2018年04月11日*/
@Component
public class QueueBuilder {private final Logger log = LoggerFactory.getLogger(this.getClass());@Resourceprivate CachingConnectionFactory connectionFactory;@Resource(name = "iotbusConnectionFactory")private CachingConnectionFactory iotBusConnectionFactory;@Resource(name = "logserviceConnectionFactory")private CachingConnectionFactory logserviceConnectionFactory;@Autowiredprivate Properties rabbitProperties;@Autowiredprivate TopicExchange topicExchange;@Autowiredprivate TopicExchange iotbusTopicExchange;@Beanpublic QueueBuilder intiQueue() {log.info("系统加载定义queue......");// 初始化默认mq队列if (rabbitProperties.validQueue()) {// 声明队列queueDeclare(connectionFactory, rabbitProperties.getQueues());// 声明订阅topic的队列queueDeclare(connectionFactory, rabbitProperties.getTopics());}// 初始化iotbus mq队列if (rabbitProperties.validIotbusQueue()) {queueDeclare(iotBusConnectionFactory, rabbitProperties.getIotbusQueues());queueDeclare(iotBusConnectionFactory, rabbitProperties.getIotbusTopics());}// 初始化logservice mq队列if (rabbitProperties.validLogQueue()) {queueDeclare(logserviceConnectionFactory, rabbitProperties.getLogQueues());}// 绑定默认mq topic routingKey到指定交换机上bindExchange(connectionFactory, topicExchange.getName(), rabbitProperties.getRouting());// 绑定iotbus mq topic routingKeybindExchange(iotBusConnectionFactory, iotbusTopicExchange.getName(),rabbitProperties.getIotbusRouting());return new QueueBuilder();}/*** 声明队列,根据队列是否配置auotAck自动应答创建队列* * @param connectionFactory* @param queues*/private void queueDeclare(CachingConnectionFactory connectionFactory, String queues) {Channel channel = channel(connectionFactory);if (null == channel) {log.error("channel为空,queueDeclare失败");return;}if (StringUtil.isNotBlank(queues)) {Stream.of(queues.trim().split(",")).filter(StringUtil::isNotBlank).forEach(queue -> {String queueName = queue;// 是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答String autoAck = null;int index = queue.indexOf(":");if (index > -1) {queueName = queue.trim().substring(0, index);autoAck = queue.trim().substring(index + 1);}try {// 创建队列,durable表示队列是否持久化,持久化队列在rabbitmq服务重启后还在channel.queueDeclare(queueName, true, false, false, null);if (false && StringUtil.equals("false", autoAck)) {// 设置队列是否自动ack,默认自动应答,channel.basicConsume(queueName, false, new DefaultConsumer(channel));}StringBuilder sb = new StringBuilder("queueDeclare成功:").append(queueName);if (StringUtil.isNotBlank(autoAck)) {sb.append(", autoAck = ").append(autoAck);}log.info(sb.toString());} catch (IOException e) {log.error("定义queue失败[{}],异常:{}", queueName, e.getMessage());}});}}/*** 为队列绑定交换机topic* * @param connectionFactory* @param exchange* @param rotuing*/private void bindExchange(CachingConnectionFactory connectionFactory, String exchange,String rotuing) {Channel channel = channel(connectionFactory);if (null == channel) {log.error("channel为空,bindExchange失败");return;}if (StringUtil.isAllNotBlank(exchange, rotuing)) {Stream.of(rotuing.trim().split("&")).filter(StringUtil::isNotBlank).forEach(rotuingName -> {String[] ss = rotuingName.trim().split(":");if (null != ss && ss.length > 1) {String routingKey = ss[0].trim();String routingValues = ss[1].trim();if (StringUtil.isNotBlank(routingKey) && CollectionUtil.isNotBlank(routingValues)) {Stream.of(routingValues.trim().split(",")).filter(StringUtil::isNotBlank).forEach(queue -> {try {// 为队列绑定交换机topic和bindKeychannel.queueBind(queue.trim(), exchange, routingKey);log.info("#####################      绑定topic成功: topicName={} & queue= {} & routingKey= {}",exchange, queue.trim(), routingKey);} catch (IOException e) {log.error("bindExchange失败routing[{}],异常: {}", rotuing, e.getMessage());}});} else {log.error("routing 格式不正确,rotuing={}", rotuingName);}} else {log.error("routing 格式不正确,rotuing={}", rotuingName);}});}}/*** 获得channel,并设置同一时间每次发给一个消息给一个worker* * @param connectionFactory* @return*/private Channel channel(CachingConnectionFactory connectionFactory) {Channel channel = null;try {Connection connection = connectionFactory.getRabbitConnectionFactory().newConnection();channel = connection.createChannel();// 设置同一时间每次发给一个消息给一个workerchannel.basicQos(1, false);} catch (IOException e) {log.error("createChannel失败,异常:{}", e.getMessage());} catch (TimeoutException e) {log.error("createChannel超时,异常:{}", e.getMessage());}return channel;}
}
package com.spring.component.rabbitmq;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;import com.spring.component.utils.StringUtil;/*** 多rabbitmq环境配置* * @author liushihua* @since 2018年04月11日*/
@Component
public class Properties {// 默认rabbitmq配置private String host;private String port;private String username;private String password;private String virtualHost;private String queues;private String topics;private String routing;private String exchange;private String iotbusHost;// 自定义rabbitmq配置private String iotbusPort;private String iotbusUsername;private String iotbusPassword;private String iotbusVirtualHost;private String iotbusQueues;private String iotbusTopics;private String iotbusRouting;private String iotbusExchange;// 日志rabbitmq配置private String logHost;private String logPort;private String logUsername;private String logPassword;private String logVirtualHost;private String logQueues;private String logEnable;@Autowiredprivate Environment env;public static final String TOPIC_EXCHANGE = "TOPIC.EXCHANGE";@Bean(name = "rabbitProperties")public Properties properties() {Properties properties = new Properties();this.host = this.env.getProperty("spring.rabbitmq.host", "");this.port = this.env.getProperty("spring.rabbitmq.port", "");this.username = this.env.getProperty("spring.rabbitmq.username", "");this.password = this.env.getProperty("spring.rabbitmq.password", "");this.virtualHost = this.env.getProperty("spring.rabbitmq.virtualHost", "");this.queues = this.env.getProperty("spring.rabbitmq.queues", "");this.topics = this.env.getProperty("spring.rabbitmq.topics", "");this.routing = this.env.getProperty("spring.rabbitmq.topic.routing", "");this.exchange = this.env.getProperty("spring.rabbitmq.topic.exchange", "true");this.iotbusHost = this.env.getProperty("iotbus.rabbitmq.host", "");this.iotbusPort = this.env.getProperty("iotbus.rabbitmq.port", "");this.iotbusUsername = this.env.getProperty("iotbus.rabbitmq.username", "");this.iotbusPassword = this.env.getProperty("iotbus.rabbitmq.password", "");this.iotbusVirtualHost = this.env.getProperty("iotbus.rabbitmq.virtualHost", "");this.iotbusQueues = this.env.getProperty("iotbus.rabbitmq.queues", "");this.iotbusTopics = this.env.getProperty("iotbus.rabbitmq.topics", "");this.iotbusRouting = this.env.getProperty("iotbus.rabbitmq.topic.routing", "");this.iotbusExchange = this.env.getProperty("iotbus.rabbitmq.topic.exchange", "true");this.logHost = this.env.getProperty("log.rabbitmq.host", "");this.logPort = this.env.getProperty("log.rabbitmq.port", "");this.logUsername = this.env.getProperty("log.rabbitmq.username", "");this.logPassword = this.env.getProperty("log.rabbitmq.password", "");this.logVirtualHost = this.env.getProperty("log.rabbitmq.virtualHost", "");this.logQueues = this.env.getProperty("log.rabbitmq.queues", "");this.logEnable = this.env.getProperty("log.enable", "");properties.setHost(this.host);properties.setPort(this.port);properties.setUsername(this.username);properties.setPassword(this.password);properties.setVirtualHost(this.virtualHost);properties.setQueues(this.queues);properties.setTopics(this.topics);properties.setRouting(this.routing);properties.setExchange(this.exchange);properties.setIotbusHost(this.iotbusHost);properties.setIotbusPort(this.iotbusPort);properties.setIotbusUsername(this.iotbusUsername);properties.setIotbusPassword(this.iotbusPassword);properties.setIotbusVirtualHost(this.iotbusVirtualHost);properties.setIotbusQueues(this.iotbusQueues);properties.setIotbusTopics(this.iotbusTopics);properties.setIotbusRouting(this.iotbusRouting);properties.setIotbusExchange(this.iotbusExchange);properties.setLogHost(this.logHost);properties.setLogPort(this.logPort);properties.setLogUsername(this.logUsername);properties.setLogPassword(this.logPassword);properties.setLogVirtualHost(this.logVirtualHost);properties.setLogQueues(this.logQueues);properties.setLogEnable(this.logEnable);return properties;}public boolean validQueue() {return StringUtil.isNumeric(this.port)&& StringUtil.isAllNotBlank(this.host, this.username, this.password);}public boolean validIotbusQueue() {return StringUtil.isNumeric(this.iotbusPort)&& StringUtil.isAllNotBlank(this.iotbusHost, this.iotbusUsername, this.iotbusPassword);}public boolean validLogQueue() {return StringUtil.isNumeric(this.logPort)&& StringUtil.isAllNotBlank(this.logHost, this.logUsername, this.logPassword);}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public String getPort() {return port;}public void setPort(String port) {this.port = port;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getVirtualHost() {return virtualHost;}public void setVirtualHost(String virtualHost) {this.virtualHost = virtualHost;}public String getQueues() {return queues;}public void setQueues(String queues) {this.queues = queues;}public String getTopics() {return topics;}public void setTopics(String topics) {this.topics = topics;}public String getRouting() {return routing;}public void setRouting(String routing) {this.routing = routing;}public String getExchange() {return exchange;}public void setExchange(String exchange) {this.exchange = exchange;}public String getIotbusHost() {return iotbusHost;}public void setIotbusHost(String iotbusHost) {this.iotbusHost = iotbusHost;}public String getIotbusPort() {return iotbusPort;}public void setIotbusPort(String iotbusPort) {this.iotbusPort = iotbusPort;}public String getIotbusUsername() {return iotbusUsername;}public void setIotbusUsername(String iotbusUsername) {this.iotbusUsername = iotbusUsername;}public String getIotbusPassword() {return iotbusPassword;}public void setIotbusPassword(String iotbusPassword) {this.iotbusPassword = iotbusPassword;}public String getIotbusVirtualHost() {return iotbusVirtualHost;}public void setIotbusVirtualHost(String iotbusVirtualHost) {this.iotbusVirtualHost = iotbusVirtualHost;}public String getIotbusQueues() {return iotbusQueues;}public void setIotbusQueues(String iotbusQueues) {this.iotbusQueues = iotbusQueues;}public String getIotbusTopics() {return iotbusTopics;}public void setIotbusTopics(String iotbusTopics) {this.iotbusTopics = iotbusTopics;}public String getIotbusRouting() {return iotbusRouting;}public void setIotbusRouting(String iotbusRouting) {this.iotbusRouting = iotbusRouting;}public String getIotbusExchange() {return iotbusExchange;}public void setIotbusExchange(String iotbusExchange) {this.iotbusExchange = iotbusExchange;}public String getLogHost() {return logHost;}public void setLogHost(String logHost) {this.logHost = logHost;}public String getLogPort() {return logPort;}public void setLogPort(String logPort) {this.logPort = logPort;}public String getLogUsername() {return logUsername;}public void setLogUsername(String logUsername) {this.logUsername = logUsername;}public String getLogPassword() {return logPassword;}public void setLogPassword(String logPassword) {this.logPassword = logPassword;}public String getLogVirtualHost() {return logVirtualHost;}public void setLogVirtualHost(String logVirtualHost) {this.logVirtualHost = logVirtualHost;}public String getLogQueues() {return logQueues;}public void setLogQueues(String logQueues) {this.logQueues = logQueues;}public String getLogEnable() {return logEnable;}public void setLogEnable(String logEnable) {this.logEnable = logEnable;}
}

2、实现ChannelAwareMessageListener,并注册到SimpleMessageListenerContainer的方式,这种方式可以实现消息的手动应答。

MessageListener实现:

package com.spring.component.rabbitmq.listener;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** 消费者,手动ack* * @author liushihua* @since 2018年04月25日*/
@Component
public class Receiver implements ChannelAwareMessageListener {private final Logger log = LoggerFactory.getLogger(this.getClass());@Overridepublic void onMessage(Message message, Channel channel) throws Exception {log.info("接收消息:{}", message.toString());// 手动应答ack// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.info("处理完成");}
}

注册到SimpleMessageListenerContainer:

package com.spring.component.rabbitmq.config;import com.spring.component.rabbitmq.listener.Receiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置rabbitmq监听器* * @author liushihua* @since 2018年04月25日*/
@Configuration
public class ListenerConfig {@Autowiredprivate Receiver receiver;/*** 注册一个简单的监听器容器 1、SimpleMessageListenerContainer 简单的监听器容器,不支持事务 2、DefaultMessageListenerContainer* 支持事务 3、ListenerContainer 不仅支持事务,还允许动态地管理JMS会话* * @param connectionFactory* @return*/@Beanpublic SimpleMessageListenerContainer listenerContainer(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();// 设置connectionFactorycontainer.setConnectionFactory(connectionFactory);// 设置消息json转换器container.setMessageConverter(new Jackson2JsonMessageConverter());// 队列container.setQueueNames("MSG_INBOUND_APP8075");container.setExposeListenerChannel(true);// 设置手动ackcontainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 设置容器的消费者的线程数container.setConcurrentConsumers(1);// 设置容器的最大消费者数container.setMaxConcurrentConsumers(10);// 讲一个MessageListener实现注入到监听容器container.setMessageListener(receiver);return container;}
}

Rabbitmq支持多线程消费消息,可以在消息监听容器SimpleMessageListenerContainer设置线程数:

  • container.setConcurrentConsumers(1);设置容器的消费者的线程数
  • container.setMaxConcurrentConsumers(10);设置容器的最大消费者数

也可以在消息监听容器设置消息手动应答: container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

JMS RabbitMQ消息代理相关推荐

  1. RabbitMQ 入门:1. Message Broker(消息代理)

    Message Broker(消息代理) 维基百科对 Message Broker 的定义是:Message broker是一种中介程序模块,它把消息从发送方的正式消息传递协议转化为接收方的正式消息传 ...

  2. Springboot 整合Websocket+Stomp协议+RabbitMQ做消息代理 实例教程

    前言 如果你还没有了解过websocket,关于整合websocket的简单入门使用,可以先看看我这篇: <SpringBoot 整合WebSocket 简单实战案例> https://b ...

  3. 消息代理 - RabbitMQ - 学习/实践

    1.应用场景 用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗. 2.学习/操作 1. 文档 Messaging that just works - RabbitMQ // 官 ...

  4. ActiveMQ作为Logstash的消息代理

    扩展Logstash时,通常会添加一个消息代理,该消息代理用于在一个或多个Logstash节点处理传入消息之前临时缓冲传入的消息. 数据通过像Beaver这样的发运人推送到代理, Beaver读取日志 ...

  5. RabbitMQ消息模型详解

    目录 一.消息队列 什么是消息队列 AMQP和JMS 常见MQ产品 二.RabbitMQ 三.五种消息模型 四.简单消息模型 代码演示 获取连接 生产者 消费者 手动ACK 五.工作模式 代码演示: ...

  6. day72 JavaWeb框架阶段——RabbitMQ消息队列【了解常见的MQ产品,了解RabbitMQ的5种消息模型,会使用Spring AMQP】

    文章目录 0.学习目标 1.RabbitMQ 1.1.搜索与商品服务的问题 1.2.消息队列(MQ) 1.2.1.什么是消息队列 1.2.2.AMQP和JMS 1.2.3.常见MQ产品 1.2.4.R ...

  7. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

  8. 大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列

    QuartZ定时任务+RabbitMQ消息队列 一 .QuartZ定时任务解决订单系统遗留问题 情景分析: 在电商项目中 , 订单生成后 , 数据库商品数量-1 , 但是用户迟迟不进行支付操作 , 这 ...

  9. 干货:RabbitMQ消息队列基本原理介绍

    RabbitMQ 是高级消息队列协议(AMQP)的开源消息代理软件. RabbitMQ 服务器是用 Erlang 语言编写的,消息系统允许软件.应用相互连接和扩展.这些应用可以相互链接起来组成一个更大 ...

最新文章

  1. 我们试着用FBI的方法破解了一台安卓机
  2. 正则表达式匹配单个字符(.、[]、\d、\D、\s、\S、\w、\W)
  3. 最新EOS合约教程,从系统到环境,从创建到部署,从前端到后台,Dapp开发
  4. 2017年html5行业报告,云适配发布2017 HTML5开发者生态报告 期待更多行业标准
  5. python读excel乱码_Python读写excel练习_去除excel中乱码行,并添加列
  6. 网络爬虫:采用“负载均衡”策略来优化网络爬虫
  7. win7窗口颜色没有透明的开启教程
  8. matlab guide实现多级界面
  9. P1319 压缩技术(python3实现)
  10. hive check in checkDiagnosticMessage found error
  11. 报错, Exception: Missing URI template variable ‘id‘ for method parameter of type Long
  12. Mybatis简单数据库查询
  13. PostScript —— 一种编程语言
  14. 雷林鹏分享:jQuery EasyUI 树形菜单 - 创建带复选框的树形菜单
  15. 【钢带厚度预测】基于matlab模拟退火遗传算法优化BP神经网络钢带厚度预测【含Matlab源码 1285期】
  16. 面试中如何巧妙回答离职原因
  17. 一个故事讲完CPU的工作原理
  18. tftd32搭建DHCP服务器软件打开报错
  19. 群晖系统硬盘损毁的修复
  20. mysql数据可视化 1

热门文章

  1. MTD,文件系统,存储器分区的个人理解
  2. linux ubuntu配置要求,Ubuntu Server 14.04和Kylin 14.04 Enhanced Release amd64最低硬件配置要求...
  3. EM2040D和SES2000采集图像判读(一)
  4. python学习笔记(七):运算符和流程控制
  5. jmeter5.4.1 调整默认工具栏图片大小
  6. [转]冬天上厕所,马桶太冷怎么办?
  7. 石油公路工程都在用的光纤测试仪是什么型号
  8. TFN F7 光时域反射仪 给您不一样体验
  9. img文件制作linux启动u盘,如何在Linux系统中制作可启动img/iso镜像文件
  10. 黑盒测试用例设计--题目3