Lettuce在Spring boot中的使用方式
Lettuce是一个可伸缩线程安全的Redis客户端。多个线程可以共享同一个RedisConnection.本文是基于Lettuce5,主要介绍的知识点如下:
- Lettuce在Spring Boot中的配置
- Lettuce的同步,异步,响应式使用方式
- 事件的订阅
- 发布自定义事件
- 读写分离
- 读写分离策略实现源码
- 客户端分片实现
@Configuration
public class LettuceConfig {/*** 配置客户端资源* @return*/@Bean(destroyMethod = "shutdown")ClientResources clientResources() {return DefaultClientResources.builder().ioThreadPoolSize(8).computationThreadPoolSize(10).build();}/*** 配置Socket选项* keepAlive=true* tcpNoDelay=true* connectionTimeout=5秒* @return*/@BeanSocketOptions socketOptions(){return SocketOptions.builder().keepAlive(true).tcpNoDelay(true).connectTimeout(Duration.ofSeconds(5)).build(); }/*** 配置客户端选项* @return*/@BeanClientOptions clientOptions(SocketOptions socketOptions) {return ClientOptions.builder().socketOptions(socketOptions).build();}/*** 创建RedisClient* @param clientResources 客户端资源* @param clientOptions 客户端选项* @return */@Bean(destroyMethod = "shutdown")RedisClient redisClient(ClientResources clientResources, ClientOptions clientOptions) {RedisURI uri = RedisURI.builder().withSentinel("xx.xx.xx.xx", 26009).withPassword("abcd1234").withSentinelMasterId("xxx").build();RedisClient client = RedisClient.create(clientResources, uri);client.setOptions(clientOptions);return client;}/*** 创建连接* @param redisClient* @return*/@Bean(destroyMethod = "close")StatefulRedisConnection<String, String> connection(RedisClient redisClient) {return redisClient.connect();}
}
基本使用
public Mono<ServerResponse> hello(ServerRequest request) throws Exception {//响应式使用Mono<String> resp = redisConnection.reactive().get("gxt_new");//同步使用redisConnection.sync().get("test");redisConnection.async().get("test").get(5, TimeUnit.SECONDS);return ServerResponse.ok().body(resp, String.class);
}
客户端订阅事件
客户端使用事件总线传输运行期间产生的事件;EventBus可以从客户端资源进行配置和获取,并用于客户端和自定义事件。
如下事件可以被客户端发送:
- 连接事件
- 测量事件
- 集群拓扑事件
client.getResources().eventBus().get().subscribe(e -> {System.out.println("client 订阅事件: " + e);});
client 订阅事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008]
client 订阅事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018]
client 订阅事件: ConnectedEvent [/xx:49912 -> /xx:6018]
发布事件
发布使用也是通过使用eventBus进行发布事件,Event接口只是一个标签接口
eventBus.publish(new Event() {@Overridepublic String toString() {return "自定义事件";}});
订阅者就可以订阅到这个自定义事件了
client 订阅事件: 自定义事件
读写分离
@Bean(destroyMethod = "close")StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);connection.setReadFrom(ReadFrom.NEAREST);return connection;}
}
StatefulRedisMasterSlaveConnection 支持读写分离,通过设置ReadFrom控制读是从哪个节点读取.
参数 | 含义 |
MASTER | 从master节点读取 |
SLAVE | 从slave节点读取 |
|
从master节点读取,如果master节点不可以则从slave节点读取 |
|
从slave节点读取,如果slave节点不可用则倒退到master节点读取 |
|
从最近到节点读取 |
具体是如何实现到呢? 下面看一下MasterSlaveConnectionProvider相关源码
//根据意图获取连接public StatefulRedisConnection<K, V> getConnection(Intent intent) {if (debugEnabled) {logger.debug("getConnection(" + intent + ")");}//如果readFrom不为null且是READif (readFrom != null && intent == Intent.READ) {//根据readFrom配置从已知节点中选择可用节点描述List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {@Overridepublic List<RedisNodeDescription> getNodes() {return knownNodes;}@Overridepublic Iterator<RedisNodeDescription> iterator() {return knownNodes.iterator();}});//如果可选择节点集合为空则抛出异常if (selection.isEmpty()) {throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",knownNodes, readFrom));}try {//遍历所有可用节点for (RedisNodeDescription redisNodeDescription : selection) {//获取节点连接StatefulRedisConnection<K, V> readerCandidate = getConnection(redisNodeDescription);//如果节点连接不是打开到连接则继续查找下一个连接if (!readerCandidate.isOpen()) {continue;}//返回可用连接return readerCandidate;}//如果没有找到可用连接,默认返回第一个return getConnection(selection.get(0));} catch (RuntimeException e) {throw new RedisException(e);}}//如果没有配置readFrom或者不是READ 则返回master连接return getConnection(getMaster());}
我们可以看到选择连接到逻辑是通用的,不同的处理就是在selection的处理上,下面看一下不同readFrom策略对于selection的处理
ReadFromSlavePerferred和ReadFromMasterPerferred都是有优先级到概念,看看相关逻辑的处理
static final class ReadFromSlavePreferred extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());//优先添加slave节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.SLAVE) {result.add(node);}}//最后添加master节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.MASTER) {result.add(node);}}return result;}
static final class ReadFromMasterPreferred extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());//优先添加master节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.MASTER) {result.add(node);}}//其次在添加slave节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.SLAVE) {result.add(node);}}return result;}}
对于ReadFromMaster和ReadFromSlave都是获取指定角色的节点
static final class ReadFromSlave extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> result = new ArrayList<>(nodes.getNodes().size());//只获取slave节点for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.SLAVE) {result.add(node);}}return result;}}
static final class ReadFromMaster extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {for (RedisNodeDescription node : nodes) {if (node.getRole() == RedisInstance.Role.MASTER) {return LettuceLists.newList(node);}}return Collections.emptyList();}}
获取最近的节点这个就有点特殊了,它对已知对节点没有做处理,直接返回了它们的节点描述,也就是谁在前面就优先使用谁
static final class ReadFromNearest extends ReadFrom {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {return nodes.getNodes();}}
在SentinelTopologyProvider中可以发现,获取nodes节点总是优先获取Master节点,其次是slave节点,这样Nearest效果就等效与MasterPreferred
public List<RedisNodeDescription> getNodes() {logger.debug("lookup topology for masterId {}", masterId);try (StatefulRedisSentinelConnection<String, String> connection = redisClient.connectSentinel(CODEC, sentinelUri)) {RedisFuture<Map<String, String>> masterFuture = connection.async().master(masterId);RedisFuture<List<Map<String, String>>> slavesFuture = connection.async().slaves(masterId);List<RedisNodeDescription> result = new ArrayList<>();try {Map<String, String> master = masterFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);List<Map<String, String>> slaves = slavesFuture.get(timeout.toNanos(), TimeUnit.NANOSECONDS);//添加master节点 result.add(toNode(master, RedisInstance.Role.MASTER));//添加所有slave节点result.addAll(slaves.stream().filter(SentinelTopologyProvider::isAvailable).map(map -> toNode(map, RedisInstance.Role.SLAVE)).collect(Collectors.toList()));} catch (ExecutionException | InterruptedException | TimeoutException e) {throw new RedisException(e);}return result;}}
自定义负载均衡
通过上文可以发现只需要实现 ReadFrom接口,就可以通过该接口实现Master,Slave负载均衡;下面的示例是通过将nodes节点进行打乱,进而实现
@Bean(destroyMethod = "close")StatefulRedisMasterSlaveConnection<String, String> statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) {StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI);connection.setReadFrom(new ReadFrom() {@Overridepublic List<RedisNodeDescription> select(Nodes nodes) {List<RedisNodeDescription> list = nodes.getNodes();Collections.shuffle(list);return list;}});return connection;}
在大规模使用的时候会使用多组主备服务,可以通过客户端分片的方式将部分请求路由到指定的服务器上,但是Lettuce没有提供这样的支持,下面是自定义的实现:
public class Sharded< C extends StatefulRedisConnection,V> {private TreeMap<Long, String> nodes;private final Hashing algo = Hashing.MURMUR_HASH;private final Map<String, StatefulRedisConnection> resources = new LinkedHashMap<>();private RedisClient redisClient;private String password;private Set<HostAndPort> sentinels;private RedisCodec<String, V> codec;public Sharded(List<String> masters, RedisClient redisClient, String password, Set<HostAndPort> sentinels, RedisCodec<String, V> codec) {this.redisClient = redisClient;this.password = password;this.sentinels = sentinels;this.codec = codec;initialize(masters);}private void initialize(List<String> masters) {nodes = new TreeMap<>();for (int i = 0; i != masters.size(); ++i) {final String master = masters.get(i);for (int n = 0; n < 160; n++) {nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master);}RedisURI.Builder builder = RedisURI.builder();for (HostAndPort hostAndPort : sentinels) {builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort());}RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build();resources.put(master, MasterSlave.connect(redisClient, codec, redisURI));}}public StatefulRedisConnection getConnectionBy(String key) {return resources.get(getShardInfo(SafeEncoder.encode(key)));}public Collection<StatefulRedisConnection> getAllConnection(){return Collections.unmodifiableCollection(resources.values());}public String getShardInfo(byte[] key) {SortedMap<Long, String> tail = nodes.tailMap(algo.hash(key));if (tail.isEmpty()) {return nodes.get(nodes.firstKey());}return tail.get(tail.firstKey());}public void close(){for(StatefulRedisConnection connection: getAllConnection()){connection.close();}}private static class SafeEncoder {static byte[] encode(final String str) {try {if (str == null) {throw new IllegalArgumentException("value sent to redis cannot be null");}return str.getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);}}}private interface Hashing {Hashing MURMUR_HASH = new MurmurHash();long hash(String key);long hash(byte[] key);}private static class MurmurHash implements Hashing {static long hash64A(byte[] data, int seed) {return hash64A(ByteBuffer.wrap(data), seed);}static long hash64A(ByteBuffer buf, int seed) {ByteOrder byteOrder = buf.order();buf.order(ByteOrder.LITTLE_ENDIAN);long m = 0xc6a4a7935bd1e995L;int r = 47;long h = seed ^ (buf.remaining() * m);long k;while (buf.remaining() >= 8) {k = buf.getLong();k *= m;k ^= k >>> r;k *= m;h ^= k;h *= m;}if (buf.remaining() > 0) {ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);// for big-endian version, do this first:// finish.position(8-buf.remaining());finish.put(buf).rewind();h ^= finish.getLong();h *= m;}h ^= h >>> r;h *= m;h ^= h >>> r;buf.order(byteOrder);return h;}public long hash(byte[] key) {return hash64A(key, 0x1234ABCD);}public long hash(String key) {return hash(SafeEncoder.encode(key));}}}
@Bean(destroyMethod = "close")Sharded<StatefulRedisMasterSlaveConnection,String> sharded(RedisClient redisClient) {Set<HostAndPort> hostAndPorts=new HashSet<>();hostAndPorts.add(HostAndPort.parse("1xx:26009"));hostAndPorts.add(HostAndPort.parse("1xx:26009"));return new Sharded<>(Arrays.asList("te009","test68","test67"),redisClient,"password",hostAndPorts, new Utf8StringCodec());}
使用方式
//只从slave节点中读取StatefulRedisMasterSlaveConnection redisConnection = (StatefulRedisMasterSlaveConnection) sharded.getConnectionBy("key");//使用异步模式获取缓存值System.out.println(redisConnection.sync().get("key"));
Lettuce在Spring boot中的使用方式相关推荐
- java 方式配置ssm,关于SSM以及Spring boot中对于Spring MVC配置的问题
SSM中 Spring MVC配置 传统的web.xml配置 web.xml contextConfigLocation classpath*:applicationContext.xml org.s ...
- springboot初始化逻辑_详解Spring Boot中初始化资源的几种方式
假设有这么一个需求,要求在项目启动过程中,完成线程池的初始化,加密证书加载等功能,你会怎么做?如果没想好答案,请接着往下看.今天介绍几种在Spring Boot中进行资源初始化的方式,帮助大家解决和回 ...
- RabbitMQ(六)——Spring boot中消费消息的两种方式
前言 上一篇博客中,我们只是简单总结了Spring boot中整合RabbitMQ的操作,针对消息消费的两种方式只是简单给了一个实例,这篇博客,我们进一步总结关于Spring boot消息消费的相关功 ...
- Spring Boot 中的热部署方式总结
1 Spring Boot 的热部署方式 1.模板热部署 在 Spring Boot 中,模板引擎的页面默认是开启缓存的,如果修改了页面的内容,则刷新页面是得不到修改后的页面的,因此我们可以在appl ...
- (转)Spring Boot(三):Spring Boot 中 Redis 的使用
http://www.ityouknow.com/springboot/2016/03/06/spring-boot-redis.html Spring Boot 对常用的数据库支持外,对 Nosql ...
- 工厂方法模式:在 Spring Boot 中的典型应用
大家好,我是CodingLong! 在上一篇<简单工厂模式>中我们了解到,简单工厂模式每次增加新的产品时,都要修改其"工厂类",这违背了开闭原则.而本篇介绍的工厂方法模 ...
- 【spring boot2】第8篇:spring boot 中的 servlet 容器及如何使用war包部署
嵌入式 servlet 容器 在 spring boot 之前的web开发,我们都是把我们的应用部署到 Tomcat 等servelt容器,这些容器一般都会在我们的应用服务器上安装好环境,但是 spr ...
- Spring Boot 中使用 MongoDB 增删改查
本文快速入门,MongoDB 结合SpringBoot starter-data-mongodb 进行增删改查 1.什么是MongoDB ? MongoDB 是由C++语言编写的,是一个基于分布式文件 ...
- Spring Boot 中使用@Async实现异步调用,加速任务执行!
欢迎关注方志朋的博客,回复"666"获面试宝典 什么是"异步调用"?"异步调用"对应的是"同步调用",同步调用指程序按照 ...
- 徒手解密 Spring Boot 中的 Starter自动化配置黑魔法
我们使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中.Starter 为我们带来了众多的自动化配置,有了这些自动化配置,我们可以不费吹灰之力就能搭建一个生产级开发环境,有的小 ...
最新文章
- go gin框架:Any响应任何请求类型
- android应用程序签名
- mybatis调用oracle过程,使用MyBatis调用Oracle存储过程
- Python通过Zabbix API获得数据
- pytorch损失函数(正在更新中,勿点进来看)
- Vue自定义组件——图片放大器,js点击<img>触发图片放大,富文本内图片点击实现放大器效果
- matlab的基本语法规则_Matlab基本语法与指令
- server2008服务器进不了桌面,解决windows server 2008 r2 登录进入桌面只显示一片蓝色背景...
- ubuntu18字符终端不支持中文问题(汉字菱形)
- 王者荣耀微信登陆不了服务器,王者荣耀微信区怎么登陆不了 王者荣耀微信区怎么登不上...
- mysql 导入 没有数据库文件怎么打开_没有SQL Server数据库时如何打开.MDF文件
- 数据库缓存服务—Redis配置与优化
- 厦门市各公交线路途经站点
- eclipse hana xs 开发环境搭建
- vue引入echarts-liquidfill水滴图并批量动态加载
- 戴尔科技云平台赋能“新基建”,打造云底座
- 电子设计教程12:Buck降压电路
- 基于GEC6818的智能家居系统[完整源码/项目报告/笔记分享]
- 金融数据分析(三)当当网店铺商品爬虫——爬虫类书籍为例:requestsbs4
- [Pytorch框架] PyTorch 中文手册
热门文章
- net_speeder发双倍包加速
- DELPHI 旧控件安装到 DELPHI11 新版环境的操作
- 数据库实验2:简易登录页面设计(c#)
- 搭建web服务器asp网站传马
- nuc8i7beh黑苹果_Intel NUC8i7BEH 黑苹果Hackintosh EFI引导
- csdner: china_jeffery, C++默认构造函数; csdner: thief thief, 什么情况下C++编译器会生成默认的构造函数
- JAva继承编写自行车例子,java – Freemarker中的继承/实例检查
- java图片合成_Java图片处理(一)图片合成
- mysql localhost可以连接,输入ip地址连接访问被拒绝
- Python中的逻辑运算符:‘and‘与‘or‘的用法