来自公众号:Hollis

>>>千人线上直播活动报名倒计时(今晚20:00): 从Oracle出发,走进GaussDB的世界


众所周知,redis是一个高性能的分布式key-value存储系统,在NoSQL数据库市场上,redis自己就占据了将近半壁江山,足以见到其强大之处。同时,由于redis的单线程特性,我们可以将其用作为一个消息队列。本篇文章就来讲讲如何将redis整合到spring boot中,并用作消息队列的……

一、什么是消息队列

“消息队列”是在消息的传输过程中保存消息的容器。——《百度百科》

消息我们可以理解为在计算机中或在整个计算机网络中传递的数据。

队列是我们在学习数据结构的时候学习的基本数据结构之一,它具有先进先出的特性。

所以,消息队列就是一个保存消息的容器,它具有先进先出的特性。

为什么会出现消息队列?

  1. 异步:常见的B/S架构下,客户端向服务器发送请求,但是服务器处理这个消息需要花费的时间很长的时间,如果客户端一直等待服务器处理完消息,会造成客户端的系统资源浪费;而使用消息队列后,服务器直接将消息推送到消息队列中,由专门的处理消息程序处理消息,这样客户端就不必花费大量时间等待服务器的响应了;

  2. 解耦:传统的软件开发模式,模块之间的调用是直接调用,这样的系统很不利于系统的扩展,同时,模块之间的相互调用,数据之间的共享问题也很大,每个模块都要时时刻刻考虑其他模块会不会挂了;使用消息队列以后,模块之间不直接调用,而是通过数据,且当某个模块挂了以后,数据仍旧会保存在消息队列中。最典型的就是生产者-消费者模式,本案例使用的就是该模式;

  3. 削峰填谷:某一时刻,系统的并发请求暴增,远远超过了系统的最大处理能力后,如果不做任何处理,系统会崩溃;使用消息队列以后,服务器把请求推送到消息队列中,由专门的处理消息程序以合理的速度消费消息,降低服务器的压力。

下面一张图我们来简单了解一下消息队列

由上图可以看到,消息队列充当了一个中间人的角色,我们可以通过操作这个消息队列来保证我们的系统稳定。

二、环境准备

Java环境:jdk1.8

spring boot版本:2.2.1.RELEASE

redis-server版本:3.2.100

三、相关依赖

这里只展示与redis相关的依赖,

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-redis</artifactId></dependency>

这里解释一下这两个依赖:

  • 第一个依赖是对redis NoSQL的支持

  • 第二个依赖是spring integration与redis的结合,这里添加这个代码主要是为了实现分布式锁

四、配置文件

这里只展示与redis相关的配置

# redis所在的的地址spring.redis.host=localhost# redis数据库索引,从0开始,可以从redis的可视化客户端查看spring.redis.database=1# redis的端口,默认为6379spring.redis.port=6379# redis的密码spring.redis.password=# 连接redis的超时时间(ms),默认是2000spring.redis.timeout=5000# 连接池最大连接数spring.redis.jedis.pool.max-active=16# 连接池最小空闲连接spring.redis.jedis.pool.min-idle=0# 连接池最大空闲连接spring.redis.jedis.pool.max-idle=16# 连接池最大阻塞等待时间(负数表示没有限制)spring.redis.jedis.pool.max-wait=-1# 连接redis的客户端名spring.redis.client-name=mall

五、代码配置

redis用作消息队列,其在spring boot中的主要表现为一RedisTemplate.convertAndSend()方法和一个MessageListener接口。所以我们要在IOC容器中注入一个RedisTemplate和一个实现了MessageListener接口的类。话不多说,先看代码

配置RedisTemplate

配置RedisTemplate的主要目的是配置序列化方式以解决乱码问题,同时合理配置序列化方式还能降低一点性能开销。

/*** 配置RedisTemplate,解决乱码问题*/@Beanpublic RedisTemplate&lt;String, Object&gt; redisTemplate(RedisConnectionFactory factory) {LOGGER.debug("redis序列化配置开始");RedisTemplate&lt;String, Object&gt; template = new RedisTemplate&lt;&gt;();template.setConnectionFactory(factory);// string序列化方式RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();// 设置默认序列化方式template.setDefaultSerializer(serializer);template.setKeySerializer(new StringRedisSerializer());template.setHashValueSerializer(serializer);LOGGER.debug("redis序列化配置结束");return template;}

代码第12行,我们配置默认的序列化方式为GenericJackson2JsonRedisSerializer

代码第13行,我们配置键的序列化方式为StringRedisSerializer

代码第14行,我们配置哈希表的值的序列化方式为GenericJackson2JsonRedisSerializer

RedisTemplate几种序列化方式的简要介绍

六、redis队列监听器(消费者)

上面说了,与redis队列监听器相关的类为一个名为MessageListener的接口,下面是该接口的源码

public interface MessageListener {void onMessage(Message message, @Nullable byte[] pattern);}

可以看到,该接口仅有一个onMessage(Message message, @Nullable byte[] pattern)方法,该方法便是监听到队列中消息后的回调方法。下面解释一下这两个参数:

  • message:redis消息类,该类中仅有两个方法

    • byte[] getBody()以二进制形式获取消息体

    • byte[] getChannel()以二进制形式获取消息通道

  • pattern:二进制形式的消息通道,和message.getChannel()返回值相同

介绍完接口,我们来实现一个简单的redis队列监听器

@Componentpublic class RedisListener implement MessageListener{private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);@Overridepublic void onMessage(Message message,byte[] pattern){LOGGER.debug("从消息通道={}监听到消息",new String(pattern));LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));LOGGER.debug("元消息={}",new String(message.getBody()));// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样// 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer// 所以这里的实现方式为GenericJackson2JsonRedisSerializerRedisSerializer serializer=new GenericJackson2JsonRedisSerializer();LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));}}

代码很简单,就是输出参数中包含的关键信息。需要注意的是,RedisSerializer的实现要与上面配置的序列化方式一致。

队列监听器实现完以后,我们还需要将这个监听器添加到redis队列监听器容器中,代码如下:

@Beanpublic public RedisMessageListenerContainer container(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);container.addMessageListener(redisListener, new PatternTopic("demo-channel"));return container;}

这几行代码大概意思就是新建一个Redis消息监听器容器,然后将监听器和管道名想绑定,最后返回这个容器。

这里要注意的是,这个管道名和下面将要说的推送消息时的管道名要一致,不然监听器监听不到消息。

七、redis队列推送服务(生产者)

上面我们配置了RedisTemplate将要在这里使用到。

代码如下:

@Servicepublic class Publisher{@Autowriteprivate RedisTemplate redis;public void publish(Object msg){redis.convertAndSend("demo-channel",msg);}}

关键代码为第7行,redis.convertAndSend()这个方法的作用为,向某个通道(参数1)推送一条消息(第二个参数)。

这里还是要注意上面所说的,生产者和消费者的通道名要相同。

至此,消息队列的生产者和消费者已经全部编写完成。

八、遇到的问题及解决办法

1、spring boot使用log4j2日志框架问题

在我添加了spring-boot-starter-log4j2依赖并在spring-boot-starter-web中排除了spring-boot-starter-logging后,运行项目,还是会提示下面的错误:

SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]

这个错误就是maven中有多个日志框架导致的。后来通过依赖分析,发现在spring-boot-starter-data-redis中,也依赖了spring-boot-starter-logging,解决办法也很简单,下面贴出详细代码

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-redis</artifactId></dependency>

2、redis队列监听器线程安全问题

redis队列监听器的监听机制是:使用一个线程监听队列,队列有未消费的消息则取出消息并生成一个新的线程来消费消息。如果你还记得,我开头说的是由于redis单线程特性,因此我们用它来做消息队列,但是如果监听器每次接受一个消息就生成新的线程来消费信息的话,这样就完全没有使用到redis的单线程特性,同时还会产生线程安全问题。

单一消费者(一个通道只有一个消费者)的解决办法

最简单的办法莫过于为onMessage()方法加锁,这样简单粗暴却很有用,不过这种方式无法控制队列监听的速率,且无限制的创造线程最终会导致系统资源被占光。

那如何解决这种情况呢?线程池。

在将监听器添加到容器的配置的时候,RedisMessageListenerContainer类中有一个方法setTaskExecutor(Executor taskExecutor)可以为监听容器配置线程池。配置线程池以后,所有的线程都会由该线程池产生,由此,我们可以通过调节线程池来控制队列监听的速率。

多个消费者(一个通道有多个消费者)的解决办法

单一消费者的问题相比于多个消费者来说还是较为简单,因为Java内置的锁都是只能控制自己程序的运行,不能干扰其他的程序的运行;然而现在很多时候我们都是在分布式环境下进行开发,这时处理多个消费者的情况就很有意义了。

那么这种问题如何解决呢?分布式锁。

下面来简要科普一下什么是分布式锁:

分布式锁是指在分布式环境下,同一时间只有一个客户端能够从某个共享环境中(例如redis)获取到锁,只有获取到锁的客户端才能执行程序。

然后分布式锁一般要满足:排他性(即同一时间只有一个客户端能够获取到锁)、避免死锁(即超时后自动释放)、高可用(即获取或释放锁的机制必须高可用且性能佳)

上面讲依赖的时候,我们导入了一个spring-integration-redis依赖,这个依赖里面包含了很多实用的工具类,而我们接下来要讲的分布式锁就是这个依赖下面的一个工具包RedisLockRegistry。

首先讲一下如何使用,导入了依赖以后,首先配置一个Bean

@Beanpublic RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {return new RedisLockRegistry(factory, "demo-lock",60);}

RedisLockRegistry的构造函数,第一个参数是redis连接池,第二个参数是锁的前缀,即取出的锁,键名为“demo-lock:KEY_NAME”,第三个参数为锁的过期时间(秒),默认为60秒,当持有锁超过该时间后自动过期。

使用锁的方法,下面是对监听器的修改

@Componentpublic class RedisListener implement MessageListener{@Autowriteprivate RedisLockRegistry redisLockRegistry;private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);@Overridepublic void onMessage(Message message,byte[] pattern){Lock lock=redisLockRegistry.obtain("lock");try{lock.lock(); //上锁LOGGER.debug("从消息通道={}监听到消息",new String(pattern));LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));LOGGER.debug("元消息={}",new String(message.getBody()));// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的一样// 因为我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer// 所以这里的实现方式为GenericJackson2JsonRedisSerializerRedisSerializer serializer=new GenericJackson2JsonRedisSerializer();LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));} catch (Exception e) {e.printStackTrace();} finally {lock.unlock(); //解锁}}}

上面代码的代码比起前面的监听器代码,只是多了一个注入的RedisLockRegistry,一个通过redisLockRegistry.obtain()方法获取锁,一个加锁一个解锁,然后这就完成了分布式锁的使用。

注意这个获取锁的方法redisLockRegistry.obtain(),其返回的是一个名为RedisLock的锁,这是一个私有内部类,它实现了Lock接口,因此我们不能从代码外部创建一个他的实例,只能通过obtian()方法来获取这个锁。

以上就是本文的全部内容。


近期精选:
1. 144页!分享珍藏已久的数据库技术年刊
2. 千人线上直播分享:从Oracle DBA出发,走进GaussDB的世界


点击图片了解更多 ↓

云和恩墨大讲堂 | 一个分享交流的地方

长按,识别二维码,加入万人交流社群

请备注:云和恩墨大讲堂

  点个“在看” 
你的喜欢会被看到????

面试官竟让我用Redis实现一个消息队列!相关推荐

  1. 什么鬼,面试官竟然让我用Redis实现一个消息队列!!?

    GitHub 9.4k Star 的Java工程师成神之路 ,不来了解一下吗? GitHub 9.4k Star 的Java工程师成神之路 ,真的不来了解一下吗? GitHub 9.4k Star 的 ...

  2. 使用NODEJS+REDIS开发一个消息队列以及定时任务处理

    作者:RobanLee 原创文章,转载请注明: 萝卜李 http://www.robanlee.com 源码在这里: https://github.com/robanlee123/RobCron 时间 ...

  3. 面试官最爱问的Redis(三)Redis的基本知识

    面试官最爱问的redis,继续整理了Redis的学习笔记,动力节点的redis视频,13个小时搞定redis,笔记分享给大家. 视频资源:https://www.bilibili.com/video/ ...

  4. 【大厂面试】面试官看了赞不绝口的Redis笔记

    文章目录 一.Redis简介 二.Redis API的使用和理解 (一)通用命令 (二)单线程架构 (三)数据结构和内部编码 (四)字符串 (五)hash (字典) (六)列表 (七)Set集合 (八 ...

  5. 【大厂面试】面试官看了赞不绝口的Redis笔记(二)

    文章目录 说明 四.Redis的其他功能 (一)慢查询 (二)pipeline (三)发布订阅 (四)Bitmap (五)HyperLogLog (六)GEO 五.Redis持久化的取舍和选择 (一) ...

  6. 面试官不讲码德,欺负我一个年轻的开发工程师

    面试官不讲码德,欺负我一个年轻的开发工程师,问如果是你怎么设计RPC? RPC也不是很难啊,教你如何使用socket加动态代理与反射实现Rpc 先来解释解释一下rpc,首先很多人以为rpc是一种协议, ...

  7. 2.6_10 Redis订阅发布(消息队列)

    相关链接 Excel目录 redis官网 redis中文网 1. Redis发布订阅(消息队列) 替代产品:ActiveMQ,RabiitMQ,Kafka 订阅/发布消息图:第一个(消息发送者),第二 ...

  8. redis延迟消息队列不准时php,Redis实现延迟消息队列

    消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ. 如果只需要实现简单的消息队列,那么借助Redis即可. 如果对消息有着严格的可靠性等 ...

  9. 使用 Redis Stream 实现消息队列

    使用 Redis Stream 实现消息队列 Intro Redis 5.0 中增加了 Stream 的支持,利用 Stream 我们可以实现可靠的消息队列,并且支持一个消息被多个消费者所消费,可以很 ...

最新文章

  1. python内置库之学习ctypes库(一)
  2. Oracle监听配置
  3. 【Android】隐藏底部虚拟按键
  4. Stream中toMap引发NullPointerException____Stream的执行流程
  5. 二叉树递归非递归遍历,层次遍历,反转,输出路径等常见操作详细总结
  6. PHP trim()的使用
  7. 又一个PS2汉化入门
  8. 2022年计算机二级Java语言程序设计练习题及答案
  9. vue项目安装axios
  10. 模拟电子技术 PN结的形成与工作原理 个人笔记
  11. 读《产品经理面试宝典》
  12. cesium之深圳区域行政图
  13. 面包屑导航 java_jquery 面包屑导航 具体实现
  14. android 柱状图绘制,安卓MPAndroidChart绘制柱状图
  15. asp实现注册登录界面_asp.net 实现用户登录和注册——基于webform模式
  16. php免费问答源码,whatsns问答系统PHP免费源码 v4.1
  17. UML图解简单工厂模式工厂方法模式抽象工厂模式区别
  18. 彻底掌握 Javascript(八)正则表达式【讲师辅导】-曾亮-专题视频课程
  19. java socket 加密,Java socket通信实现DES加密与解密
  20. 阿里巴巴校招笔试题型攻略

热门文章

  1. 那一种笔记软件更好用_制作更好的面向用户软件的7种方法
  2. 北京创客空间_世界上最大的创客空间,可增强开放安全性等
  3. 云中台技术架构_为什么开放基础架构在云中很重要
  4. (23)Vue.js组件介绍
  5. (17)css3新增背景属性
  6. Flex弹性布局_思维导图
  7. HTTP协商缓存与HTTP强缓存
  8. es6 Class简介
  9. main函数的类型定义
  10. c语言用于提示的指令,C语言指令、符号表.doc