关于 spring-data-redis 和 lettuce,笔者写过不少文章:

  • 这个 Redis 连接池的新监控方式针不戳~我再加一点佐料
  • spring-data-redis 连接泄漏,我 TM 人傻了
  • spring-data-redis 动态切换数据源
  • spring-data-redis 上百万的 QPS 压力太大连接失败,我 TM 人傻了

最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?

首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:

  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

execute(RedisCallback),流程是:

对于 executePipelined(RedisCallback),如果使用正确的话,会使用 asyncDedicatedConn 私有连接执行。那么怎么算使用正确呢?

需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate 调用,否则 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {connection.get("test".getBytes());connection.get("test2".getBytes());return null;}
});

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {redisTemplate.opsForValue().get("test");redisTemplate.opsForValue().get("test2");return null;}
});

这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?

Redis Pipeline 类比 Lettuce 中的 AutoFlushCommands

Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。

如果原来的命令是这么发送的:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4

那么使用 PIPELINE 之后,命令就是类似于这么发送的

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4

我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。

Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。

配置 Spring-data-redis + Lettuce 使用 Pipeline

Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:

  • DATAREDIS-1011 - Allow configuration of Lettuce pipelining flush behavior
  • https://github.com/spring-projects/spring-data-redis/issues/1581

我们可以这样配置:

@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {return new BeanPostProcessor() {@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {//在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnCloseif (bean instanceof LettuceConnectionFactory) {LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());//多谢评论区的[孤胆枪手](https://juejin.cn/user/2084329775180605) 的指正,我之前忘了放这个配置了lettuceConnectionFactory.setShareNativeConnection(false);}return bean;}};
}

注意这里将 shareNativeConnection 设置为 false。本来基于 Lettuce 的 RedisTemplate 中大部分请求都可以通过共享连接使用同一个连接,关闭的话每次都获取的是独占连接。这种情况下我们要注意使用连接池(防止每次创建新连接),同时连接池需要大于可能的并发线程个数防止阻塞等待连接。

为啥要关闭这个共享链接呢,参考源码:

RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {//Redis事务才是 trueif (this.isQueueing()) {return this.getAsyncDedicatedConnection();} else {//如果有共享连接则返回共享连接,否则返回独占连接,只有独占连接 PipeliningFlushPolicy 才会生效,PipeliningFlushPolicy 不会修改共享连接return (RedisClusterAsyncCommands)(this.asyncSharedConn != null && this.asyncSharedConn instanceof StatefulRedisConnection ? ((StatefulRedisConnection)this.asyncSharedConn).async() : this.getAsyncDedicatedConnection());}
}

由于我们要使用 PipeliningFlushPolicy,所以需要这里返回独占连接,也就不能打开共享连接。

我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:

public interface PipeliningFlushPolicy {//其实就是默认的每个命令都直接发到 Redis Serverstatic PipeliningFlushPolicy flushEachCommand() {return FlushEachCommand.INSTANCE;}//在连接关闭的时候,将命令一起发到 Redisstatic PipeliningFlushPolicy flushOnClose() {return FlushOnClose.INSTANCE;}//手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redisstatic PipeliningFlushPolicy buffered(int bufferSize) {return () -> new BufferedFlushing(bufferSize);}
}

这三个类也都实现了 PipeliningFlushState 接口:

public interface PipeliningFlushState {//对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法void onOpen(StatefulConnection<?, ?> connection);//对于 executePipelined 中的每个命令都会调用这个方法void onCommand(StatefulConnection<?, ?> connection);//在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法void onClose(StatefulConnection<?, ?> connection);
}

默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。

private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {INSTANCE;@Overridepublic PipeliningFlushState newPipeline() {return INSTANCE;}@Overridepublic void onOpen(StatefulConnection<?, ?> connection) {}@Overridepublic void onCommand(StatefulConnection<?, ?> connection) {}@Overridepublic void onClose(StatefulConnection<?, ?> connection) {}
}

对于 flushOnClose:

private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {INSTANCE;@Overridepublic PipeliningFlushState newPipeline() {return INSTANCE;}@Overridepublic void onOpen(StatefulConnection<?, ?> connection) {//首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redisconnection.setAutoFlushCommands(false);}@Overridepublic void onCommand(StatefulConnection<?, ?> connection) {//收到命令时什么都不做}@Overridepublic void onClose(StatefulConnection<?, ?> connection) {//在 pipeline 关闭的时候发送所有命令connection.flushCommands();//恢复默认配置,这样连接如果退回连接池不会影响后续使用connection.setAutoFlushCommands(true);}
}

对于 buffered:

private static class BufferedFlushing implements PipeliningFlushState {private final AtomicLong commands = new AtomicLong();private final int flushAfter;public BufferedFlushing(int flushAfter) {this.flushAfter = flushAfter;}@Overridepublic void onOpen(StatefulConnection<?, ?> connection) {//首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redisconnection.setAutoFlushCommands(false);}@Overridepublic void onCommand(StatefulConnection<?, ?> connection) {//如果命令达到指定个数,就发到 Redisif (commands.incrementAndGet() % flushAfter == 0) {connection.flushCommands();}}@Overridepublic void onClose(StatefulConnection<?, ?> connection) {//在 pipeline 关闭的时候发送所有命令connection.flushCommands();//恢复默认配置,这样连接如果退回连接池不会影响后续使用connection.setAutoFlushCommands(true);}
}

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

Spring-data-redis + Lettuce 如何使用 Pipeline相关推荐

  1. Redis - Spring Data Redis 操作 Jedis 、Lettuce 、 Redisson

    文章目录 官网 Jedis VS Lettuce Jedis Code POM依赖 配置文件 配置类 单元测试 Lettuce Code Redisson Code POM依赖 配置文件 配置类 单元 ...

  2. Java Spring Data Redis实战与配置参数详解 application.properties...

    Redis作为开源分布式高并发缓存,使用范围非常广泛,主流互联网公司几乎都在使用. Java Spring Boot 2.0实战开发Redis缓存可以参考下面的步骤,Redis安装可以直接使用Linu ...

  3. 一文搞定 Spring Data Redis 详解及实战

    转载自  一文搞定 Spring Data Redis 详解及实战 SDR - Spring Data Redis的简称. Spring Data Redis提供了从Spring应用程序轻松配置和访问 ...

  4. Spring Data Redis:Sentinel的高可用性

    1.概述 为了使Redis具有高可用性,我们可以使用Spring Data Redis对Redis Sentinel的支持. 借助Sentinel,我们可以创建自动抵御某些故障的Redis部署. Re ...

  5. Spring Data Redis 让 NoSQL 快如闪电(2)

    2019独角兽企业重金招聘Python工程师标准>>> 把 Redis 当作数据库的用例 现在我们来看看在服务器端 Java 企业版系统中把 Redis 当作数据库的各种用法吧.无论 ...

  6. 使用客户端jedis时报错Could not get a resource from the pool 以及使用Spring Data Redis报错解决方法

    一.Jedis 报错 今天在使用jedis时,一直报错 Could not get a resource from the pool 在网上找了好多解决的方法,并且找了半天错误,才发现是我的启动方式有 ...

  7. Spring Data Redis 多源

    完整代码:Ciiiiing/springboot_multi_redis 最近需要在同一个项目中访问多个 redis 而 spring data redis 默认是只支持一个数据源的,那就需要我们自己 ...

  8. Spring Data Redis 正确使用姿势

    课程简介 本课程主要讲解常规 Redis 的写法,Redis 和 Spring 的结合使用,即 Spring Data Redis,以及 Redis 在工作中的正确使用姿势,Redis 和 Sprin ...

  9. Java Spring Data Redis实战与配置参数详解 application.properties

    Redis作为开源分布式高并发缓存,使用范围非常广泛,主流互联网公司几乎都在使用. Java Spring Boot 2.0实战开发Redis缓存可以参考下面的步骤,Redis安装可以直接使用Linu ...

  10. Spring认证中国教育管理中心-Spring Data Redis框架教程三

    原标题:Spring认证中国教育管理中心-Spring Data Redis框架教程三 10.15.支持类 Packageorg.springframework.data.redis.support提 ...

最新文章

  1. sql server 2000 版本查询
  2. python之路---装饰器函数
  3. 《数据库SQL实战》查找所有员工的last_name和first_name以及对应部门编号dept_no,也包括展示没有分配具体部门的员工
  4. spring属性注入
  5. c语言long double位数,int long double 所占位数 和最大值
  6. 现有php环境下安装memcached并测试(centos6.4系统64位)
  7. 实现简单的python爬虫功能
  8. 电影院售票系统,电影院订票系统,电影院购票管理系统计算机毕业设计
  9. 《模式分类》第二版 第二章课后编程题
  10. C语言符号意思(看了必懂系列)
  11. 游戏理论之Shapley Value(Shapley值)
  12. win7万能声卡驱动_驱动精灵万能网卡版下载-驱动精灵万能网卡版免费下载
  13. R语言ggplot2边框背景去除
  14. Serenity框架官方文档翻译(1-2开始、安装和界面)
  15. Function ‘MseLossBackward0‘ returned nan values in its 0th output.
  16. ecshop小京东短信接口插件修改-v41,42,43,50+图片说明
  17. 教师计算机考试模块有哪些内容有哪些内容,教师资格考试信息与信息技术模块知识点...
  18. mysql动态变量_Mysql的变量一览
  19. goods购物表MySQL的代码_第一节:数据库及表
  20. Common Vector Operators(常见的向量操作)

热门文章

  1. LT8619C中文简介
  2. 巴可放映机服务器型号怎么看,巴可放映机根据影片怎么选择通道,JPEG和MPEG分别是什么,...
  3. bc劫持流量_seo流量劫持
  4. 【架构师(第五十篇)】 服务端开发之自动发布到测试机
  5. InnoDB的redo日志管理---饶珑辉
  6. 安全、环保的随身“数据库”——体验希捷锦系列移动硬盘
  7. Compose Icon
  8. mysql select 时间格式_Mysql格式化日期
  9. JVM内存模型->程序计算器
  10. splash官方文档解读(翻译)