本文主要研究一下lettuce的shareNativeConnection参数

LettuceConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

public class LettuceConnectionFactoryimplements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(LettuceConverters.exceptionConverter());private final Log log = LogFactory.getLog(getClass());private final LettuceClientConfiguration clientConfiguration;private @Nullable AbstractRedisClient client;private @Nullable LettuceConnectionProvider connectionProvider;private @Nullable LettuceConnectionProvider reactiveConnectionProvider;private boolean validateConnection = false;private boolean shareNativeConnection = true;private @Nullable SharedConnection<byte[]> connection;private @Nullable SharedConnection<ByteBuffer> reactiveConnection;private @Nullable LettucePool pool;/** Synchronization monitor for the shared Connection */private final Object connectionMonitor = new Object();private boolean convertPipelineAndTxResults = true;private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);private @Nullable RedisSentinelConfiguration sentinelConfiguration;private @Nullable RedisClusterConfiguration clusterConfiguration;private @Nullable ClusterCommandExecutor clusterCommandExecutor;//......@Overridepublic LettuceReactiveRedisConnection getReactiveConnection() {return getShareNativeConnection()? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider): new LettuceReactiveRedisConnection(reactiveConnectionProvider);}@Overridepublic LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {if (!isClusterAware()) {throw new InvalidDataAccessApiUsageException("Cluster is not configured!");}RedisClusterClient client = (RedisClusterClient) this.client;return getShareNativeConnection()? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client): new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client);}/*** Indicates if multiple {@link LettuceConnection}s should share a single native connection.** @return native connection shared.*/public boolean getShareNativeConnection() {return shareNativeConnection;}/*** @return the shared connection using {@link ByteBuffer} encoding for reactive API use. {@literal null} if*         {@link #getShareNativeConnection() connection sharing} is disabled.* @since 2.0.1*/@Nullableprotected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection() {return shareNativeConnection ? getOrCreateSharedReactiveConnection().getConnection() : null;}private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() {synchronized (this.connectionMonitor) {if (this.reactiveConnection == null) {this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider, true);}return this.reactiveConnection;}}
}
复制代码
  • 可以看到这里的shareNativeConnection默认为true,表示多个LettuceConnection将共享一个native connection
  • 如果该值为true,则getReactiveConnection及getReactiveClusterConnection方法使用的是getSharedReactiveConnection
  • getSharedReactiveConnection在shareNativeConnection为true的时候,调用的是getOrCreateSharedReactiveConnection().getConnection()

LettuceConnectionFactory.SharedConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

   /*** Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it* synchronizes concurrent calls by blocking.** @param <E> connection encoding.* @author Mark Paluch* @author Christoph Strobl* @since 2.1*/@RequiredArgsConstructorclass SharedConnection<E> {private final LettuceConnectionProvider connectionProvider;private final boolean shareNativeClusterConnection;/** Synchronization monitor for the shared Connection */private final Object connectionMonitor = new Object();private @Nullable StatefulConnection<E, E> connection;/*** Returns a valid Lettuce connection. Initializes and validates the connection if* {@link #setValidateConnection(boolean) enabled}.** @return the connection.*/@NullableStatefulConnection<E, E> getConnection() {synchronized (this.connectionMonitor) {if (this.connection == null) {this.connection = getNativeConnection();}if (getValidateConnection()) {validateConnection();}return this.connection;}}/*** Obtain a connection from the associated {@link LettuceConnectionProvider}.** @return the connection.*/@Nullableprivate StatefulConnection<E, E> getNativeConnection() {try {if (isClusterAware() && !shareNativeClusterConnection) {return null;}StatefulConnection<E, E> connection = connectionProvider.getConnection(StatefulConnection.class);if (connection instanceof StatefulRedisConnection && getDatabase() > 0) {((StatefulRedisConnection) connection).sync().select(getDatabase());}return connection;} catch (RedisException e) {throw new RedisConnectionFailureException("Unable to connect to Redis", e);}}/*** Validate the connection. Invalid connections will be closed and the connection state will be reset.*/void validateConnection() {synchronized (this.connectionMonitor) {boolean valid = false;if (connection != null && connection.isOpen()) {try {if (connection instanceof StatefulRedisConnection) {((StatefulRedisConnection) connection).sync().ping();}if (connection instanceof StatefulRedisClusterConnection) {((StatefulRedisConnection) connection).sync().ping();}valid = true;} catch (Exception e) {log.debug("Validation failed", e);}}if (!valid) {if (connection != null) {connectionProvider.release(connection);}log.warn("Validation of shared connection failed. Creating a new connection.");resetConnection();this.connection = getNativeConnection();}}}/*** Reset the underlying shared Connection, to be reinitialized on next access.*/void resetConnection() {synchronized (this.connectionMonitor) {if (this.connection != null) {this.connectionProvider.release(this.connection);}this.connection = null;}}}
复制代码
  • 要注意这里维护了StatefulConnection,第一个为null的时候,才调用getNativeConnection去获取
  • 另外要注意,这里的getValidateConnection,默认是false的,也就是说只要connection不为null,就不会归还,每次用同一个connection
  • 如果开启validate的话,每次get的时候都会validate一下,而其validate方法不仅判断isOpen,还判断ping,如果超时等,则会将连接释/归还,再重新获取一次(如果使用连接池的话,则重新borrow一次)
  • 这里的validateConnection方法有点问题,调用了两次connectionProvider.release(connection)

LettucePoolingConnectionProvider.release

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

    @Overridepublic void release(StatefulConnection<?, ?> connection) {GenericObjectPool<StatefulConnection<?, ?>> pool = poolRef.remove(connection);if (pool == null) {throw new PoolException("Returned connection " + connection+ " was either previously returned or does not belong to this connection provider");}pool.returnObject(connection);}
复制代码
  • 第二次remove同一个connection的时候,pool为null,然后抛出PoolException,来不及执行returnObject方法

ConnectionWatchdog

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ConnectionWatchdog.java

/*** A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.** @author Will Glozer* @author Mark Paluch* @author Koji Lin*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {//......@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.debug("{} channelInactive()", logPrefix());if (!armed) {logger.debug("{} ConnectionWatchdog not armed", logPrefix());return;}channel = null;if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {scheduleReconnect();} else {logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);}super.channelInactive(ctx);}/*** Schedule reconnect if channel is not available/not active.*/public void scheduleReconnect() {logger.debug("{} scheduleReconnect()", logPrefix());if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {attempts++;final int attempt = attempts;int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);this.reconnectScheduleTimeout = timer.newTimeout(it -> {reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");return;}reconnectWorkers.submit(() -> {ConnectionWatchdog.this.run(attempt);return null;});}, timeout, TimeUnit.MILLISECONDS);// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.if (!reconnectSchedulerSync.get()) {reconnectScheduleTimeout = null;}} else {logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());}}/*** Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with* the same handler instances contained in the old channel's pipeline.** @param attempt attempt counter** @throws Exception when reconnection fails.*/public void run(int attempt) throws Exception {reconnectSchedulerSync.set(false);reconnectScheduleTimeout = null;if (!isEventLoopGroupActive()) {logger.debug("isEventLoopGroupActive() == false");return;}if (!isListenOnChannelInactive()) {logger.debug("Skip reconnect scheduling, listener disabled");return;}if (isReconnectSuspended()) {logger.debug("Skip reconnect scheduling, reconnect is suspended");return;}boolean shouldLog = shouldLog();InternalLogLevel infoLevel = InternalLogLevel.INFO;InternalLogLevel warnLevel = InternalLogLevel.WARN;if (shouldLog) {lastReconnectionLogging = System.currentTimeMillis();} else {warnLevel = InternalLogLevel.DEBUG;infoLevel = InternalLogLevel.DEBUG;}InternalLogLevel warnLevelToUse = warnLevel;try {reconnectionListener.onReconnect(new ConnectionEvents.Reconnect(attempt));logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);ChannelFuture future = reconnectionHandler.reconnect();future.addListener(it -> {if (it.isSuccess() || it.cause() == null) {return;}Throwable throwable = it.cause();if (ReconnectionHandler.isExecutionException(throwable)) {logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString());} else {logger.log(warnLevelToUse, "Cannot reconnect: {}", throwable.toString(), throwable);}if (!isReconnectSuspended()) {scheduleReconnect();}});} catch (Exception e) {logger.log(warnLevel, "Cannot reconnect: {}", e.toString());}}
}
复制代码
  • 这个ConnectionWatchdog专门用来处理被异常close掉的channel,然后定时重连
  • 重连采用的是ReconnectionHandler.reconnect方法

ReconnectionHandler.reconnect

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/protocol/ReconnectionHandler.java

class ReconnectionHandler {//....../*** Initiate reconnect and return a {@link ChannelFuture} for synchronization. The resulting future either succeeds or fails.* It can be {@link ChannelFuture#cancel(boolean) canceled} to interrupt reconnection and channel initialization. A failed* {@link ChannelFuture} will close the channel.** @return reconnect {@link ChannelFuture}.*/protected ChannelFuture reconnect() {SocketAddress remoteAddress = socketAddressSupplier.get();logger.debug("Reconnecting to Redis at {}", remoteAddress);ChannelFuture connectFuture = bootstrap.connect(remoteAddress);ChannelPromise initFuture = connectFuture.channel().newPromise();initFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {connectFuture.cancel(true);close(it.channel());}});connectFuture.addListener((ChannelFuture it) -> {if (it.cause() != null) {initFuture.tryFailure(it.cause());return;}ChannelPipeline pipeline = it.channel().pipeline();RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);if (channelInitializer == null) {initFuture.tryFailure(new IllegalStateException("Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));return;}channelInitializer.channelInitialized().whenComplete((state, throwable) -> {if (throwable != null) {if (isExecutionException(throwable)) {initFuture.tryFailure(throwable);return;}if (clientOptions.isCancelCommandsOnReconnectFailure()) {connectionFacade.reset();}if (clientOptions.isSuspendReconnectOnProtocolFailure()) {logger.error("Disabling autoReconnect due to initialization failure", throwable);setReconnectSuspended(true);}initFuture.tryFailure(throwable);return;}if (logger.isDebugEnabled()) {logger.info("Reconnected to {}, Channel {}", remoteAddress,ChannelLogDescriptor.logDescriptor(it.channel()));} else {logger.info("Reconnected to {}", remoteAddress);}initFuture.trySuccess();});});Runnable timeoutAction = () -> {initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",timeout, timeoutUnit)));};Timeout timeoutHandle = timer.newTimeout(it -> {if (connectFuture.isDone() && initFuture.isDone()) {return;}if (reconnectWorkers.isShutdown()) {timeoutAction.run();return;}reconnectWorkers.submit(timeoutAction);}, this.timeout, timeoutUnit);initFuture.addListener(it -> timeoutHandle.cancel());return this.currentFuture = initFuture;}
}
复制代码
  • ReconnectionHandler.reconnect就是基于netty来进行重新连接,连接失败
  • 如果重连成功,则cancel掉timeoutHandle,否则就一直延时重试

小结

  • lettuce默认的shareNativeConnection参数为true,且validateConnection为false
  • 如果使用线程池,则默认是borrow一次,之后就一直复用,不归还,但是对于docker pause的场景不能有效识别,一直报command timeout
  • 对于不归还的shareNativeConnection,lettuce有个ConnectionWatchdog进行不断重连处理
  • 如果validateConnection为true,则每次get连接的时候会进行校验,校验失败理论上则会归还到连接池,然后重新连接获取一个新的nativeConnection(建立连接不成功连接池那里会抛出org.springframework.data.redis.connection.PoolException: Could not get a resource from the pool; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 192.168.99.100:6379)

不过由于LettuceConnectionFactory.SharedConnection的validateConnection方法在校验失败时,重复调用connectionProvider.release(connection),导致抛出org.springframework.data.redis.connection.PoolException: Returned connection io.lettuce.core.StatefulRedisConnectionImpl@1e4ad4a was either previously returned or does not belong to this connection provider异常

doc

  • Connection-Pooling

聊聊lettuce的shareNativeConnection参数相关推荐

  1. 简单聊聊FPGA的一些参数

    笔者:E林1010 在上一篇中,我们已经知道了,FPGA的几个主流厂家和其中Intel家族中FPGA的系列的分类. 上一篇文章链接: https://mp.weixin.qq.com/s/1YufdR ...

  2. 简单聊聊FPGA的一些参数——后篇

    上一篇:https://mp.weixin.qq.com/s/prBScH4D4ixxtbdW_7n9Yg 2. I/O 和体系结构特性 安全器件管理器 英特尔 Stratix 10 器件系列在所有密 ...

  3. 聊聊lettuce的sentinel连接

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下lettuce的sentinel连接 RedisClient.connectSentinel lettuce-co ...

  4. vr设备的服务器性能指标,让我们一起聊聊VR眼镜技术参数

    时下最炙手可热的数码产品无疑就是VR眼镜了,而作为影响VR体验的重要一部分,VR眼镜技术参数也成为了很多人讨论的焦点.目前市面上的VR设备良莠不齐,具体什么样的VR眼镜技术参数规格才是优秀的产品,而这 ...

  5. java forward 修改请求参数_聊聊springboot session timeout参数设置

    序 本文主要介绍下spring boot中对session timeout参数值的设置过程. ServerProperties spring-boot-autoconfigure-1.5.8.RELE ...

  6. Spring Session官方介绍及spring框架学习方法

    现在我们开始讲Spring Session,首先进入maven中央仓库,在百度查一下,如何替换成阿里云的仓库就OK了,我们搜索什么呢,spring-session-data-redis,这里面找到这个 ...

  7. ThreadPoolExecutor使用和思考(上)-线程池大小设置与BlockingQueue的三种实现区别

    前记: jdk官方文档(javadoc)是学习的最好,最权威的参考. 文章分上中下.上篇中主要介绍ThreadPoolExecutor接受任务相关的两方面入参的意义和区别,池大小参数corePoolS ...

  8. 对于python命令行,你应该这么做才专业

    论吃苦 和朋友聊天,说到了学习的问题.不知道何时听过一句话,因为不想吃生活的苦,所以我忍受着学习的苦.生活的苦只要躺着就能吃到,而学习的苦却要我们逼着自己去吃.每天下班回来等到孩子睡了开始学习,学的差 ...

  9. 【R语言】ggplot2:初次见面,请多多关照!

    目录 一.序 二.ggplot2是什么? 三.ggplot2能画出什么样的图? 四.组装机器 五.设计图纸 六.机器的零件 1. 零件--散点图 1) 变换颜色 2) 拟合曲线 3) 变换大小 4) ...

最新文章

  1. php 判断是否有相同的ID,如果有就修改数据库字段,没有就插入数据库字段
  2. 系统、应用监控的缜密思路,性能瓶颈的克星
  3. QT:KeepAliveOption的应用
  4. ffmpeg源码分析——av_register_all
  5. ThinkPHP控制器
  6. 陕西活性炭需求分析_20212027年中国粉末活性炭行业市场发展现状调研与投资趋势前景分析报告...
  7. 简单tarjan》一道裸题(BZOJ1051)(easy)
  8. java客户端_Java常用的Http client客户端
  9. aws 部署python lambda_awslambda-为Lambda工具部署Python项目。-Philipp Gorczak Getting started Usage...
  10. 进程被kill原因_Linux内核系列 简析进程生命周期:从生到死的这一生(一)
  11. HoloToolkit/unity远程实时传输视频
  12. paip.表格化CSV输出
  13. 解决github上的提交说明中文乱码的问题
  14. uni-app 无网络图标不显示问题解决
  15. 解决玩Minecraft时鼠标Dpi不稳定问题
  16. Docker容器网络模式与数据管理
  17. Oracle格式化总结
  18. MT6323 PMIC 功能介绍
  19. pl/sql develo 13 下载安装地址
  20. 一文简单理解《Effective Java》建议

热门文章

  1. mongodb 导出指定数据库文件大小_大数据技术之将mongodb 数据指定字段导出,然后指定字段导入mysql 实例 及相关问题解决...
  2. python dataframe列数值相加,python合并dataframe中的行并将值相加
  3. java se翻译_(翻译)Java SE 8 Lambda 标准库概览(下)
  4. eeglab教程系列(15)-绘制独立成分ERP贡献
  5. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector
  6. 19倍超音速“大炮”轰出可控核聚变!成本仅为传统方法0.1%,腾讯已投资
  7. 助力健康中国,国内首个中文医疗信息处理挑战榜正式发布
  8. 谷歌翻译大型翻车现场:请服用“反坦克导弹”来缓解疼痛,UCLA:医生们要注意了...
  9. 困扰数学家25年的“切苹果”难题,被一位华人统计学博士解决了
  10. 超过Google,微信AI在NLP领域又获一项世界第一