目录

一、 Redis Cluster模式简介

二、  Redis Cluster命令

2.1、cluster info

​​​​​​​​​​​​​​2.2、cluster nodes

​​​​​​​2.3、client list

2.4、cluster slots 

2.5、cluster keyslot

三、请求重定向

四、Lettuce使用

五、Lettuce相关源码


一、 Redis Cluster模式简介

redis集群并没有使用一致性hash算法而引入了哈希槽概念,Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽.集群的每个节点负责一部分hash槽.也就是说如果key是不变的对应的slot也是不变的

二、  Redis Cluster命令

2.1、cluster info

可以通过cluster info 命令查看集群信息

cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12

​​​​​​​​​​​​​​2.2、cluster nodes 

通过cluster nodes命令查看当前节点以及该节点分配的slot,如下图可以发现当前redis集群有12个节点,每个节点大约管理1365个slot

xx.xxx.xxx.xx:6959> cluster nodes 45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018 e40080f32a3fb89e34b7622038ce490682428fdf
xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652 a749bba5614680dea9f47e3c8fe595aa8be71a2c
xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460 1096e2a8737401b66c7d4ee0addcb10d7ff14088
xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730 fbc76f3481271241c1a89fabeb5139905e1ec2a6
xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383 85601fa67820a5af0de0cc21d102d72575709ec6
xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287 c00d86999c98f97d697f3a2b33ba26fbf50e46eb
xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826 0b09a5c4c9e9158520389dd2672bd711d55085c6
xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095 9f26d208fa8772449d5c322eb63786a1cf9937e0
xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922 274294a88758fcb674e1a0292db0e36a66a0bf48
xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364 369780bdf56d483a0f0a92cb2baab786844051f3
xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556 71ed0215356c664cc56d4579684e86a83dba3a92
xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191

​​​​​​​2.3、client list

Redis Client List 命令用于返回所有连接到服务器的客户端信息和统计数据。

redis 127.0.0.1:6379> CLIENT LIST
addr=127.0.0.1:43143 fd=6 age=183 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client
addr=127.0.0.1:43163 fd=5 age=35 idle=15 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
addr=127.0.0.1:43167 fd=7 age=24 idle=6 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get

2.4、cluster slots 

  Redis Client Slots 命令用于当前的集群状态

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 02) (integer) 40953) 1) "127.0.0.1"2) (integer) 70004) 1) "127.0.0.1"2) (integer) 7004
2) 1) (integer) 122882) (integer) 163833) 1) "127.0.0.1"2) (integer) 70034) 1) "127.0.0.1"2) (integer) 7007
3) 1) (integer) 40962) (integer) 81913) 1) "127.0.0.1"2) (integer) 70014) 1) "127.0.0.1"2) (integer) 7005
4) 1) (integer) 81922) (integer) 122873) 1) "127.0.0.1"2) (integer) 70024) 1) "127.0.0.1"2) (integer) 7006

2.5、cluster keyslot

cluster keyslot key  返回一个整数,用于标识指定键所散列到的哈希槽

cluster keyslot test
(integer) 6918

三、请求重定向

由于每个节点只负责部分slot,以及slot可能从一个节点迁移到另一节点,造成客户端有可能会向错误的节点发起请求。因此需要有一种机制来对其进行发现和修正,这就是请求重定向。有两种不同的重定向场景:

  • MOVED

声明的是slot所有权的转移,收到的客户端需要更新其key-node映射关系

  • ASK

申明的是一种临时的状态.在重新进行分片期间,源节点向目标节点迁移一个slot过程中,可能会出现这样一种情况:属于被迁移slot的一部分键值对保存在源节点里面,一部分保存在目标节点里面.当客户端向源节点发送一个与键有关的命令,并且这个键企恰好被迁移到目标节点,则向客户端返回一个ASK错误.因为这个节点还在处于迁移过程中,所有权还没有转移,所以客户端在接收到ASK错误后,需要在目标节点执行命令前,先发送一个ASKING命令,如果不发放该命令到话,则会返回MOVED错误,ASKING表示已经知道迁移状态,则会执行该命令.

通过集群查询数据key为test的值 redis-cli为单机模式;如果为集群模式时(redis-cli -c) 接收到MOVED 错误时是不会打印MOVED错误,而是根据MOVED信息自动重定向到正确节点,并打印出重定向信息

xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956  

  此时返回的结果表示该key在6956这个实例上,通过这个实例可以获取到缓存值

xx.xxx.xx.xxx:6956> get test
"cluster"

  通过上文的示例可以发现获取缓存值的过程需要访问cluster两次,既然key到slot值的算法是已知的,如果可以通过key直接计算slot,在通过每个节点的管理的slot范围就可以知道这个key对应哪个节点了,这样不就可以一次获取到了吗?其实lettuce中就是这样处理的.下文会有详细介绍

    如果mget操作值跨slot时会怎样呢? 

mget test test1
(error) CROSSSLOT Keys in request don't hash to the same slot

四、Lettuce使用

    @Bean(name="clusterRedisURI")RedisURI clusterRedisURI(){return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();}//配置集群选项,自动重连,最多重定型1次@BeanClusterClientOptions clusterClientOptions(){return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();}//创建集群客户端@BeanRedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI);redisClusterClient.setOptions(clusterClientOptions);return redisClusterClient;}/*** 集群连接*/@Bean(destroyMethod = "close")StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){return  redisClusterClient.connect();}

  Lettuce在Spring 中的使用通过上文中的配置方式进行配置后就可以使用了

  1. 通过StatefulRedisClusterConnection获取命令处理方式,同步,异步以及响应式
  2. 执行redis相关命令

五、Lettuce相关源码

lettuce的使用方式还是很简单的那么它的处理过程到底是怎样的呢?下面将通过源码进行解析.

通过上文可以知道连接是通过RedisClusterClient创建的,它默认使用了StringCodec(LettuceCharsets.UTF8)作为编码器创建连接

 public StatefulRedisClusterConnection<String, String> connect() {return connect(newStringStringCodec());}

在创建连接时就会主动发现集群拓扑信息,在第一次创建的时候partitions一定为null则此时需要初始化分区信息

  <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {//如果分区信息为null则初始化分区信息if (partitions == null) {initializePartitions();}//如果需要就激活拓扑刷新activateTopologyRefreshIfNeeded();

 初始化集群分片信息,就是将加载分片信息赋值给partitions属性 

 protected void initializePartitions() {this.partitions = loadPartitions();}

  具体加载分片信息处理过程如下:

  protected Partitions loadPartitions() {//获取拓扑刷新信息,Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;try {//加载拓扑信息Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

第一次可以知道partitions为null则此时需要初始化种子节点的,那么它的种子节点又是什么呢?通过代码可以发现种子节点就是初始化的URI,那么它又是什么时候设置的呢?

protected Iterable<RedisURI> getTopologyRefreshSource() {//是否初始化种子节点boolean initialSeedNodes = !useDynamicRefreshSources();Iterable<RedisURI> seed;//如果需要初始化种子节点或分区信息为null或分区信息为空 则将初始URI赋值给种子if (initialSeedNodes || partitions == null || partitions.isEmpty()) {seed = RedisClusterClient.this.initialUris;} else {//不需要初始化种子节点List<RedisURI> uris = new ArrayList<>();for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) {uris.add(partition.getUri());}seed = uris;}return seed;}

  通过如下代码可以发现种子节点是在创建redisClusterClient的时候指定的

 protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {super(clientResources);assertNotEmpty(redisURIs);assertSameOptions(redisURIs);//初始化节点this.initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));//根据第一个URI的超时时间作为默认超时时间setDefaultTimeout(getFirstUri().getTimeout());setOptions(ClusterClientOptions.builder().build());}

  默认使用动态刷新

 protected boolean useDynamicRefreshSources() {//如果集群客户端选项不为nullif (getClusterClientOptions() != null) {//获取集群拓扑刷新选项ClusterTopologyRefreshOptions topologyRefreshOptions = getClusterClientOptions().getTopologyRefreshOptions();//返回集群拓扑刷新选项中配置到是否使用动态刷新return topologyRefreshOptions.useDynamicRefreshSources();}//默认动态刷新return true;}

  下面看看加载分区信息的处理过程,第一次则根据种子节点的连接获取整个集群的拓扑信息

 public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {//获取超时时间,默认60秒long commandTimeoutNs = getCommandTimeoutNs(seed);Connections connections = null;try {//获取所有种子连接connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);//Requests将异步执行命令封装到多个节点//cluster nodesRequests requestedTopology = connections.requestTopology();//client listRequests requestedClients = connections.requestClients();//获取节点拓扑视图NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);if (discovery) {//是否查找额外节点//获取集群节点Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes();//排除种子节点,得到需要发现节点Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));//如果需要发现节点不为空if (!discoveredNodes.isEmpty()) {//需要发现节点连接Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs,TimeUnit.NANOSECONDS);//合并连接connections = connections.mergeWith(discoveredConnections);//合并请求requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology());requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients());//获取节点视图nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);//返回uri对应分区信息return nodeSpecificViews.toMap();}}return nodeSpecificViews.toMap();} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RedisCommandInterruptedException(e);} finally {if (connections != null) {connections.close();}}}

这样在创建connection的时候就已经知道集群中的所有有效节点.根据之前的文章可以知道对于集群命令的处理是在ClusterDistributionChannelWriter中处理的.其中有一些信息在初始化writer的时候就初始化了

class ClusterDistributionChannelWriter implements RedisChannelWriter {//默认写入器private final RedisChannelWriter defaultWriter;//集群事件监听器private final ClusterEventListener clusterEventListener;private final int executionLimit;//集群连接提供器private ClusterConnectionProvider clusterConnectionProvider;//异步集群连接提供器private AsyncClusterConnectionProvider asyncClusterConnectionProvider;//是否关闭private boolean closed = false;//分区信息private volatile Partitions partitions;

  写命令的处理如下,会根据key计算出slot,进而找到这个slot对应的node,直接访问这个node,这样可以有效减少访问cluster次数

public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {LettuceAssert.notNull(command, "Command must not be null");//如果连接已经关闭则抛出异常if (closed) {throw new RedisException("Connection is closed");}//如果是集群命令且命令没有处理完毕if (command instanceof ClusterCommand && !command.isDone()) {//类型转换, 转换为ClusterCommandClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;if (clusterCommand.isMoved() || clusterCommand.isAsk()) {HostAndPort target;boolean asking;//如果集群命令已经迁移,此时通过ClusterCommand中到重试操作进行到此if (clusterCommand.isMoved()) {//获取命令迁移目标节点target = getMoveTarget(clusterCommand.getError());//触发迁移事件clusterEventListener.onMovedRedirection();asking = false;} else {//如果是asktarget = getAskTarget(clusterCommand.getError());asking = true;clusterEventListener.onAskRedirection();}command.getOutput().setError((String) null);//连接迁移后的目标节点CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider.getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());//成功建立连接,则向该节点发送命令if (isSuccessfullyCompleted(connectFuture)) {writeCommand(command, asking, connectFuture.join(), null);} else {connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));}return command;}}//不是集群命令就是RedisCommand,第一个请求命令就是非ClusterCommand//将当前命令包装为集群命令ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);//获取命令参数CommandArgs<K, V> args = command.getArgs();//排除集群路由的cluster命令if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {//获取第一个编码后的keyByteBuffer encodedKey = args.getFirstEncodedKey();//如果encodedKey不为nullif (encodedKey != null) {//获取slot值int hash = getSlot(encodedKey);//根据命令类型获取命令意图 是读还是写ClusterConnectionProvider.Intent intent = getIntent(command.getType());//根据意图和slot获取连接CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider).getConnectionAsync(intent, hash);//如果成功获取连接if (isSuccessfullyCompleted(connectFuture)) {writeCommand(commandToSend, false, connectFuture.join(), null);} else {//如果连接尚未处理完,或有异常,则添加完成处理器connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,throwable));}return commandToSend;}}writeCommand(commandToSend, defaultWriter);return commandToSend;}

  但是如果计算出的slot因为集群扩展导致这个slot已经不在这个节点上lettuce是如何处理的呢?通过查阅ClusterCommand源码可以发现在complete方法中对于该问题进行了处理;如果响应是MOVED则会继续访问MOVED目标节点,这个重定向的此时可以指定的,默认为5次,通过上文的配置可以发现,在配置中只允许一次重定向

 @Overridepublic void complete() {//如果响应是MOVED或ASKif (isMoved() || isAsk()) {//如果最大重定向次数大于当前重定向次数则可以进行重定向boolean retryCommand = maxRedirections > redirections;//重定向次数自增redirections++;if (retryCommand) {try {//重定向retry.write(this);} catch (Exception e) {completeExceptionally(e);}return;}}super.complete();completed = true;}

  如果是ask向重定向目标发送命令前需要同步发送asking

 private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,StatefulRedisConnection<K, V> connection, Throwable throwable) {if (throwable != null) {command.completeExceptionally(throwable);return;}try {//如果需要发送asking请求,即接收到ASK错误消息,则在重定向到目标主机后需要发送asking命令if (asking) {connection.async().asking();}//发送命令writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());} catch (Exception e) {command.completeExceptionally(e);}}

上文主要介绍了lettuce对于单个key的处理,如果存在多个key,如mget lettuce又是如何处理的呢?其主要思路是将key根据slot进行分组,将在同一个slot的命令一起发送到对应的节点,再将所有请求的返回值合并作为最终结果.源码如下:

  @Overridepublic RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {//获取分区和key的映射关系Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);//如果分区数小于2也就是只有一个分区即所有key都落在一个分区就直接获取if (partitioned.size() < 2) {return super.mget(keys);}//每个key与slot映射关系Map<K, Integer> slots = SlotHash.getSlots(partitioned);Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();//遍历分片信息,逐个发送for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());executions.put(entry.getKey(), mget);}//恢复key的顺序return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {List<KeyValue<K, V>> result = new ArrayList<>();for (K opKey : keys) {int slot = slots.get(opKey);int position = partitioned.get(slot).indexOf(opKey);RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));}return result;});}

Lettuce之RedisClusterClient使用以及源码分析相关推荐

  1. Redis集群模式源码分析

    目录 1 主从复制模式 2 Sentinel(哨兵)模式 3 Cluster模式 4.参考文档 1 主从复制模式 主库负责读写操作,从库负责数据同步,接受来自主库的同步命令.通过分析Redis的客户端 ...

  2. Netty4.x: Server端 设置 option 警告 Unknown channel option ‘xxxx‘ for channel 分析及解决 (附源码分析)

    一.问题背景: 最近某springboot项目想嵌入一个用户聊天功能,打算使用 Rabbitmq + Netty4.x + Redis 来开发高性能聊天功能.花费三天时间所有功能都已实现.启动时却警告 ...

  3. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  4. SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...

  5. SpringBoot-web开发(二): 页面和图标定制(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) 目录 一.首页 1. 源码分析 2. 访问首页测试 二.动态页面 1. 动态资源目录t ...

  6. SpringBoot-web开发(一): 静态资源的导入(源码分析)

    目录 方式一:通过WebJars 1. 什么是webjars? 2. webjars的使用 3. webjars结构 4. 解析源码 5. 测试访问 方式二:放入静态资源目录 1. 源码分析 2. 测 ...

  7. Yolov3Yolov4网络结构与源码分析

    Yolov3&Yolov4网络结构与源码分析 从2018年Yolov3年提出的两年后,在原作者声名放弃更新Yolo算法后,俄罗斯的Alexey大神扛起了Yolov4的大旗. 文章目录 论文汇总 ...

  8. ViewGroup的Touch事件分发(源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,View的touch事件分发相对比较简单,可参考 View的Touch事件分发(一.初步了解) View的Touch事 ...

  9. View的Touch事件分发(二.源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,先来看简单的View的touch事件分发. 主要分析View的dispatchTouchEvent()方法和onTou ...

  10. MyBatis原理分析之四:一次SQL查询的源码分析

    上回我们讲到Mybatis加载相关的配置文件进行初始化,这回我们讲一下一次SQL查询怎么进行的. 准备工作 Mybatis完成一次SQL查询需要使用的代码如下: Java代码   String res ...

最新文章

  1. mysql去重保留最后一个_MySQL-去重留一
  2. rpa操作excel_RPA的功能与技术剖析
  3. 阳台花园不只美丽-东方美琪·安琪:身心健康谋定心灵升华
  4. vue调试工具vue-devtools安装及使用
  5. 如何利用LabelImg将标注文件在YOLO格式与PascalVOC格式间相互转换
  6. CMFCToolBar插入组合框
  7. Python_迭代器Iterator
  8. 持续交付 devops_使用DevOps开始加速软件交付
  9. 四款855旗舰对比:除开价格,各有优点
  10. 智能家居系统--选配防盗锁新(转载)
  11. python检查exe运行是否报错_python打包成exe格式后,在部分机子上没法运行
  12. 【语音隐写】基于matlab GUI DCT+DWT音频数字水印嵌入提取【含Matlab源码 836期】
  13. struts2 获得前端数据:
  14. 手机游戏开发如何正确选择设计分辨率
  15. 基于二分查询树(BinarySearchTrees)实现的键值对表(symbole-table)
  16. day7-列表和元组
  17. Pandas学习(二)—— Pandas基础
  18. 如何设置ajax监控,监控使用AJAX的XMLHttpRequest
  19. 极验第四代滑块验证码破解(一):AST还原混淆JS
  20. 移动广告聚合KeyMob移动广告平台——移动广告联盟

热门文章

  1. 建立基于安全域的涉密信息系统
  2. 《代码大全》读书笔记(转载)
  3. golang 求差集和并集算法
  4. Dubbo-自适应扩展机制之Adaptive注解原理
  5. Mono.Cecil DefaultAssemblyResolver.Dispose
  6. 计算机应用基础——计算机软件(二)
  7. python天天向上的力量实验报告_Python练习11:天天向上的力量
  8. python常数_SciPy所有常数解释
  9. QQ空间批量删除说说
  10. Docker三剑客详解