Memcached是一个很常见的分布式组件,现在有很多.NET下开源的Memcached组件的客户端实现,比如EnyimMemcachedClient。在Adhesive框架中实现了一个分布式组件客户端,同时也实现了第一个具体的客户端,也就是Memcached。与其它实现不同的是,这里我们提供了Memcached二进制协议的完整实现,并且也实现了一些特色功能,进一步封装了Memcached的一些基础API。

要使用Adhesive.DistributedComponentClient,首先免不了进行配置。和框架的其它模块一样,这里我们也使用了配置服务:

进一步查看:

在这里我们定义了一个TestMemcachedCluster:

对于每一个集群,需要配置其名字以及尝试恢复节点的时间间隔。也就是在节点连接不上的时候,多久尝试进行一次节点的恢复。在Adhesive的实现中,一个集群下应该有多个节点,节点的分配根据Key进行一致性哈希,如果某个节点不可用,进行节点的重新分配,并不会影响程序的使用,当然由于重分配,必定会有1/N的数据丢失。在节点恢复后,又会进行重新分配。下面看一下节点的配置:

在这里定义了四个节点,随便点进去一个看看:

这里定义了:

1、节点名字

2、节点地址

3、节点的权重:所谓权重就是在进行Key到Node分配时候的权重,权重越高分配的Key越高,在使用的时候负载也就越重。对于Memcached来说,可以把内存少的节点权重设置低一些。

4、最大的连接数:连接池中允许的最大连接数

5、最小的连接数:连接池中保留的最小连接数

6、最大闲置时间:闲置时间过长的连接会被回收

7、最大忙碌时间:忙碌时间过长的连接会被强制关闭

8、连接超时时间

9、发送数据超时时间

10、接受数据超时时间

11、连接池维护时间间隔:连接池在维护的时候会回收闲置过长的连接,会关闭忙碌过长的连接,会补充连接到最小连接数。

在某些情况下,您可能希望独立使用这个组件,也不是依赖配置服务,那么可以如下操作:

找到DistributedServerConfiguration.cs,把private static DistributedServerConfigurationEntity GetConfig()中的:

var defaultConfig = GetDefaultConfig();
var config = configService.GetConfigItemValue(true, "DistributedServerConfiguration", defaultConfig);

替换为:

var defaultConfig = GetDefaultConfig(); var config = LocalConfigService.GetConfig(defaultConfig);

也就是使用本地配置服务来替代通用配置服务。对于本地配置服务会在程序的Config目录下生成.config的配置文件,这样就可以不依赖配置服务直接启动程序,比如:

<?xml version="1.0" encoding="utf-8"?>
<DistributedServerConfigurationEntity xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><ClientClusterConfigurations><item><key><string>TestMemcachedCluster</string></key><value><ClientClusterConfiguration><Name>TestMemcachedCluster</Name><TryRecoverNodeInterval>00:00:10</TryRecoverNodeInterval><ClientNodeConfigurations><item><key><string>MemcachedNode1</string></key><value><ClientNodeConfiguration><Name>MemcachedNode1</Name><Address>192.168.135.222:12000</Address><Weight>Medium</Weight><MaxConnections>50</MaxConnections><MinConnections>5</MinConnections><MaxIdleTime>00:01:00</MaxIdleTime><MaxBusyTime>00:01:00</MaxBusyTime><ConnectTimeout>00:00:05</ConnectTimeout><SendTimeout>00:00:05</SendTimeout><ReceiveTimeout>00:00:05</ReceiveTimeout><MaintenanceInterval>00:00:30</MaintenanceInterval></ClientNodeConfiguration></value></item><item><key><string>MemcachedNode2</string></key><value><ClientNodeConfiguration><Name>MemcachedNode2</Name><Address>192.168.135.222:12001</Address><Weight>Medium</Weight><MaxConnections>50</MaxConnections><MinConnections>5</MinConnections><MaxIdleTime>00:01:00</MaxIdleTime><MaxBusyTime>00:01:00</MaxBusyTime><ConnectTimeout>00:00:05</ConnectTimeout><SendTimeout>00:00:05</SendTimeout><ReceiveTimeout>00:00:05</ReceiveTimeout><MaintenanceInterval>00:00:30</MaintenanceInterval></ClientNodeConfiguration></value></item><item><key><string>MemcachedNode3</string></key><value><ClientNodeConfiguration><Name>MemcachedNode3</Name><Address>192.168.135.221:12000</Address><Weight>Medium</Weight><MaxConnections>50</MaxConnections><MinConnections>5</MinConnections><MaxIdleTime>00:01:00</MaxIdleTime><MaxBusyTime>00:01:00</MaxBusyTime><ConnectTimeout>00:00:05</ConnectTimeout><SendTimeout>00:00:05</SendTimeout><ReceiveTimeout>00:00:05</ReceiveTimeout><MaintenanceInterval>00:00:30</MaintenanceInterval></ClientNodeConfiguration></value></item><item><key><string>MemcachedNode4</string></key><value><ClientNodeConfiguration><Name>MemcachedNode4</Name><Address>192.168.135.221:12001</Address><Weight>Medium</Weight><MaxConnections>50</MaxConnections><MinConnections>5</MinConnections><MaxIdleTime>00:01:00</MaxIdleTime><MaxBusyTime>00:01:00</MaxBusyTime><ConnectTimeout>00:00:05</ConnectTimeout><SendTimeout>00:00:05</SendTimeout><ReceiveTimeout>00:00:05</ReceiveTimeout><MaintenanceInterval>00:00:30</MaintenanceInterval></ClientNodeConfiguration></value></item></ClientNodeConfigurations></ClientClusterConfiguration></value></item></ClientClusterConfigurations>
</DistributedServerConfigurationEntity>

在介绍了配置之后,就来介绍如何使用客户端,首先可以获得一个集群:

var client = MemcachedClient.GetClient("TestMemcachedCluster");

然后可以通过client调用各种API了:

1、清空数据:

Dictionary<string, bool> Flush(TimeSpan expire)

Dictionary<string, bool> Flush()

这里提供了两个重载。可以清空整个集群所有数据,也可以清空一定未来一定时间内将会过期的所有数据。

2、获取服务端版本:

Dictionary<string, string> Version()

将会列出每一个节点的版本号

3、获取服务端状态:

Dictionary<string, Dictionary<string, string>> Stat(StatType statType)

Dictionary<string, Dictionary<string, string>> Stat()

在这里提供了两个重载,可以获取指定类型的状态数据,也可以获取所有状态数据。StatType 定义如下:

    public enum StatType{General = 0,Item = 1,Setting = 2,}

要注意,在这里同样是按照节点分组的。

4、获取数据的操作:

string Get(string key)

T Get<T>(string key)

string Get(string key, out ulong version)

T Get<T>(string key, out ulong version)

在这里提供了获取纯字符串和获取泛型类型两组方法,也提供了仅仅获取值以及获取值和版本号两组方法。在并发状态下,很可能我们在获取一个Value之后进行了一些修改,但在Set到Memcached的时候已经被其它人Set过一次了,那么我们可以通过Get时候获取到的版本号来确保Set的时候的版本和之前Get时候的版本一致:

 ulong version = 0;var key = Guid.NewGuid().ToString();Assert.IsTrue(client.Add(key, stringValue, expire, version));Assert.AreEqual(stringValue, client.Get(key, out version));Assert.IsTrue(client.Replace(key, stringValue, version));Assert.IsFalse(client.Replace(key, stringValue, version));

上面的测试代码表明在进行一次操作之后版本号会修改,再使用老的版本号进行Replace操作将会失败。

5、存入数据的操作:

bool Set(string key, string value)

bool Set(string key, string value, ulong version)

bool Set(string key, string value, TimeSpan expire)

bool Set(string key, string value, TimeSpan expire, ulong version)

bool Set<T>(string key, T value)

bool Set<T>(string key, T value, ulong version)

bool Set<T>(string key, T value, TimeSpan expire)

bool Set<T>(string key, T value, TimeSpan expire, ulong version)

对于泛型和纯字符串的重载,每一组中都会有普通的Set、检查版本号的Set、设置超时时间的Set以及设置超时时间+检查版本号的Set。

对于Set操作来说,如果Key存在,那么就会替换已有的Value,如果Key不存在就会增加一组KeyValue。

如果赋值了超时时间,那么超过这个时间值就或Get不到,当然由于memcached的限制,超时时间不能超过30天。

6、替换数据的操作:

bool Replace(string key, string value)

bool Replace(string key, string value, ulong version)

bool Replace(string key, string value, TimeSpan expire)

bool Replace(string key, string value, TimeSpan expire, ulong version)

bool Replace<T>(string key, T value)

bool Replace<T>(string key, T value, ulong version)

bool Replace<T>(string key, T value, TimeSpan expire)

bool Replace<T>(string key, T value, TimeSpan expire, ulong version)

方法签名和Set基本一致,只不过要注意,如果Key不存在,那么Replace会失败,也就是返回false。

 var key = Guid.NewGuid().ToString();Assert.IsFalse(client.Replace(key, stringValue));Assert.IsTrue(client.Set(key, ""));Assert.IsTrue(client.Replace(key, stringValue));Assert.AreEqual(client.Get(key), stringValue);

7、增加数据的操作:

bool Add(string key, string value)

bool Add(string key, string value, ulong version)

bool Add(string key, string value, TimeSpan expire)

bool Add(string key, string value, TimeSpan expire, ulong version)

bool Add<T>(string key, T value)

bool Add<T>(string key, T value, ulong version)

bool Add<T>(string key, T value, TimeSpan expire)

bool Add<T>(string key, T value, TimeSpan expire, ulong version)

方法签名和Set基本一致,只不过要注意,如果Key已经存在,那么Add会失败,也就是返回false。

 var key = Guid.NewGuid().ToString();Assert.IsTrue(client.Set(key, ""));Assert.IsFalse(client.Add(key, stringValue, expire));Assert.IsTrue(client.Delete(key));Assert.IsTrue(client.Add(key, stringValue, expire));Assert.AreEqual(client.Get(key), stringValue);

8、递增的操作:

ulong? Increment(string key, ulong amount)

ulong? Increment(string key, ulong amount, ulong version)

ulong? IncrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount)

ulong? IncrementWithInit(string key, ulong seed, ulong amount)

ulong? IncrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount, ulong version)

ulong? IncrementWithInit(string key, ulong seed, ulong amount, ulong version)

在这里主要有两组操作,第一组是增加一个已有的值,第二组是增加一个已有的值或者初始化值。先来看第一组的测试代码:

 var key = Guid.NewGuid().ToString();var result = client.Increment(key, 10);Assert.IsFalse(result.HasValue);Assert.IsTrue(client.Set(key, i));result = client.Increment(key, 10);Assert.IsTrue(result.HasValue);Assert.AreEqual(result.Value, (ulong)i + 10);

第一次调用的时候由于没有值会得到一个空值。在设置了值之后再调用就会得到值了。再来看看第二组的测试代码:

var key = Guid.NewGuid().ToString();var result = client.IncrementWithInit(key, (ulong)i, expire, 10);Assert.IsTrue(result.HasValue);Assert.AreEqual(result.Value, (ulong)i);result = client.IncrementWithInit(key, (ulong)i, expire, 10);Assert.IsTrue(result.HasValue);Assert.AreEqual(result.Value, (ulong)i + 10);

使用i进行初始化,并且递增步进设置为10,第一次由于没有值结果应该就是i,第二次则是i+10了。对于IncrementWithInit,由于附带了初始化Value的功能,所以我们可以提供一个过期时间。

9、和递增相对应的递减操作:

ulong? Decrement(string key, ulong amount)

ulong? Decrement(string key, ulong amount, ulong version)

ulong? DecrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount)

ulong? DecrementWithInit(string key, ulong seed, ulong amount)

ulong? DecrementWithInit(string key, ulong seed, TimeSpan expire, ulong amount, ulong version)

ulong? DecrementWithInit(string key, ulong seed, ulong amount, ulong version)

这里就不过多介绍了,递减和递增相对应,为seed减少一个amount。

10、为字符串加值:

bool Append(string key, string value)

bool Prepend(string key, string value)

分别是把另一个字符串value加到key对应的字符串的后面和前面:

var key = Guid.NewGuid().ToString();Assert.IsTrue(client.Set(key, stringValue, expire));Assert.IsTrue(client.Append(key, "你好"));Assert.AreEqual(stringValue + "你好", client.Get(key));
var key = Guid.NewGuid().ToString();Assert.IsTrue(client.Set(key, stringValue, expire));Assert.IsTrue(client.Prepend(key, "你好"));Assert.AreEqual("你好" + stringValue, client.Get(key));

11、删除一个Value:

bool Delete(string key)

这个没啥好说的,删除后将会Get不到这个Key。

12、快速操作:

由于Set和Delete很常用,并且我们往往不需要知道操作的返回值也允许操作发生失败,因此在这里提供了FastSet和FastDelete的功能。对于这些API,不提供返回值,只是尝试进行Set和Delete操作。相比普通的Set和Delete操作,Fast版本可以有10%以上的性能提高,因为我们不会解析Response的消息,并且在不出错的情况下,Memcached服务端也不会返回Response消息。

void FastSet(string key, string value)

void FastSet(string key, string value, ulong version)

void FastSet(string key, string value, TimeSpan expire)

void FastSet(string key, string value, TimeSpan expire, ulong version)

void FastSet<T>(string key, T value)

void FastSet<T>(string key, T value, ulong version)

void FastSet<T>(string key, T value, TimeSpan expire)

void FastSet<T>(string key, T value, TimeSpan expire, ulong version)

void FastDelete(string key)

上面介绍的这些API其实都是Memcached服务端提供的API,下面会介绍一些利用这些API提供的特色功能,进一步方便使用者。在这里我们仅仅介绍使用,不会介绍其实现,大家可以思考一下,如何利用之前提到的12个API来完成下面的功能。

1、判断Key是否存在:

bool Exists(string key)

2、Get+Set:

也就是如果Key存在的话则获取,如果不存在的话,使用Set值为回调方法提供的值,最后返回值

string GetAndSet(string key, Func<string> getValue, TimeSpan expire)

string GetAndSet(string key, Func<string> getValue)

T GetAndSet<T>(string key, Func<T> getValue, TimeSpan expire)

T GetAndSet<T>(string key, Func<T> getValue)

string GetAndSetWhen(string key, Func<string> getValue, TimeSpan exipre, Func<bool> condition)

string GetAndSetWhen(string key, Func<string> getValue, Func<bool> condition)

T GetAndSetWhen<T>(string key, Func<T> getValue, Func<bool> condition)

T GetAndSetWhen<T>(string key, Func<T> getValue, TimeSpan expire, Func<bool> condition)

这里还定义了一组GetAndSetWhen方法,也就是满足condition的时候才进行值的Set。

我们来看一个应用:

   private int InternalGetDataCount(string typeFullName, string databaseName, string tableName, string columnName, DateTime begin, DateTime end, IMongoQuery filterquery){try{var count = memcachedClient.GetAndSetWhen<int?>(GetMemcachedKey(typeFullName, databaseName, tableName, columnName,begin.ToString("yyyyMMddHHmmss"), end.ToString("yyyyMMddHHmmss"), filterquery), () =>{var query = Query.And(Query.LT(columnName, end).GTE(begin), filterquery);var server = CreateSlaveMongoServer(typeFullName);var database = server.GetDatabase(databaseName);var collection = database.GetCollection(tableName);var c = collection.Count(query);Console.WriteLine(string.Format("Db {0} {1} {2} {3} {4} {5} {6} {7}", typeFullName, databaseName, tableName, columnName,begin.ToString("yyyyMMddHHmmss"), end.ToString("yyyyMMddHHmmss"), filterquery == null ? "" : filterquery.ToString(), c));return c;}, TimeSpan.FromDays(1), () => DateTime.Now > end);return count.Value;}catch (Exception ex){LocalLoggingService.Error(ex.ToString());throw;}}

比如在这里,我们从Mongodb数据库获取数据量,并希望把结果缓存在Memcached中。但是我们不希望在结束时间大于当前时间的情况下缓存值,因为只有过去时间获取的统计值才不会变动,才可以缓存下来。这是就可以使用GetAndSetWhen了。

3、分布式锁

IDisposable AcquireLock(string key, TimeSpan timeOut)

Memcached由于是单点,因此很适合作为分布式锁,测试代码如下:

 var timeoutCount = 0;var key = Guid.NewGuid().ToString();Parallel.For(0, 10, i =>{try{using (var locker = client.AcquireLock(key, TimeSpan.FromMilliseconds(100))){Thread.Sleep(1000);}}catch (TimeoutException){Interlocked.Increment(ref timeoutCount);}});Assert.AreEqual(timeoutCount, 9);

在这里,我们在获得了锁之后等待1秒,并且允许锁的超时时间为10毫秒。这样的话,在并发情况下,只应该有一次调用可以成功,其它9次都会失败,失败的时候会抛出TimeoutException异常。

4、列表操作:

List<string> GetList(string listKey, int pageSize, int pageIndex)

List<string> GetList(string listKey)

List<T> GetList<T>(string listKey, int pageSize, int pageIndex)

List<T> GetList<T>(string listKey)

bool SetListItem(string listKey, string itemKey, string itemValue)

bool SetListItem(string listKey, string itemKey, string itemValue, TimeSpan expire)

bool SetListItem<T>(string listKey, string itemKey, T itemValue, TimeSpan expire)

bool SetListItem<T>(string listKey, string itemKey, T itemValue)

bool DeleteListItem(string listKey, string itemKey)

有的时候,我们会在Memcached中存一组数据列表数据,并且又不希望在修改项的时候获取整个列表然后修改后回存回去。这个时候就需要保存多个KeyValue,并且使用一个KeyValue来存放列表中所有的Key。这组API封装了其中的实现。允许我们:

1)获取列表所有数据

2)分页方式获取部分数据

3)设置某个项

4)删除某个项

测试代码如下:

var listKey = Guid.NewGuid().ToString();var pageIndex = 2;var pageSize = 10;Parallel.For(0, 100, i =>{var itemKey = Guid.NewGuid().ToString();client.SetListItem(listKey, itemKey, new User(i), TimeSpan.FromMinutes(10));});var value = client.GetList<User>(listKey, pageSize, pageIndex);Assert.AreEqual(value.Count, pageSize);Assert.AreEqual(value.First().GetType(), typeof(User));

5、批量获取操作:

Dictionary<string, string> GetMultiple(IList<string> keys)

Dictionary<string, T> GetMultiple<T>(IList<string> keys)

也就是直接获取一组Key的值。由于提供的Key可能分布在多个节点上,因此在这里我们使用并行方式同时到都个节点上获取相应的值。测试代码如下:

 var keys = new System.Collections.Concurrent.ConcurrentBag<string>();Parallel.For(0, 1000, i =>{var key = Guid.NewGuid().ToString();Assert.IsTrue(client.Set(key, new User(i), TimeSpan.FromMinutes(10)));keys.Add(key);});var value = client.GetMultiple<User>(keys.ToList());Assert.AreEqual(value.Count, keys.Count);foreach (var item in value){Assert.AreEqual(item.Value.GetType(), typeof(User));}

至此,我们介绍了所有API的使用,可以直接查看源代码中的Adhesive.DistributedComponentClient.Test项目以及Adhesive.DistributedComponentClient.UnitTest来了解如何使用。经过测试,Adhesive的这个Memcached客户端的实现和Enyim的实现在性能上有5%左右的提高,并且API更丰富(后面那些特色API)。

Adhesive框架系列文章--分布式组件客户端模块使用相关推荐

  1. Adhesive框架系列文章--Mongodb数据服务模块使用(上)

    之前介绍的应用程序信息中心模块中所有日志.异常.性能和状态数据都依赖Mongodb数据服务,Mongodb数据服务的接口也简单的可以: public interface IMongodbInsertS ...

  2. Adhesive框架系列文章--Mongodb数据服务使用实践

    在此文中,我们会实践一下,如何使用Mongodb数据服务存储新的自定义数据.之前我们说过了,使用Mongodb数据服务只有4步这么简单: 1)定义实体 2)定义元数据 3)配置Mongodb数据服务 ...

  3. Adhesive框架系列文章--报警处理流程使用实践

    框架除了报警之外还提供了简单的报警处理流程. 先来看一下报警相关的配置: 比如这里有一个报警,30秒检查一次30秒内相关数据如果超过10条则报警. 在报警后,系统会自动创建一个事件,相关人员登录到后台 ...

  4. 无锁编程(Lock Free)框架 系列文章

    无锁编程(Lock Free)框架 系列文章: 1 前置知识:伪共享 原理 & 实战 2 disruptor 使用和原理 图解 3 akka 使用和原理 图解 4 camel 使用和 原理 图 ...

  5. Scott的ASP.net MVC框架系列文章之四:处理表单数据(2)

    前几周我发表了一系列文章介绍我们正在研究的ASP.NET MVC框架.ASP.NET MVC框架为你提供了一种新的开发Web应用程序的途径,这种途径可以让应用程序变得更加层次清晰,而且更加有利于对代码 ...

  6. 2019 年起如何开始学习 ABP 框架系列文章-开篇有益

    阅读文本大概需要 3.3 分钟. 本系列文章推荐阅读地址为:52ABP 开发文档https://www.52abp.com/Wiki/52abp/latest/Welcome-to-52abp 本文的 ...

  7. android下usb框架系列文章---(2)Usb mass_storage turn on的过程

    下面从framework层的ui来看一下插入usb后share的过程,这个动作的触发是从status bar下面弹出的usb connect开始的. frameworks/base/packages/ ...

  8. 如何实现一个php框架系列文章【2】实现类的自动加载

    根据前一篇文章的设计原则,我们暂时把php文件分为3类,类名和文件名都遵守如下约定.   类名 文件名 路径 模型类m {$app}Mod  {$app}.mod.php {$app}/model   ...

  9. 轻量级神经网络算法系列文章-MobileNet v3

    4. 轻量级神经网络算法目录 轻量级神经网络算法 4.1 各轻量级神经网络算法总结对比 4.2 SqueezeNet 4.3 DenseNet 4.4 Xception 4.5 MobileNet v ...

最新文章

  1. 机房收费系统系列一:运行时错误‘-2147217843(80040e4d)’;用户‘sa’登陆失败...
  2. 徒手撸出一个类Flask微框架(三)根据业务进行路由分组
  3. c、c++---linux上的GetTickCount函数
  4. 《JS权威指南学习总结--1.1语言核心》
  5. 【数字信号处理】序列傅里叶变换 ( 基本序列的傅里叶变换 | e^jωn 的傅里叶变换 )
  6. 常用的后端性能优化六种方式:缓存化+服务化+异步化等
  7. 【Python】 数字求和
  8. 五一档票房超8.8亿元!张艺谋新片仅第二
  9. Maven私服(二) - Nexus的安装
  10. 移动应用的一般测试流程和需要注意的测试项
  11. 波卡跨链交易协议RAI Finance将接入Bounce Finance实现IDO代币发行
  12. 多线程java_由浅入深地介绍Java多线程,让你如何快速进入Java多线程的学习
  13. 二进制GCD算法解析
  14. excel生成趋势线和函数关系式
  15. Linux网络编程必学的TCP/IP协议——图解分层(通俗易懂)【建议新手收藏】
  16. 高可用架构篇:【2】ActiveMQ高可用+负载均衡集群的安装、配置、高可用(多节点)
  17. jquery fadeOut 异步
  18. 看雪CTF.TSRC 2018 团队赛 第一题 初世纪 writeup
  19. 【保姆级】阿里云服务器frp内网穿透教程
  20. Mysql IFNULL SUM 结合使用无效的问题

热门文章

  1. IBM websphere MQ远程队列的简单配置
  2. java异常处理的throw和throws的区别
  3. UNIX网络编程之旅-配置unp.h头文件环境
  4. 一起学asp.net基础文章二 服务器控件、客户端控件和html表单控件
  5. python定义字符串数组_从字符串数组(或元组)在Python中创建动态sql“ in list”子句的“最佳”方法是什么?...
  6. html文档表示表格的标记,【单选题】在HTML文档中用于表示表格的标记对是( )...
  7. mysql获取配置文件信息,四种获取MySQL数据库配置文件加载顺序
  8. selenium判断元素是否存在_如何判断宝宝是否缺微量元素?
  9. java两个线程同时运行_Java实现的两个线程同时运行案例
  10. javascript获取系统时间时区_5个JavaScript技巧让你成为更好的开发者