redis使用sysc超时_优雅的处理Redis访问超时
很长一段时间以来,一直在项目中使用Redis作为辅助存储,确切来说是利用Redis的内存存储,而不是将其作为缓存。比如常见的利用Set集合来判断某个数值是否存在,或者将来自不同请求的数据放在Redis中进行拼接然后一起写入MySQL等数据库。
这种存储目的的使用要求对Redis的访问不能失败(如果作为缓存使用,是接受失败的),所以作为存储目的使用代码中要对请求Redis的代码进行异常处理以及重试等。
在最初的代码中采用了最常见的方法如try ... catch ...处理异常,递归进行重试,类似:
//伪代码
public void Process(int retry)
{
if(retry>3)
{
//记录错误
return;
}
try
{
//业务代码
}
catch(Exception ex)
{
//重试
++retry;
Process(retry);
}
}
后来有一天看到了园友Jeffcky推荐的Polly库,瞬间眼前一亮,这才是我们处理异常和重试所需要的东西。
关于Polly的使用,可以参考Jeffcky的博文或者Polly项目的GitHub主页(文档很详细)。
大致的代码结构如:
var tsArr = new TimeSpan[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(1)
};
// 构造一种重试测试(其它可选的包括熔断等)
var policy = Policy
.Handle()
.WaitAndRetryAsync(tsArr);
// 需要有Polly调用的业务代码,以异步方法为例
async Task SomeToInvoke()
{
// 一些异步调用
}
// 使用Polly执行业务代码(如不需要捕获异常可选用其它重载)
var pollyRet = await policy.ExecuteAndCaptureAsync(SomeToInvoke);
// 处理返回值判断调用是否成功,或发生了什么异常
下面一步步来看博主的实现过程。
先放上一些测试所用的代码,首先是创建Redis连接的接口和类,它们是从NopCommerce项目一个早起版本借(chao)鉴(xi)来的(文件名都没改,为了测试方便代码略有改动),一直用着没啥大问题就这样用了。
public interface IRedisConnectionWrapper : IDisposable
{
IDatabase Database(int? db = null);
IServer Server(EndPoint endPoint);
EndPoint[] GetEndpoints();
void FlushDb(int? db = null);
}
public class RedisConnectionWrapper : IRedisConnectionWrapper
{
private readonly Lazy _connectionString;
private readonly Lazy _auth;
private volatile ConnectionMultiplexer _connection;
private readonly object _lock = new object();
public RedisConnectionWrapper(string server, string pswd)
{
this._connectionString = new Lazy(() => server);
this._auth = new Lazy(() => pswd);
}
private ConnectionMultiplexer GetConnection()
{
if (_connection != null && _connection.IsConnected) return _connection;
lock (_lock)
{
if (_connection != null && _connection.IsConnected) return _connection;
if (_connection != null)
{
_connection.Dispose();
}
var options = new ConfigurationOptions();
options.EndPoints.Add(_connectionString.Value);
if (!string.IsNullOrEmpty(_auth.Value))
options.Password = _auth.Value;
_connection = ConnectionMultiplexer.Connect(options);
}
return _connection;
}
public IDatabase Database(int? db = null)
{
return GetConnection().GetDatabase(db ?? -1);
}
public IServer Server(EndPoint endPoint)
{
return GetConnection().GetServer(endPoint);
}
public EndPoint[] GetEndpoints()
{
return GetConnection().GetEndPoints();
}
public void FlushDb(int? db = null)
{
var endPoints = GetEndpoints();
foreach (var endPoint in endPoints)
{
Server(endPoint).FlushDatabase(db ?? -1);
}
}
public void Dispose()
{
if (_connection != null)
{
_connection.Dispose();
}
}
}
对于StackExchange.Redis来说是比较标准的连接创建方式,顺便看了下新版的NopCommerce代码中,代码有了些小改进,增加了一个双重锁。有需要的园友可以自行去下载新的。
接着开始考虑重试问题,为了代码看起来更简洁,决定尝试通过动态代理将捕捉异常并重试的操作作为切面注入。说到动态代理,第一个想到肯定是Castle.Core(前身为CastleDynamicProxy)。动态代理可以选择接口或者是类,如果是类的话需要方法是虚方法。看了下StackExchange.Redis的代码,几个实现类都是internal,方法也都是非virtual。所以只能只能自己写一个类包一下。
这个类就是一个壳,为了我们切面注入。下面的代码只保留的一个方法,其它的省略。另外Castle.Core的动态代理是不支持异步方法的,所以先用Redis的同步接口做下尝试。
public class RedisDatabaseWrapper:IDatabase
{
private IDatabase _redisDb;
public RedisDatabaseWrapper(IRedisConnectionWrapper redisConnectionWrapper)
{
_redisDb = redisConnectionWrapper.Database();
}
public virtual bool SetContains(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
return _redisDb.SetContains(key, value, flags);
}
// 省略其它所有方法...
}
安装Castle.Core,并开始实现动态代理类。
public class RetryByPollyInterceptor : IInterceptor
{
public async void Intercept(IInvocation invocation)
{
var isAsync = IsAsyncMethod(invocation.Method);
if (isAsync)
InterceptAsync(invocation);
else
InterceptSync(invocation);
}
private void InterceptSync(IInvocation invocation)
{
var tsArr = new TimeSpan[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(1)
};
Action action = (ex, ts, idx, ctx) =>
{
Console.WriteLine($"Polly Exp:{ex.GetType()} {ex.Message} Try:{idx} ");
var invca = (IInvocation)ctx["inv"];
if (idx == 2)
{
var type = invca.Method.ReturnType;
if (type == typeof(void)) return;
var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
invca.ReturnValue = ret;
}
};
var policy = Policy
.Handle()
.Or()
.Or()
.WaitAndRetry(tsArr, action);
void OrignalInvoke()
{
invocation.Proceed();
}
var pollyRet = policy.ExecuteAndCapture(OrignalInvoke, new Dictionary() { ["inv"] = invocation });
if (pollyRet.Outcome != OutcomeType.Successful)
{
Console.WriteLine($"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");
}
}
private void InterceptAsync(IInvocation invocation)
{
// 异步方法代理,下文会讨论
}
private static bool IsAsyncMethod(MethodInfo method)
{
return (
method.ReturnType == typeof(Task) ||
(method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
);
}
}
注意
这个方法也是经过多次尝试才最终完成,可以看到这里预留了处理异步代理的方法,后文会详细说。对于同步方法这段代码可以完美的捕获异常并重试。不用在外侧代码进行catch。当然内部发生异常并多次重试仍失败后会返回非期望的结果,还是需要根据业务的需要对返回值进行判断。
这段代码最值得注意的是这几行:
Action action = (ex, ts, idx, ctx) =>
{
Console.WriteLine($"Polly Exp:{ex.GetType()} {ex.Message} Try:{idx} ");
var invca = (IInvocation)ctx["inv"];
if (idx == 2)
{
var type = invca.Method.ReturnType;
if (type == typeof(void)) return;
var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
invca.ReturnValue = ret;
}
};
由于我们设置重试两次,当第二次发生异常时,我们强制给方法返回值赋一个返回值,这样可以让外部调用方法正常执行下去而不会由于无法获取代理方法的返回值而报空引用异常。
接着看看其它组成部分。在博主目前大部分项目中都使用Autofac作为容器,我们需要注册一下用到的类。并且通过Autofac的Castle.Core插件,可以注册动态代理,这样就不用通过给类添加Attribute的方式来添加代理,这是个人比较喜欢的风格。
var builder = new ContainerBuilder();
builder.Register(c => new RetryByPollyInterceptor()); //动态代理类
builder.RegisterType().As().EnableInterfaceInterceptors().InterceptedBy(typeof(RetryByPollyInterceptor)).SingleInstance(); //添加动态代理
builder.RegisterType().As()
.WithParameters(new[]
{
new NamedParameter("server", "127.0.0.1"),
new NamedParameter("pswd",""),
}).SingleInstance();
Container = builder.Build();
可以用下面的代码来测试一下上面这些方法。
public void ReadTest(long start, long end)
{
for (var i = start; i <= end; i++)
{
var exists = _redisDb.SetContains(RedisKey, i);
}
}
可以使用Windows版的Redis,直接运行redis-server.exe来启动服务。然后直接关闭redis-server程序来模拟服务端失败,或者直接禁用网卡来模拟网络失败。
可以看到Polly会进行重试并且捕获异常,也就说在ReadTest中感知不到异常。
搞定了同步方法,开始尝试动态代理异步方法。添加Redis异步接口的实现并注册:
public class RedisDatabaseAsyncWrapper:IDatabaseAsync
{
private IDatabase _redisDb;
public RedisDatabaseAsyncWrapper(IRedisConnectionWrapper redisConnectionWrapper)
{
_redisDb = redisConnectionWrapper.Database();
}
public virtual async Task SetContainsAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
return await _redisDb.SetContainsAsync(key, value, flags);
}
// 省略其它实现..
}
//注册异步实现
builder.RegisterType().As().EnableInterfaceInterceptors().InterceptedBy(typeof(RetryByPollyInterceptor)).SingleInstance();
//异步代理
private void InterceptAsync(IInvocation invocation)
{
var tsArr = new TimeSpan[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(1)
};
var policy = Policy
.Handle()
.Or()
.Or()
.WaitAndRetry(tsArr);
void OrignalInvoke()
{
try
{
invocation.Proceed();
}
catch (Exception e)
{
var geneType = invocation.Method.ReturnType.GenericTypeArguments[0];
var ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
invocation.ReturnValue = Task.FromResult(ret);
Console.WriteLine(e);
}
}
var pollyRet = policy.ExecuteAndCapture(OrignalInvoke,
new Dictionary() { ["inv"] = invocation });
if (pollyRet.Outcome != OutcomeType.Successful)
{
Console.WriteLine(
$"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");
var invca = (IInvocation)pollyRet.Context["inv"];
var type = invca.Method.ReturnType;
if (type == typeof(void)) return;
if (type.IsGenericType)
{
var geneType = invca.Method.ReturnType.GenericTypeArguments[0];
var ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
invca.ReturnValue = Task.FromResult(ret);
}
else
{
invca.ReturnValue = Task.FromResult(0);
}
}
}
这里直接告诉各位我的尝试结果是无论如何都无法通过Polly来捕获异常。即上面代码中,OrignalInvoke方法中try...catch...抓不到异常,异常直接被扔给了外部方法。具体原因由于本人比较菜也比较懒没有仔细研究,大概可能就是用一个同步环境去调异步环境的方法没有特殊处理所以出的问题。有知道的园友评论中指点下。
如果是把invocation.Proceed()放在Task中,到是异常不会抛到外侧,但会因为被代理的方法取不到返回值而报空引用错误。原因大概应该是Castle.Core没有取到这个异步构造中的返回值。
经过一番尝试后放弃。在查找解决方法的过程中还发现一个名为Castle.Core.AsyncInterceptor的库,给Castle.Core添加动态代理异步函数的功能,但此扩展的文档实在过长,而且粗略看了下还不支持针对Autofac等IoC容器的扩展,直接放弃。
后来机缘巧合看到了园友Lemon大神的介绍其AspectCore库的文章。留言问了下对异步方法支持的情况,Lemon大神立刻给了回复,还附送了一些使用的特别提示。于是立马安装尝试。
首先是最重要的代理方法,AspectCore原生对异步方法提供支持,代码写起来很简单:
public class RetryByPollyAspectCoreInterceptor : AbstractInterceptorAttribute
{
public override async Task Invoke(AspectContext context, AspectDelegate next)
{
var tsArr = new TimeSpan[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(1)
};
var policy = Policy
.Handle(ex=>ex.InnerException?.GetType()==typeof(TimeoutException))
.Or(ex=>ex.InnerException?.GetType()==typeof(RedisConnectionException))
.WaitAndRetryAsync(tsArr);
async Task OrignalInvoke()
{
await context.Invoke(next);
}
var pollyRet = await policy.ExecuteAndCaptureAsync(OrignalInvoke,new Dictionary() { ["ctx"] = context});
if (pollyRet.Outcome != OutcomeType.Successful)
{
Console.WriteLine($"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");
var ctx = (AspectContext)pollyRet.Context["ctx"];
var type = ctx.ProxyMethod.ReturnType;
if (type == typeof(void)) return;
if (type.IsGenericType)
{
var geneType = type.GenericTypeArguments[0];
dynamic ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
ctx.ReturnValue = Task.FromResult(ret);
}
else
{
var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
ctx.ReturnValue = Task.FromResult(ret);
}
}
}
}
AspectCore也有Autofac的扩展,注册也是非常简单:
builder.RegisterDynamicProxy();
不过AspectCore还是需要给被代理的类添加Attribute:
[RetryByPollyAspectCoreInterceptor]
public class RedisDatabaseAsyncWrapper:IDatabaseAsync
{
...
}
希望大神可以扩展AspectCore的Autofac插件实现无需Attribute的代理设置。
2018/01/18补充
根据Lemon大神在评论中指点,AspectCore可以使用如下方式在Autofac注册中进行全局AOP注入:
builder.RegisterDynamicProxy(config =>
{
config.Interceptors.AddTyped(
Predicates.ForService("IDatabaseAsync"));
});
最后可以使用下面的代码测试这个异步的重试实现:
public async Task ReadTestAsync(long start, long end)
{
var total = end - start;
for (var i = 0; i <= total; i++)
{
var item = i + start;
var exists = await _redisDb.SetContainsAsync(RedisKey, item);
}
}
可以看到代理方法完美的处理了异常。
文末,在这个异步方法越来越多的新时代再次强烈推荐AspectCore。
感谢各位大神提供了这么多好用的库。感谢各位园友阅读本文。
redis使用sysc超时_优雅的处理Redis访问超时相关推荐
- php redis hash删除key,如何优雅的删除Redis的大key
关于Redis大键(Key),我们从[空间复杂性]和访问它的[时间复杂度]两个方面来定义大键.前者主要表示Redis键的占用内存大小:后者表示Redis集合数据类型(set/hash/list/sor ...
- gin redis 链接不上_内存优化,Redis是如何实现的!
点击上方"小罗技术笔记",关注公众号 第一时间送达实用干货 各位朋友新年开工好,今年由于特殊情况好多小伙伴今天在家开启远程办公模式(一直很向往),不过在这真想吐槽一下现有的远程办公 ...
- redis多服务器共享_【数据库】Redis(二)持久化及事务
Redis的数据持久化 Redis是基于内存对数据操作的数据库,计算机重启后,内存中的数据就会丢失,所以redis提供了持久化的功能,可以将redis操作的内存中数据持久化到本地的硬盘中.在redis ...
- mysql 行锁 超时_技术分享 | MySQL 行锁超时排查方法优化
作者:xuty 本文来源:原创投稿 * 爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源. 一.大纲 #### 20191219 10:10:10,234 | com.ali ...
- redis为什么是单线程_面试官:Redis单线程为什么执行效率这么高?
点击上方☝Java编程技术乐园,轻松关注!及时获取有趣有料的技术文章 做一个积极的人 编码.改bug.提升自己 我有一个乐园,面向编程,春暖花开! 上一篇回顾: 面试官:Redis为什么设计成单线程的 ...
- python安装第三方包总是超时_(python pip安装第三方库超时问题(raise ReadTimeoutErrorself._pool, None, 'Read timed out.')...
(python pip安装第三方库超时问题(raise ReadTimeoutErrorself._pool, None, 'Read timed out.') pip工具安装 百度经验链接: pip ...
- redis客户端连接数量_实战解析无所不知的Redis拓展应用——Info,进阶学习,无所不能...
前言 学习是一个持续的过程.像咱们一直在更新的Redis学习内容,由基础结构,到原理应用,再到集群搭建,了解的够充分了,咱们接着又介绍Redis拓展应用,将知识面拓宽,毕竟技术都是相通的,只有灵活运用 ...
- docker redis 删除集群_基于Docker的Redis集群实践
单机版的Redis相信大家都比较熟悉了,这里介绍几种Redis的集群模式,并结合Docker来进行实践操作 abstract.png 准备工作 通过Docker下载最新的Redis镜像 # 获取red ...
- redis php高级使用_项目中应用Redis+Php的场景
前言 一些案例中有的同学说为什么不可以用string类型,string类型完全可以实现呀 我建议你看下我的专栏文章<Redis高级用法>,里面介绍了用hash类型的好处 商品维度计数 对商 ...
最新文章
- [转]Linux 的多线程编程的高效开发经验
- 主业失利,跨界捞金,飞科的算盘能如意吗?
- 沈阳招聘.NET(C#)高级软件工程师
- Rsyslog 日志相关内容
- 夺命雷公狗---ECSHOP---01-解决报错问题
- MYSQL添加新用户 MYSQL为用户创建数据库 MYSQL为新用户分配权限(转)
- NetAssist连接报错!
- Jquery Uploadify之Java获取动态传参参数
- 搜狗微信公众号文章反爬虫完美攻克
- excel2019批量删除空白行的方法
- 免费FTP解决方案之FileZilla
- H5页面跳转关注微信公众号页面
- 爬虫篇——采集单机游戏(网页游戏),爬取小游戏
- typec扩展坞hdmi没反应_全功能扩展坞Type-C Docking (扩展坞) 方案讲解-可实现拔插不掉屏...
- 无线攻击及密码破解的四种方式详解
- selenium java自动化测试
- matlab生成向量和矩阵
- Revit二次开发之技能篇(二)———轴网尺寸标注
- 技术人员如何判断靠谱的创业合伙人?
- 计算机里s大小,Mbps和mb/s换算知识-电脑维护