前言

消息队列作为一种常用的异步通信解决方案,而redis是一款高性能的nosql产品,今天就给大家介绍一下,如何使用redis实现消息队列,并整合到springboot。

两个消息模型

1. 队列模型

队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。

  • 只有一个消费者将获得消息
  • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
  • 每一个成功处理的消息都由接收者签收

发布/订阅模型

发布/订阅模型如图所示,不用说,和订阅公众号是一样的。

  • 多个消费者可以获得消息
  • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个topic,以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布

redis如何实现

  1. 对于队列模型,我们可以使用redis的list数据结构,通过LPUSH和RPOP来实现一个队列。
  2. 发布/订阅模型就更简单了,redis官方就支持,而且还可以使用PSUBSCRIBE支持模式匹配,使用如下命令,即可订阅所有f开头的订阅,具体可查看文档。
PSUBSCRIBE f*
  1. keyspace notifications(键空间通知)
    该功能是在redis2.8之后引入的,即客户端可以通过pub/sub机制,接收key的变更的消息。换句话说,就是redis官方提供了一些topic,帮助我们去监听redis数据库中的key,我曾经就使用其中的’keyevent@0:expired’实现了定时任务。

和spring boot整合

首先得介绍一下spring-data-redis中的两种template的默认serializer,当然spring还提供其他的序列化器,具体可查看文档,也可以自己实现RedisSerializer接口,构建自己的序列化器。

template default serializer serialization
RedisTemplate JdkSerializationRedisSerializer 序列化String类型的key和value
StringRedisTemplate StringRedisSerializer 使用Java序列化

发布/订阅模型

终于到了写代码的时候了,先从发布/订阅说起吧,因为spring官方给了示例。但是呢,示例里面的消息是String类型,对于我们的业务来说,可能更需要一个POJO,所以还需要改造一下,走起。

  1. 先学习下org.springframework.data.redis.listener.adapter.MessageListenerAdapter源码如下,可以看到,如果使用StringRedisTemplate的话,默认都是使用StringRedisSerializer来反序列化,而如果想主动接收消息,则需要实现MessageListener接口。
    /*** Standard Redis {@link MessageListener} entry point.* <p>* Delegates the message to the target listener method, with appropriate conversion of the message argument. In case* of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.* * @param message the incoming Redis message* @see #handleListenerException*/public void onMessage(Message message, byte[] pattern) {try {// Check whether the delegate is a MessageListener impl itself.// In that case, the adapter will simply act as a pass-through.if (delegate != this) {if (delegate instanceof MessageListener) {((MessageListener) delegate).onMessage(message, pattern);return;}}// Regular case: find a handler method reflectively.Object convertedMessage = extractMessage(message);String convertedChannel = stringSerializer.deserialize(pattern);// Invoke the handler method with appropriate arguments.Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };invokeListenerMethod(invoker.getMethodName(), listenerArguments);} catch (Throwable th) {handleListenerException(th);}}/*** Extract the message body from the given Redis message.* * @param message the Redis <code>Message</code>* @return the content of the message, to be passed into the listener method as argument*/protected Object extractMessage(Message message) {if (serializer != null) {return serializer.deserialize(message.getBody());}return message.getBody();}/*** Initialize the default implementations for the adapter's strategies.* * @see #setSerializer(RedisSerializer)* @see JdkSerializationRedisSerializer*/protected void initDefaultStrategies() {RedisSerializer<String> serializer = new StringRedisSerializer();setSerializer(serializer);setStringSerializer(serializer);}
  1. spring data redis实现发布与订阅需要配置以下信息:
  • Topic
  • MessageListener
  • RedisMessageListenerContainer

1). 用到的相关依赖:

dependencies {implementation 'org.apache.commons:commons-pool2'implementation 'com.fasterxml.jackson.core:jackson-core'implementation 'com.fasterxml.jackson.core:jackson-databind'implementation 'org.springframework.boot:spring-boot-starter-data-redis'testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

其中,jackson相关依赖用于将对象序列化成json。

2). 配置 spring data redis:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.johnfnash.learn.config.listener.ConsumerRedisListener;@Configuration
public class RedisConfig {@Autowiredprivate LettuceConnectionFactory connectionFactory;@Beanpublic ConsumerRedisListener consumeRedis() {return new ConsumerRedisListener();}@Beanpublic ChannelTopic topic() {return new ChannelTopic("topic");}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(consumeRedis(), topic());return container;}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();template.setConnectionFactory(factory);Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);template.setValueSerializer(jackson2JsonRedisSerializer);template.setKeySerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}}
  1. 实现一个Object类型的 topic MessageListener
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;public class ConsumerRedisListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {doBusiness(message);}/*** 打印 message body 内容* @param message*/public void doBusiness(Message message) {Object value = redisTemplate.getValueSerializer().deserialize(message.getBody());System.out.println("consumer message: " + value.toString());}}
  1. 其它:

记得配置上 redis 相关的配置,最简单的application.properties配置如下:

# REDIS (RedisProperties)
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接超时时间(毫秒)
spring.redis.timeout=500ms#lettuce客户端
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.shutdown-timeout=100

通过上面四步,简单的订阅者就做好了,通过以下代码可以发布一个消息,同时可以查看到控制台会有订阅者消费信息打印出来:

import java.util.Date;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringRedisSubscribeApplicationTests {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Testpublic void testSubscribe() {String channel = "topic";redisTemplate.convertAndSend(channel, "hello world");redisTemplate.convertAndSend(channel, new Date(System.currentTimeMillis()));redisTemplate.convertAndSend(channel, new MessageEntity("1", "object"));}}

这里用到了一个实体类用于测试。

import java.io.Serializable;public class MessageEntity implements Serializable {private static final long serialVersionUID = 8632296967087444509L;private String id;private String content;public MessageEntity() {super();}public MessageEntity(String id, String content) {super();this.id = id;this.content = content;}// getter, setter @Overridepublic String toString() {return "MessageEntity [id=" + id + ", content=" + content + "]";}}

输出结果如下:

consumer message: hello world
consumer message: Sat Feb 23 13:04:40 CST 2019
consumer message: MessageEntity [id=1, content=object]

最后总结下:

用 spring data redis 来实现 redis 订阅者,本质上还是Listener模式,只需要配置Topic, MessageListener 和 RedisMessageListenerContainer就可以了。同时,发布时,只需要使用 redisTemplate 的 convertAndSend方法即可topic来发布message。

消息队列模型

接下来就是消息队列了,这个就需要自己造轮子了,在spring中使用redisTemlate操作数据库,而对于不同的数据类型则需要不同的操作方式,如下表格所示,具体还是请看官方文档。

实现队列选择list数据结构,redisTemplate.opsForList()使用起来非常简单,和redis命令基本一致。

数据类型 操作方式
string redisTemplate.opsForValue()
hash redisTemplate.opsForHash()
list redisTemplate.opsForList()
set redisTemplate.opsForSet()
  1. 先定义一个消息的POJO
    直接使用上面定义的 MessageEntity 实体类。

  2. 配置 spring data redis

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.johnfnash.learn.redis.queue.entity.MessageEntity;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, MessageEntity> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, MessageEntity> template = new RedisTemplate<String, MessageEntity>();template.setConnectionFactory(factory);Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);template.setValueSerializer(jackson2JsonRedisSerializer);template.setKeySerializer(new StringRedisSerializer());template.setHashKeySerializer(new StringRedisSerializer());template.afterPropertiesSet();return template;}}

用到的 redis 配置信息如下:

# REDIS (RedisProperties)
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接超时时间(毫秒)
spring.redis.timeout=5000ms# redis消息队列键名
redis.queue.key=queue
# redis消息队列读取消息超时时间,单位:秒
redis.queue.pop.timeout=1000#lettuce客户端
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.max-wait=-1
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.shutdown-timeout=100
  1. 消息的消费者,消费者需要不断轮询队列,有消息便取出来,实现方式如下:
import java.util.concurrent.TimeUnit;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import com.johnfnash.learn.redis.queue.entity.MessageEntity;@Service
public class MessageConsumerService extends Thread {@Autowiredprivate RedisTemplate<String, MessageEntity> redisTemplate;private volatile boolean flag = true;@Value("${redis.queue.key}")private String queueKey;@Value("${redis.queue.pop.timeout}")private Long popTimeout;@Overridepublic void run() {try {MessageEntity message;while(flag && !Thread.currentThread().isInterrupted()) {message = redisTemplate.opsForList().rightPop(queueKey, popTimeout, TimeUnit.SECONDS);System.out.println("接收到了" + message);}} catch (Exception e) {System.err.println(e.getMessage());}}public boolean isFlag() {return flag;}public void setFlag(boolean flag) {this.flag = flag;}}
  1. 消息的生产者,这个类提供一个发送消息的方法。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import com.johnfnash.learn.redis.queue.entity.MessageEntity;@Service
public class MessageProducerService {@Autowiredprivate RedisTemplate<String, MessageEntity> redisTemplate;@Value("${redis.queue.key}")private String queueKey;public Long sendMeassage(MessageEntity message) {System.out.println("发送了" + message);return redisTemplate.opsForList().leftPush(queueKey, message);}}

测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringRedisQueueApplicationTests {@Autowiredprivate MessageProducerService producer;@Autowiredprivate MessageConsumerService consumer;@Testpublic void testQueue() {consumer.start();producer.sendMeassage(new MessageEntity("1", "aaaa"));producer.sendMeassage(new MessageEntity("2", "bbbb"));try {Thread.sleep(2000L);} catch (InterruptedException e) {e.printStackTrace();}consumer.interrupt();}}

输出信息如下:

发送了MessageEntity [id=1, content=aaaa]
2019-02-23 13:15:01.156  INFO 21436 --- [       Thread-2] io.lettuce.core.EpollProvider            : Starting without optional epoll library
2019-02-23 13:15:01.159  INFO 21436 --- [       Thread-2] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
发送了MessageEntity [id=2, content=bbbb]
接收到了MessageEntity [id=1, content=aaaa]
接收到了MessageEntity [id=2, content=bbbb]
Redis command interrupted; nested exception is io.lettuce.core.RedisCommandInterruptedException: Command interrupted

至此,消息队列的方式也整合完成了
虽然redisTemplate是线程安全的,但是如果一个队列有多个接收者的话,可能也还需要考虑一下并发的问题。

转自

  1. springboot整合redis消息队列

  2. Springboot2 之 Spring Data Redis 实现消息队列——发布/订阅模式

springboot整合redis消息队列相关推荐

  1. springboot:整合redis消息队列

    整合redis消息队列 项目依赖 <!-- RedisTemplate --><dependency><groupId>org.springframework.bo ...

  2. Springboot 实现Redis消息队列

    Springboot 实现Redis 消息队列 之前被面试官问到怎么实现Redis的消息队列,我人麻了,当时一个劲的摇头,娘的,欺负我那时知识少,恶心啊  最近看到一个Demo,然后随笔记录了一篇,以 ...

  3. SpringBoot使用Redis消息队列 实现生产/消费者

    文章目录 一.redis 依赖和配置源 二.消费者 2.1.生产者和消息公共的代码 消息队列 key 2.2.redis 消息队列相关配置 1).MsgConsumer 定义公共消息接口 2).Red ...

  4. SpringBoot整合activeMQ消息队列手动签收(Session.CLIENT_ACKNOWLEDGE)为什么失效啊?

    今天在家隔离办公,不太忙,然后就琢磨起来消息队列activeMQ的消息事务来解决分布式事务,但是奈何在SpringBoot整合activeMQ时,其消费者手动签收消息时出现了问题-->当acti ...

  5. SpringBoot整合RabbitMQ消息队列

    RabbitMQ 一.RabbitMQ介绍 1.1 现存问题 服务调用:两个服务调用时,我们可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用Sprin ...

  6. SpringBoot整合MQ消息队列

    SpringBoot整合MQ 借鉴的文章 1.什么是MQ 2.消息队列可以做什么 3.下载安装MQ 4.SpringBoot整合MQ的步骤 借鉴的文章 https://www.jianshu.com/ ...

  7. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  8. Springboot整合redis(lettuce)

    springboot 整合redis(lettuce) 首先确保电脑上装了redis.最好能用redisDesktop查看一下数据情况 redis是一款非常流行的Nosql数据库.redis的功能非常 ...

  9. SpringBoot整合redis实现发布订阅模式

    Redis的发布订阅模式 发布订阅(Pub/Sub):目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接 ...

最新文章

  1. 项目实战之组件化架构
  2. hdu5693 D gamehdu 5712 D++ game
  3. 博文视点大讲堂第30期——职场新人胜出关键点
  4. 08-R包那么多,怎么才能快速找到自己需要的包呢?
  5. 移动端ajax分页,移动端分页加载 - 花乐天的个人空间 - OSCHINA - 中文开源技术交流社区...
  6. BZOJ1604 洛谷2906:[USACO2008 OPEN]Cow Neighborhoods 奶牛的邻居——题解
  7. CONVERT TEXT(转换为可排序格式)
  8. Arch Linux下 让MPlayer用上CoreAVC1.7.0.0解码器
  9. 安卓设备手柄无法映射线性扳机的解决思路(1)
  10. linux使用dwc串口,linux自带usb gadget设备驱动应用
  11. Eclipse配置android开发环境详解
  12. 抖音无水印解析网站源码
  13. 双目三维重建:双目摄像头实现双目测距(Python)
  14. 你有必要不沾计算机一段时间英语,八年级上册英语第一单元背默(人教版)
  15. 旅游指南之一----各地旅行社
  16. 计算机局域网组网技术的核心技术,自考“局域网技术与组网工程”模拟题(6)
  17. 一张纸对折多少次后能达到珠穆朗玛峰的高度
  18. Spark RDD实训4:计算总成绩
  19. 【微信小程序使用canvas绘制二维码】
  20. 第10章第10节:使用iSlide的幻灯片诊断工具优化幻灯片中的图片 [PowerPoint精美幻灯片实战教程]

热门文章

  1. 洛谷P1363幻象迷宫
  2. Win10任务栏不显示蓝牙图标 - 解决方案
  3. 计算机中专可以考殡仪专业吗,全国只有5所学校开设,专业很冷门,但就业率100%...
  4. mac redis启动与关闭
  5. 职业中专学校有哪些专业?中职优选
  6. 蓝牙智能指纹锁解决方案OM6621PW
  7. 前端开发中图片缓存清除
  8. 欧虎网站系统移服务器,欧虎SNS互动社区平台
  9. html5音乐播放器设计论文,音乐播放器的设计与实现毕业设计论文
  10. 【图灵奖得主】Jeffrey D. Ullman 斯坦福大学