• 项目开发过程中,遇到需要发消息的情况,是不是脑海里不自主的浮现kafka、rabbitmq等常用的消息队列?但如果消息非常简单,并且用量也不大,消息队列就会有点大材小用了吧,忽然想起了redis 也有消息队列的功能,只不过我们经常把redis 用作缓存(这个是redis最大的卖点),忽略了它的辅助技能,今天我就简单讲解一下 redis 的发布订阅模式如何使用。
  • 发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
  • 方法的选择
    操作 redis 集群的方法一般有两种,一种是 redis 官方推荐的 JedisCluster,另一种是RedisTemplate,这两种客户端工具均能实现redis 的发布订阅模式,下面我会逐一讲解。
  • maven依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • Redsi 集群连接配置
    1、在application.yml 文件添加属性
spring:redis-config:max-total: 50max-idle: 10min-idle: 5max-wait-millis: 6000test-on-borrow: truetest-on-return: falsetest-while-idle: falsemax-redirects: 10connection-timeout: 1000so-timeout: 1000address:- 192.168.xx.xxx:7360- 192.168.yy.yyy:7360- 192.168.zz.zzz:7360

2、注入config 文件

package com.jianmin.config;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;import java.util.HashSet;
import java.util.List;
import java.util.Set;/*** @Author: jianmin.li* @Description: redis配置* @Date: 2019/5/28 14:47* @Version: 1.0*/
@Configuration
@Slf4j
@Data
@ConfigurationProperties(prefix = "spring.redis-config")
public class RedisConfig {private List<String> address;private Integer connectionTimeout;private Integer soTimeout;private Integer maxRedirects;@ConfigurationProperties(prefix = "spring.redis-config")@Beanpublic JedisPoolConfig jedisPoolConfig() {return new JedisPoolConfig();}@Beanpublic JedisCluster jedisCluster(JedisPoolConfig poolConfig) {Set<HostAndPort> hostAndPorts = new HashSet<>(address.size());for (String ipAndPort : address) {String[] arr = ipAndPort.split(":");hostAndPorts.add(new HostAndPort(arr[0],Integer.parseInt(arr[1])));log.warn("JedisCluster init Redis Address--->{}:{}",arr[0].trim(),arr[1]);}return new JedisCluster(hostAndPorts,connectionTimeout,soTimeout,maxRedirects,poolConfig);}@Beanpublic RedisClusterConfiguration redisClusterConfiguration() {RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();Set<RedisNode> nodes = new HashSet<>(address.size());for (String ipPort : address) {String[] ipAndPort = ipPort.split(":");nodes.add(new RedisNode(ipAndPort[0].trim(),Integer.valueOf(ipAndPort[1])));log.warn("RedisTemplate init Redis Address--->{}:{}",ipAndPort[0].trim(),ipAndPort[1]);}redisClusterConfiguration.setClusterNodes(nodes);redisClusterConfiguration.setMaxRedirects(maxRedirects);return redisClusterConfiguration;}@Beanpublic JedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig,RedisClusterConfigurationredisClusterConfiguration) {JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration,jedisPoolConfig);jedisConnectionFactory.afterPropertiesSet();return jedisConnectionFactory;}@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());redisTemplate.setEnableTransactionSupport(true);redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));redisTemplate.afterPropertiesSet();return redisTemplate;}@BeanRedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,JedisConnectionFactory jedisConnectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(jedisConnectionFactory);container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic("channel-lijianmin-redistemplate")));return container;}
}
  • 使用方法
    1、JedisCluster 实现redis 的发布订阅
    <1>发送消息
@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {@Autowiredprivate JedisCluster jedisCluster;/*** 发送消息非常简单,直接使用 JedisCluster 的 publish 方法即可。发送消息成* 功,会返回一个长整型数值0L。** @param* @return : void* @Author: jianmin.li* @Date: 2019/10/25 13:38*/@GetMapping("/jedisCluster")public void method() {Long publish = jedisCluster.publish("channel-lijianmin","这是来自JedisCluster发布者的消息");System.err.println(publish);}
}

<2>监听消息
要想让redis 在web 容器开启时就一直处于订阅状态,考虑通过listener 实现: 自定义MyListener,实现ServletContextListener,开启异步线程去监听redis 的channel,因为JedisCluster 的发布和订阅是阻塞的,如果用同步监听,那么你的项目就起不来了,一直阻塞在JedisCluster 初始化的地方,所以这里必须用异步线程去监听redsi 的channel。

package com.jianmin.util;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisCluster;import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;/*** @Author: jianmin.li* @Description: 消息监听* @Date: 2019/5/30 12:55* @Version: 1.0*/
@Configuration
@WebListener
public class MyListener implements ServletContextListener {@Autowiredprivate MySubscribe jedisPubSub;@Autowiredprivate JedisCluster jedisCluster;@Overridepublic void contextInitialized(ServletContextEvent sce) {new Thread(() -> jedisCluster.subscribe(jedisPubSub,"channel-lijianmin")).start();}@Overridepublic void contextDestroyed(ServletContextEvent sce) {System.out.println("ServletContext容器销毁了。。。");}
}

自定义 MySubscribe 并继承 JedisPubSub,实现对 redis 通道的监听,重写
onMessage 方法,该方法可以获取通道的名称以及监听到的消息,具体处理消
息的逻辑就写在这里。

package com.jianmin.util;import org.springframework.stereotype.Component;
import redis.clients.jedis.JedisPubSub;/*** @Author: jianmin.li* @Description: 监听redis的通道* @Date: 2019/5/30 12:51* @Version: 1.0*/
@Component
public class MySubscribe extends JedisPubSub {/*** 监听到的消息在这里进行业务处理** @param channel* @param message* @return : void* @Author: jianmin.li* @Date: 2019/10/25 13:46*/@Overridepublic void onMessage(String channel,String message) {System.err.println("redis通道" + channel + "监听到的消息:" + message);}
}

<3>启动项目,触发接口

redis发布订阅模式--jedisCluster客户端

2、RedisTemplate 实现redis 的发布订阅
<1>发送消息

@CrossOrigin
@RestController
@RequestMapping("/demo")
public class Demo {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;/*** 使用 RedisTemplate 发送消息也非常简单,* 直接调用 convertAndSend 方法即可。** @param* @return : void* @Author: jianmin.li* @Date: 2019/10/25 14:10*/@GetMapping("/redisTemplate")public void send() {redisTemplate.convertAndSend("channel-lijianmin-redistemplate","这是来自RedisTemplate发布者的消息");}
}

<2>监听消息
自定义RedisSubscriber 并继承MessageListenerAdapter,重写onMessage 方
法,该方法可以获取通道的名称以及监听到的消息,具体处理消息的逻辑就写
在这里。

package com.jianmin.util;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;/*** @Author: jianmin.li* @Description: 监听redis通道* @Date: 2019/5/30 15:37* @Version: 1.0*/
@Component
public class RedisSubscriber extends MessageListenerAdapter {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Overridepublic void onMessage(Message message,byte[] pattern) {System.err.println("通道----->" + redisTemplate.getKeySerializer().deserialize(message.getChannel()));System.err.println("消息----->" + redisTemplate.getValueSerializer().deserialize(message.getBody()));}
}

<3>在上面的RedisConfig 里面注入redis 消息监听容器 (上面代码已经注入)

@BeanRedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter,JedisConnectionFactory jedisConnectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(jedisConnectionFactory);container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic("channel-lijianmin-redistemplate")));return container;}

<3>启动项目,触发接口

redis发布订阅模式--redisTemplate客户端

  • 总结:
    1、 redis 的发布订阅模式,监听了同一个通道的监听者都能收到相同的消息,在集群模式下,多个相同的监听者进行相同的信息消费,容易产生表单重复提交的问题,在实际开发中需要注意,建议采取分布式锁;
    2、 JedisCluster 的发布订阅,在容器启动时要异步去监听redis 通道;
    3、 RedisTemplate 的发布订阅,在监听到消息时,建议用redisTemplate 的KeySerializer反序列化通道,用ValueSerializer 反序列化消息,不建议用new String()反序列化,因为redisTemplate 在初始化时如果指定了相关的序列化转换器,在序列化和反序列化时就必须使用相同的序列化转换器,否则容易出现反序列化异常或乱码。

redis集群的发布订阅模式相关推荐

  1. redis集群的几种模式

    redis集群的几种模式 主从模式 哨兵模式 Cluster集群模式(推荐) 三种模式都有搭建成功,相比之下,个人还是推荐Cluster集群 主从模式 主从模式(Master-Slave Replic ...

  2. redis集群之主从复制+哨兵模式

    文章目录 一.redis集群之主从复制 1.1.集群介绍 1.2.搭建过程 1.3.验证 二.redis集群之哨兵模式 2.1.部署过程 1.搭建主从复制集群 2.设置哨兵模式的配置文件 3.启动哨兵 ...

  3. redis集群的三种模式

    通过持久化功能,Redis保证了即使在服务器重启的情况下也不会丢失(或少量丢失)数据,因为持久化会把内存中数据保存到硬盘上,重启会从硬盘上加载数据.但是由于数据是存储在一台服务器上的,如果这台服务器出 ...

  4. Redis 进阶篇:发布订阅模式原理与运用

    Redis 通过 SUBSCRIBE,UNSUBSCRIBE和 PUBLISH 实现发布订阅消息传递模式,Redis 提供了两种模式实现,分别是「发布 / 订阅到频道」和「发布 \ 订阅到模式」. [ ...

  5. Redis进阶篇:发布订阅模式原理与运用

    目录 Redis 发布订阅简介 Pub/Sub 实战 通过频道(Channel)实现 通过模式(Pattern)实现 订阅模式 Redisson 与 SpringBoot 实战 原理分析 频道(Cha ...

  6. 搭建高可用的redis集群,避免standalone模式带给你的苦难

    现在项目上用redis的话,很少说不用集群的情况,毕竟如果生产上只有一台redis会有极大的风险,比如机器挂掉,或者内存爆掉,就比如我们生产环境 曾今也遭遇到这种情况,导致redis内存不够挂掉的情况 ...

  7. Redis集群——去中心化模式

    去中心化模式特点 之前介绍的主从模式和哨兵模式都只有一个master主节点,如果写操作并发比较大时,这两个模式就会堵塞.这时就可以使用集群化模式,也称之为去中心化模式,其特点是多master和多sla ...

  8. redis之mq实现发布订阅模式

    https://github.com/smltq/spring-boot-demo/blob/master/mq-redis 概述 Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用 ...

  9. redis集群之哨兵模式【原】

    redis集群之哨兵(sentinel)模式 哨兵模式理想状态 需要>=3个redis服务,>=3个redis哨兵,每个redis服务搭配一个哨兵. 本例以3个redis服务为例: 一开始 ...

最新文章

  1. JavaScript 高级技巧
  2. 我的微信'智障聊天助手'的设计思路
  3. 计算机未来的储存装置,图说计算机存储设备
  4. 参数估计_状态估计的基本概念(1)参数估计问题
  5. 未发现数据源名称并且未指定默认驱动程序_看我如何发现NVIDIA GeForce Experience代码执行漏洞...
  6. cpp map 获取所有 key_uniapp 利用map标签 开发地图定位和搜索关键字查询功能
  7. java实现验证码生成工具类
  8. TCP滑动窗口协议作用
  9. 网络爬虫相关软件以及论文检索与推荐网站调研
  10. DSP之TMS320F28335学习总结与笔记(二)————ADC模块
  11. 我的世界刷猪人塔java版_我的世界速攻猪人塔详解 史上最牛的经验塔
  12. ISCC 2018 PWN WriteUp
  13. 淘宝/天猫获取淘宝直播分类id接口 API 返回值说明
  14. TCP协议为什么需要三次握手?
  15. 从2-3树谈到左倾红黑树
  16. 2020.08.14日常总结——Trie树的实际应用
  17. 案例研究 | 运用设计冲刺,解决俄罗斯家庭暴力问题
  18. 4p、4c、4R营销理论概要
  19. 【数据分析/商业分析】数据分析中使用的商业模型(学习总结)
  20. 怎样用迅捷画图绘制高端大气的思维导图

热门文章

  1. frp连接Linux客户端
  2. 老虎证券开放api期货合约的创建
  3. 千里送人头  ——APIO2017 游记
  4. Python竞猜商品价格
  5. 来看看这位年轻的 eBay 小伙是如何成为 Committer
  6. 低代码平台,企业服务新战场
  7. MyCat简单安装及应用(linux
  8. 一个极端的前端国际化方法
  9. VideoView 无法播放此视频
  10. 为何“爱辞职”成为了90后的又一标签?