Lettuce是一个可伸缩线程安全的Redis客户端。多个线程可以共享同一个RedisConnection.本文是基于Lettuce5,主要介绍的知识点如下:

  1. Lettuce在Spring Boot中的配置
  2. Lettuce的同步,异步,响应式使用方式
  3. 事件的订阅
  4. 发布自定义事件
  5. 读写分离
  6. 读写分离策略实现源码
  7. 客户端分片实现
@Configuration
public class LettuceConfig {/*** 配置客户端资源* @return*/@Bean(destroyMethod = "shutdown")ClientResources clientResources() {return DefaultClientResources.builder().ioThreadPoolSize(8).computationThreadPoolSize(10).build();}/*** 配置Socket选项* keepAlive=true* tcpNoDelay=true* connectionTimeout=5秒* @return*/@BeanSocketOptions socketOptions(){return SocketOptions.builder().keepAlive(true).tcpNoDelay(true).connectTimeout(Duration.ofSeconds(5)).build(); }/*** 配置客户端选项* @return*/@BeanClientOptions clientOptions(SocketOptions socketOptions) {return ClientOptions.builder().socketOptions(socketOptions).build();}/*** 创建RedisClient* @param clientResources 客户端资源* @param clientOptions 客户端选项* @return */@Bean(destroyMethod = "shutdown")RedisClient redisClient(ClientResources clientResources, ClientOptions clientOptions) {RedisURI uri = RedisURI.builder().withSentinel("xx.xx.xx.xx", 26009).withPassword("abcd1234").withSentinelMasterId("xxx").build();RedisClient client = RedisClient.create(clientResources, uri);client.setOptions(clientOptions);return client;}/*** 创建连接* @param redisClient* @return*/@Bean(destroyMethod = "close")StatefulRedisConnection<String, String> connection(RedisClient redisClient) {return redisClient.connect();}
}

 基本使用

public Mono<ServerResponse> hello(ServerRequest request) throws  Exception {//响应式使用Mono<String> resp = redisConnection.reactive().get("gxt_new");//同步使用redisConnection.sync().get("test");redisConnection.async().get("test").get(5, TimeUnit.SECONDS);return ServerResponse.ok().body(resp, String.class);
}

 客户端订阅事件

客户端使用事件总线传输运行期间产生的事件;EventBus可以从客户端资源进行配置和获取,并用于客户端和自定义事件。  

如下事件可以被客户端发送:

  • 连接事件
  • 测量事件
  • 集群拓扑事件
client.getResources().eventBus().get().subscribe(e -> {System.out.println("client 订阅事件: " + e);});

client 订阅事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008]
client 订阅事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018]
client 订阅事件: ConnectedEvent [/xx:49912 -> /xx:6018]

 发布事件

发布使用也是通过使用eventBus进行发布事件,Event接口只是一个标签接口

 eventBus.publish(new Event() {@Overridepublic String toString() {return "自定义事件";}});

订阅者就可以订阅到这个自定义事件了  

client 订阅事件: 自定义事件

 读写分离 

@Bean(destroyMethod = "close")StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);connection.setReadFrom(ReadFrom.NEAREST);return connection;}
}

  StatefulRedisMasterSlaveConnection 支持读写分离,通过设置ReadFrom控制读是从哪个节点读取.

参数 含义
MASTER 从master节点读取
SLAVE 从slave节点读取
MASTER_PREFERRED
从master节点读取,如果master节点不可以则从slave节点读取
SLAVE_PREFERRED
从slave节点读取,如果slave节点不可用则倒退到master节点读取
NEAREST
从最近到节点读取

具体是如何实现到呢? 下面看一下MasterSlaveConnectionProvider相关源码

 //根据意图获取连接public StatefulRedisConnection<K, V> getConnection(Intent intent) {if (debugEnabled) {logger.debug("getConnection(" + intent + ")");}//如果readFrom不为null且是READif (readFrom != null && intent == Intent.READ) {//根据readFrom配置从已知节点中选择可用节点描述List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {@Overridepublic List<RedisNodeDescription> getNodes() {return knownNodes;}@Overridepublic Iterator<RedisNodeDescription> iterator() {return knownNodes.iterator();}});//如果可选择节点集合为空则抛出异常if (selection.isEmpty()) {throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",knownNodes, readFrom));}try {//遍历所有可用节点for (RedisNodeDescription redisNodeDescription : selection) {//获取节点连接StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription);//如果节点连接不是打开到连接则继续查找下一个连接if (!readerCandidate.isOpen()) {continue;}//返回可用连接return readerCandidate;}//如果没有找到可用连接,默认返回第一个return getConnection(selection.get(0));} catch (RuntimeException e) {throw new RedisException(e);}}//如果没有配置readFrom或者不是READ 则返回master连接return getConnection(getMaster());}

 我们可以看到选择连接到逻辑是通用的,不同的处理就是在selection的处理上,下面看一下不同readFrom策略对于selection的处理

ReadFromSlavePerferred和ReadFromMasterPerferred都是有优先级到概念,看看相关逻辑的处理

static final class ReadFromSlavePreferred extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());//优先添加slave节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.SLAVE) {result.add(node);}}//最后添加master节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.MASTER) {result.add(node);}}return result;}

 static final class ReadFromMasterPreferred extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());//优先添加master节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.MASTER) {result.add(node);}}//其次在添加slave节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.SLAVE) {result.add(node);}}return result;}}

对于ReadFromMaster和ReadFromSlave都是获取指定角色的节点  

 static final class ReadFromSlave extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());//只获取slave节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.SLAVE) {result.add(node);}}return result;}}

static final class ReadFromMaster extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.MASTER) {return LettuceLists.newList(node);}}return Collections.emptyList();}}

  获取最近的节点这个就有点特殊了,它对已知对节点没有做处理,直接返回了它们的节点描述,也就是谁在前面就优先使用谁

static final class ReadFromNearest extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {return nodes.getNodes();}}

  在SentinelTopologyProvider中可以发现,获取nodes节点总是优先获取Master节点,其次是slave节点,这样Nearest效果就等效与MasterPreferred

public List<RedisNodeDescription> getNodes() {logger.debug("lookup topology for masterId {}", masterId);try (StatefulRedisSentinelConnection<String, String> connection = redisClient.connectSentinel(CODEC, sentinelUri)) {RedisFuture<Map<String, String>> masterFuture = connection.async().master(masterId);RedisFuture<List<Map<String, String>>> slavesFuture = connection.async().slaves(masterId);List<RedisNodeDescription> result = new ArrayList<>();try {Map<String, String> master = masterFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);List<Map<String, String>> slaves = slavesFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);//添加master节点    result.add(toNode(master, RedisInstance.Role.MASTER));//添加所有slave节点result.addAll(slaves.stream().filter(SentinelTopologyProvider::isAvailable).map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));} catch (ExecutionException | InterruptedException | TimeoutException e) {throw new RedisException(e);}return result;}}

   自定义负载均衡  

通过上文可以发现只需要实现 ReadFrom接口,就可以通过该接口实现Master,Slave负载均衡;下面的示例是通过将nodes节点进行打乱,进而实现

 @Bean(destroyMethod = "close")StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);connection.setReadFrom(new ReadFrom() {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> list = nodes.getNodes();Collections.shuffle(list);return list;}});return connection;}

在大规模使用的时候会使用多组主备服务,可以通过客户端分片的方式将部分请求路由到指定的服务器上,但是Lettuce没有提供这样的支持,下面是自定义的实现:

public class Sharded< C extends StatefulRedisConnection,V> {private TreeMap<Long, String> nodes;private final Hashing algo = Hashing.MURMUR_HASH;private final Map<String, StatefulRedisConnection> resources = new LinkedHashMap<>();private RedisClient redisClient;private String password;private Set<HostAndPort> sentinels;private RedisCodec<String, V> codec;public Sharded(List<String> masters, RedisClient redisClient, String password, Set<HostAndPort> sentinels, RedisCodec<String, V> codec) {this.redisClient = redisClient;this.password = password;this.sentinels = sentinels;this.codec = codec;initialize(masters);}private void initialize(List<String> masters) {nodes = new TreeMap<>();for (int i = 0; i != masters.size(); ++i) {final String master = masters.get(i);for (int n = 0; n < 160; n++) {nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master);}RedisURI.Builder builder = RedisURI.builder();for (HostAndPort hostAndPort : sentinels) {builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort());}RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build();resources.put(master, MasterSlave.connect(redisClient, codec, redisURI));}}public StatefulRedisConnection getConnectionBy(String key) {return resources.get(getShardInfo(SafeEncoder.encode(key)));}public Collection<StatefulRedisConnection> getAllConnection(){return Collections.unmodifiableCollection(resources.values());}public String getShardInfo(byte[] key) {SortedMap<Long, String> tail = nodes.tailMap(algo.hash(key));if (tail.isEmpty()) {return nodes.get(nodes.firstKey());}return tail.get(tail.firstKey());}public void close(){for(StatefulRedisConnection connection:  getAllConnection()){connection.close();}}private static  class SafeEncoder {static byte[] encode(final String str) {try {if (str == null) {throw new IllegalArgumentException("value sent to redis cannot be null");}return str.getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}}}private interface Hashing {Hashing MURMUR_HASH = new MurmurHash();long hash(String key);long hash(byte[] key);}private static  class MurmurHash implements Hashing {static long hash64A(byte[] data, int seed) {return hash64A(ByteBuffer.wrap(data), seed);}static long hash64A(ByteBuffer buf, int seed) {ByteOrder byteOrder = buf.order();buf.order(ByteOrder.LITTLE_ENDIAN);long m = 0xc6a4a7935bd1e995L;int r = 47;long h = seed ^ (buf.remaining() * m);long k;while (buf.remaining() >= 8) {k = buf.getLong();k *= m;k ^= k >>> r;k *= m;h ^= k;h *= m;}if (buf.remaining() > 0) {ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);// for big-endian version, do this first:// finish.position(8-buf.remaining());finish.put(buf).rewind();h ^= finish.getLong();h *= m;}h ^= h >>> r;h *= m;h ^= h >>> r;buf.order(byteOrder);return h;}public long hash(byte[] key) {return hash64A(key, 0x1234ABCD);}public long hash(String key) {return hash(SafeEncoder.encode(key));}}}

 @Bean(destroyMethod = "close")Sharded<StatefulRedisMasterSlaveConnection,String> sharded(RedisClient redisClient) {Set<HostAndPort> hostAndPorts=new HashSet<>();hostAndPorts.add(HostAndPort.parse("1xx:26009"));hostAndPorts.add(HostAndPort.parse("1xx:26009"));return new Sharded<>(Arrays.asList("te009","test68","test67"),redisClient,"password",hostAndPorts, new Utf8StringCodec());}

  使用方式

  //只从slave节点中读取StatefulRedisMasterSlaveConnection redisConnection = (StatefulRedisMasterSlaveConnection) sharded.getConnectionBy("key");//使用异步模式获取缓存值System.out.println(redisConnection.sync().get("key"));

Lettuce在Spring boot中的使用方式相关推荐

  1. java 方式配置ssm,关于SSM以及Spring boot中对于Spring MVC配置的问题

    SSM中 Spring MVC配置 传统的web.xml配置 web.xml contextConfigLocation classpath*:applicationContext.xml org.s ...

  2. springboot初始化逻辑_详解Spring Boot中初始化资源的几种方式

    假设有这么一个需求,要求在项目启动过程中,完成线程池的初始化,加密证书加载等功能,你会怎么做?如果没想好答案,请接着往下看.今天介绍几种在Spring Boot中进行资源初始化的方式,帮助大家解决和回 ...

  3. RabbitMQ(六)——Spring boot中消费消息的两种方式

    前言 上一篇博客中,我们只是简单总结了Spring boot中整合RabbitMQ的操作,针对消息消费的两种方式只是简单给了一个实例,这篇博客,我们进一步总结关于Spring boot消息消费的相关功 ...

  4. Spring Boot 中的热部署方式总结

    1 Spring Boot 的热部署方式 1.模板热部署 在 Spring Boot 中,模板引擎的页面默认是开启缓存的,如果修改了页面的内容,则刷新页面是得不到修改后的页面的,因此我们可以在appl ...

  5. (转)Spring Boot(三):Spring Boot 中 Redis 的使用

    http://www.ityouknow.com/springboot/2016/03/06/spring-boot-redis.html Spring Boot 对常用的数据库支持外,对 Nosql ...

  6. 工厂方法模式:在 Spring Boot 中的典型应用

    大家好,我是CodingLong! 在上一篇<简单工厂模式>中我们了解到,简单工厂模式每次增加新的产品时,都要修改其"工厂类",这违背了开闭原则.而本篇介绍的工厂方法模 ...

  7. 【spring boot2】第8篇:spring boot 中的 servlet 容器及如何使用war包部署

    嵌入式 servlet 容器 在 spring boot 之前的web开发,我们都是把我们的应用部署到 Tomcat 等servelt容器,这些容器一般都会在我们的应用服务器上安装好环境,但是 spr ...

  8. Spring Boot 中使用 MongoDB 增删改查

    本文快速入门,MongoDB 结合SpringBoot starter-data-mongodb 进行增删改查 1.什么是MongoDB ? MongoDB 是由C++语言编写的,是一个基于分布式文件 ...

  9. Spring Boot 中使用@Async实现异步调用,加速任务执行!

    欢迎关注方志朋的博客,回复"666"获面试宝典 什么是"异步调用"?"异步调用"对应的是"同步调用",同步调用指程序按照 ...

  10. 徒手解密 Spring Boot 中的 Starter自动化配置黑魔法

    我们使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中.Starter 为我们带来了众多的自动化配置,有了这些自动化配置,我们可以不费吹灰之力就能搭建一个生产级开发环境,有的小 ...

最新文章

  1. go gin框架:Any响应任何请求类型
  2. android应用程序签名
  3. mybatis调用oracle过程,使用MyBatis调用Oracle存储过程
  4. Python通过Zabbix API获得数据
  5. pytorch损失函数(正在更新中,勿点进来看)
  6. Vue自定义组件——图片放大器,js点击<img>触发图片放大,富文本内图片点击实现放大器效果
  7. matlab的基本语法规则_Matlab基本语法与指令
  8. server2008服务器进不了桌面,解决windows server 2008 r2 登录进入桌面只显示一片蓝色背景...
  9. ubuntu18字符终端不支持中文问题(汉字菱形)
  10. 王者荣耀微信登陆不了服务器,王者荣耀微信区怎么登陆不了 王者荣耀微信区怎么登不上...
  11. mysql 导入 没有数据库文件怎么打开_没有SQL Server数据库时如何打开.MDF文件
  12. 数据库缓存服务—Redis配置与优化
  13. 厦门市各公交线路途经站点
  14. eclipse hana xs 开发环境搭建
  15. vue引入echarts-liquidfill水滴图并批量动态加载
  16. 戴尔科技云平台赋能“新基建”,打造云底座
  17. 电子设计教程12:Buck降压电路
  18. 基于GEC6818的智能家居系统[完整源码/项目报告/笔记分享]
  19. 金融数据分析(三)当当网店铺商品爬虫——爬虫类书籍为例:requestsbs4
  20. [Pytorch框架] PyTorch 中文手册

热门文章

  1. net_speeder发双倍包加速
  2. DELPHI 旧控件安装到 DELPHI11 新版环境的操作
  3. 数据库实验2:简易登录页面设计(c#)
  4. 搭建web服务器asp网站传马
  5. nuc8i7beh黑苹果_Intel NUC8i7BEH 黑苹果Hackintosh EFI引导
  6. csdner: china_jeffery, C++默认构造函数; csdner: thief thief, 什么情况下C++编译器会生成默认的构造函数
  7. JAva继承编写自行车例子,java – Freemarker中的继承/实例检查
  8. java图片合成_Java图片处理(一)图片合成
  9. mysql localhost可以连接,输入ip地址连接访问被拒绝
  10. Python中的逻辑运算符:‘and‘与‘or‘的用法