Spring-data-redis + Lettuce 如何使用 Pipeline
关于 spring-data-redis 和 lettuce,笔者写过不少文章:
- 这个 Redis 连接池的新监控方式针不戳~我再加一点佐料
- spring-data-redis 连接泄漏,我 TM 人傻了
- spring-data-redis 动态切换数据源
- spring-data-redis 上百万的 QPS 压力太大连接失败,我 TM 人傻了
最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?
首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:
- asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
- asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。
execute(RedisCallback)
,流程是:
对于 executePipelined(RedisCallback)
,如果使用正确的话,会使用 asyncDedicatedConn
私有连接执行。那么怎么算使用正确呢?
需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate
调用,否则 pipeline 不生效:
Pipeline 生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {connection.get("test".getBytes());connection.get("test2".getBytes());return null;}
});
Pipeline 不生效:
List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {redisTemplate.opsForValue().get("test");redisTemplate.opsForValue().get("test2");return null;}
});
这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?
Redis Pipeline 类比 Lettuce 中的 AutoFlushCommands
Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。
如果原来的命令是这么发送的:
Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4
那么使用 PIPELINE 之后,命令就是类似于这么发送的
Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4
我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。
Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。
配置 Spring-data-redis + Lettuce 使用 Pipeline
Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:
- DATAREDIS-1011 - Allow configuration of Lettuce pipelining flush behavior
- https://github.com/spring-projects/spring-data-redis/issues/1581
我们可以这样配置:
@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {return new BeanPostProcessor() {@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {//在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnCloseif (bean instanceof LettuceConnectionFactory) {LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());//多谢评论区的[孤胆枪手](https://juejin.cn/user/2084329775180605) 的指正,我之前忘了放这个配置了lettuceConnectionFactory.setShareNativeConnection(false);}return bean;}};
}
注意这里将 shareNativeConnection 设置为 false。本来基于 Lettuce 的 RedisTemplate 中大部分请求都可以通过共享连接使用同一个连接,关闭的话每次都获取的是独占连接。这种情况下我们要注意使用连接池(防止每次创建新连接),同时连接池需要大于可能的并发线程个数防止阻塞等待连接。
为啥要关闭这个共享链接呢,参考源码:
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {//Redis事务才是 trueif (this.isQueueing()) {return this.getAsyncDedicatedConnection();} else {//如果有共享连接则返回共享连接,否则返回独占连接,只有独占连接 PipeliningFlushPolicy 才会生效,PipeliningFlushPolicy 不会修改共享连接return (RedisClusterAsyncCommands)(this.asyncSharedConn != null && this.asyncSharedConn instanceof StatefulRedisConnection ? ((StatefulRedisConnection)this.asyncSharedConn).async() : this.getAsyncDedicatedConnection());}
}
由于我们要使用 PipeliningFlushPolicy,所以需要这里返回独占连接,也就不能打开共享连接。
我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:
public interface PipeliningFlushPolicy {//其实就是默认的每个命令都直接发到 Redis Serverstatic PipeliningFlushPolicy flushEachCommand() {return FlushEachCommand.INSTANCE;}//在连接关闭的时候,将命令一起发到 Redisstatic PipeliningFlushPolicy flushOnClose() {return FlushOnClose.INSTANCE;}//手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redisstatic PipeliningFlushPolicy buffered(int bufferSize) {return () -> new BufferedFlushing(bufferSize);}
}
这三个类也都实现了 PipeliningFlushState
接口:
public interface PipeliningFlushState {//对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法void onOpen(StatefulConnection<?, ?> connection);//对于 executePipelined 中的每个命令都会调用这个方法void onCommand(StatefulConnection<?, ?> connection);//在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法void onClose(StatefulConnection<?, ?> connection);
}
默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {INSTANCE;@Overridepublic PipeliningFlushState newPipeline() {return INSTANCE;}@Overridepublic void onOpen(StatefulConnection<?, ?> connection) {}@Overridepublic void onCommand(StatefulConnection<?, ?> connection) {}@Overridepublic void onClose(StatefulConnection<?, ?> connection) {}
}
对于 flushOnClose:
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {INSTANCE;@Overridepublic PipeliningFlushState newPipeline() {return INSTANCE;}@Overridepublic void onOpen(StatefulConnection<?, ?> connection) {//首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redisconnection.setAutoFlushCommands(false);}@Overridepublic void onCommand(StatefulConnection<?, ?> connection) {//收到命令时什么都不做}@Overridepublic void onClose(StatefulConnection<?, ?> connection) {//在 pipeline 关闭的时候发送所有命令connection.flushCommands();//恢复默认配置,这样连接如果退回连接池不会影响后续使用connection.setAutoFlushCommands(true);}
}
对于 buffered:
private static class BufferedFlushing implements PipeliningFlushState {private final AtomicLong commands = new AtomicLong();private final int flushAfter;public BufferedFlushing(int flushAfter) {this.flushAfter = flushAfter;}@Overridepublic void onOpen(StatefulConnection<?, ?> connection) {//首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redisconnection.setAutoFlushCommands(false);}@Overridepublic void onCommand(StatefulConnection<?, ?> connection) {//如果命令达到指定个数,就发到 Redisif (commands.incrementAndGet() % flushAfter == 0) {connection.flushCommands();}}@Overridepublic void onClose(StatefulConnection<?, ?> connection) {//在 pipeline 关闭的时候发送所有命令connection.flushCommands();//恢复默认配置,这样连接如果退回连接池不会影响后续使用connection.setAutoFlushCommands(true);}
}
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer:
Spring-data-redis + Lettuce 如何使用 Pipeline相关推荐
- Redis - Spring Data Redis 操作 Jedis 、Lettuce 、 Redisson
文章目录 官网 Jedis VS Lettuce Jedis Code POM依赖 配置文件 配置类 单元测试 Lettuce Code Redisson Code POM依赖 配置文件 配置类 单元 ...
- Java Spring Data Redis实战与配置参数详解 application.properties...
Redis作为开源分布式高并发缓存,使用范围非常广泛,主流互联网公司几乎都在使用. Java Spring Boot 2.0实战开发Redis缓存可以参考下面的步骤,Redis安装可以直接使用Linu ...
- 一文搞定 Spring Data Redis 详解及实战
转载自 一文搞定 Spring Data Redis 详解及实战 SDR - Spring Data Redis的简称. Spring Data Redis提供了从Spring应用程序轻松配置和访问 ...
- Spring Data Redis:Sentinel的高可用性
1.概述 为了使Redis具有高可用性,我们可以使用Spring Data Redis对Redis Sentinel的支持. 借助Sentinel,我们可以创建自动抵御某些故障的Redis部署. Re ...
- Spring Data Redis 让 NoSQL 快如闪电(2)
2019独角兽企业重金招聘Python工程师标准>>> 把 Redis 当作数据库的用例 现在我们来看看在服务器端 Java 企业版系统中把 Redis 当作数据库的各种用法吧.无论 ...
- 使用客户端jedis时报错Could not get a resource from the pool 以及使用Spring Data Redis报错解决方法
一.Jedis 报错 今天在使用jedis时,一直报错 Could not get a resource from the pool 在网上找了好多解决的方法,并且找了半天错误,才发现是我的启动方式有 ...
- Spring Data Redis 多源
完整代码:Ciiiiing/springboot_multi_redis 最近需要在同一个项目中访问多个 redis 而 spring data redis 默认是只支持一个数据源的,那就需要我们自己 ...
- Spring Data Redis 正确使用姿势
课程简介 本课程主要讲解常规 Redis 的写法,Redis 和 Spring 的结合使用,即 Spring Data Redis,以及 Redis 在工作中的正确使用姿势,Redis 和 Sprin ...
- Java Spring Data Redis实战与配置参数详解 application.properties
Redis作为开源分布式高并发缓存,使用范围非常广泛,主流互联网公司几乎都在使用. Java Spring Boot 2.0实战开发Redis缓存可以参考下面的步骤,Redis安装可以直接使用Linu ...
- Spring认证中国教育管理中心-Spring Data Redis框架教程三
原标题:Spring认证中国教育管理中心-Spring Data Redis框架教程三 10.15.支持类 Packageorg.springframework.data.redis.support提 ...
最新文章
- sql server 2000 版本查询
- python之路---装饰器函数
- 《数据库SQL实战》查找所有员工的last_name和first_name以及对应部门编号dept_no,也包括展示没有分配具体部门的员工
- spring属性注入
- c语言long double位数,int long double 所占位数 和最大值
- 现有php环境下安装memcached并测试(centos6.4系统64位)
- 实现简单的python爬虫功能
- 电影院售票系统,电影院订票系统,电影院购票管理系统计算机毕业设计
- 《模式分类》第二版 第二章课后编程题
- C语言符号意思(看了必懂系列)
- 游戏理论之Shapley Value(Shapley值)
- win7万能声卡驱动_驱动精灵万能网卡版下载-驱动精灵万能网卡版免费下载
- R语言ggplot2边框背景去除
- Serenity框架官方文档翻译(1-2开始、安装和界面)
- Function ‘MseLossBackward0‘ returned nan values in its 0th output.
- ecshop小京东短信接口插件修改-v41,42,43,50+图片说明
- 教师计算机考试模块有哪些内容有哪些内容,教师资格考试信息与信息技术模块知识点...
- mysql动态变量_Mysql的变量一览
- goods购物表MySQL的代码_第一节:数据库及表
- Common Vector Operators(常见的向量操作)