SharedCache分析:服务端程序
SharedCache由3个主要的项目组成MergeSystem.Indexus.WinServiceCommon、MergeSystem.Indexus.WinService和MergeSystem.Indexus.Notify。WinService可以以Windows服务方式加载,也可以以控制台方式运行,如果注册为Windows服务,则可以通过MergeSystem.Indexus.Notify程序来了解其状态,若是以控制台方式运行,则运行时的信息会在控制台窗口中显示。当然也可以配置通过NLog.dll记录日志。
大部分的功能都封装在WinServiceCommon项目中,WinService项目只负责监听和数据中转,这个项目中只有几个文件,如图1。
Indexus是个windows服务,也是程序的入口点,其类图如图2。
图1 |
图2 |
图3 ShareCache服务端程序主流程 |
Indexus可以接受几个命令行参数/install /i /uninstall /u来安装或卸载windows服务,若想以控制台方式启动程序,可以使用/local参数,如:
MergeSystem.Indexus.WinService.exe /local
或直接使用相同目录中的几个批处理文件来执行。来看一下main函数:
{
Access Log#region Access Log
COM.Handler.LogHandler.Tracking(
"Access Method: " + ((object)MethodBase.GetCurrentMethod()).ToString() + " ;"
);
#endregion Access Log
string optionalArgs = string.Empty;
if (args.Length > 0)
{
optionalArgs = args[0];
}
args Handling#region args Handling
if (!string.IsNullOrEmpty(optionalArgs))
{
if("/?".Equals(optionalArgs.ToLower()))
{
Console.WriteLine("Help Menu");
Console.ReadLine();
return;
}
else if("/local".Equals(optionalArgs.ToLower()))
{
Console.Title = "Indexus.Net Shared Cache - Server";
Console.BackgroundColor = ConsoleColor.DarkBlue;
Console.ForegroundColor = ConsoleColor.White;
// running as cmd appliacation
Indexus SrvIndexus = new Indexus();
SrvIndexus.StartService();
Console.ReadLine();
SrvIndexus.StopService();
return;
}
else if(@"/verbose".Equals(optionalArgs.ToLower()))
{
// Console.SetOut(this);
// Console.SetIn(Console.Out);
// Console.ReadLine();
return;
}
TransactedInstaller ti = new TransactedInstaller();
IndexusInstaller ii = new IndexusInstaller();
ti.Installers.Add(ti);
string path = string.Format("/assemblypath={0}", System.Reflection.Assembly.GetExecutingAssembly().Location);
string[] cmd = { path };
InstallContext context = new InstallContext(string.Empty, cmd);
ti.Context = context;
if ("/install".Equals(optionalArgs.ToLower()) || "/i".Equals(optionalArgs.ToLower()))
{
ti.Install(new Hashtable());
}
else if ("/uninstall".Equals(optionalArgs.ToLower()) || "/u".Equals(optionalArgs.ToLower()))
{
ti.Uninstall(null);
}
else
{
StringBuilder sb = new StringBuilder();
sb.Append(@"Your provided Argument is not available." + Environment.NewLine);
sb.Append(@"Use one of the following once:" + Environment.NewLine);
sb.AppendFormat(@"To Install the service '{0}': '/install' or '/i'" + Environment.NewLine, @"IndeXus.Net");
sb.AppendFormat(@"To Un-Install the service'{0}': '/uninstall' or '/u'" + Environment.NewLine, @"IndeXus.Net");
Console.WriteLine(sb.ToString());
}
}
else
{
// nothing received as input argument
ServiceBase[] servicesToRun;
servicesToRun = new ServiceBase[] { new Indexus() };
ServiceBase.Run(servicesToRun);
}
#endregion args Handling
}
这里可以看到,加/local参数启动时,直接调用StartService()方法,然后就是和安装windows服务后启动服务一样了。
StartService中调用ServiceLogic.Init()方法进行初始化。代码注释比较少,但已经够我们理解这段代码了。
/// Inits this instance. This method used at startup to initialize
/// all required server components
/// </summary>
public void Init()
{
Access Log#region Access Log
COM.Handler.LogHandler.Tracking(
"Access Method: " + this.GetType().ToString()+ "->" + ((object)MethodBase.GetCurrentMethod()).ToString() + " ;"
);
#endregion Access Log
COM.Handler.LogHandler.Force("Initializing Settings" + COM.Enums.LogCategory.ServiceStart.ToString());
Console.WriteLine(@"Welcome to indeXus.Net Shared Cache");
Console.WriteLine();
Console.WriteLine(COM.Handler.Config.DisplayAppSettings());
COM.Handler.LogHandler.Info(COM.Handler.Config.DisplayAppSettings());
// needs to be instantiated before TCP, it needs an instance of CachExpire
cacheExpireInstance = new CacheExpire();
// TCP needs an instance of CacheExpire
tcpInstance = new TcpServer(cacheExpireInstance);
COM.Handler.LogHandler.Force("Init and Start Thread Tcp");
COM.Handler.LogHandler.Force("Init and Start Thread CacheExpire");
// Init all extenders
// an extender is a class which initializes its own logic around
// specific issue within its own thread;
/**////
this.workerTcp = new Thread(this.tcpInstance.Init);
this.workerTcp.Name = "TCP Handler";
this.workerTcp.IsBackground = true;
this.workerTcp.Priority = ThreadPriority.Normal;
/**////
this.workerCacheExpire = new Thread(this.cacheExpireInstance.Init);
this.workerCacheExpire.Name = "Cache Expire Handler";
this.workerCacheExpire.IsBackground = true;
this.workerCacheExpire.Priority = ThreadPriority.Lowest;
/**////
this.workerCacheExpire.Start();
this.workerTcp.Start();
// enable the search of replicaiton servers
if (this.enableServiceFamilyMode)
{
NetworkDistribution.Init();
}
string msgThreadInfo = Environment.NewLine +
"Main Thread Id: " + Thread.CurrentThread.ManagedThreadId.ToString() + Environment.NewLine +
"this.workerTcp: " + this.workerTcp.ManagedThreadId.ToString() + Environment.NewLine +
/**//*"this.workerTimer: " + this.workerTimer.ManagedThreadId.ToString() + Environment.NewLine +*/
"this.workerCacheExpire: " + this.workerCacheExpire.ManagedThreadId.ToString();
Console.WriteLine(msgThreadInfo + Environment.NewLine);
Console.WriteLine("+ + + + + + + + + + + + + + + + + + + + + + + + + + + + ");
Console.WriteLine("server is ready to receive data.");
COM.Handler.LogHandler.Force("IndeXus.Net Service Started " + COM.Enums.LogCategory.ServiceStart.ToString());
}
这里启动2个Thread,一个负责执行CacheExpire.Init()方法,负责定时轮查缓Cache中设置有过期策略的对象,如果有到期的,就从Cache中清除。另一个Thread负责监听TCP端口,然后每有一个新的客户端连接过来,又启动一个新的线程处理。
接下来转到MergeSystem.Indexus.WinService.TcpServer.Init()方法:
{
IPEndPoint endpoint = COM.Handler.Network.GetServerAnyIPEndPoint(this.cacheIpPort);
serverSocket = new Socket(endpoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
serverSocket.Bind(endpoint);
serverSocket.Listen((int)SocketOptionName.MaxConnections);
// setup listener issues
acceptThread = new Thread(AcceptConnections);
acceptThread.IsBackground = true;
acceptThread.Priority = ThreadPriority.Normal;
acceptThread.Start();
}
这里启动一个新线程监听TCP 48888(默认值)端口,跟到AcceptConnections方法:
{
while (true)
{
// Accept a connection
Socket socket = serverSocket.Accept();
ConnectionInfo connection = new ConnectionInfo();
connection.Socket = socket;
// Modified:06-01-2008 Merge System GmbH, rschuetz : for more information checkout the following link> http://netrsc.blogspot.com/2007/02/another-extract-from-scalenet-threading.html
ThreadPool.QueueUserWorkItem(this.ProcessConnection, connection);
// Store the socket in the open connections
lock (this.connections)
{
this.connections.Add(connection);
}
}
}
这个比较简单,只是在一个死循环中等待客户端连接,等到后要线程池中执行ProcessConnection方法来处理,此线程继续等待下一个请求。跟到ProcessConnection方法:
{
ConnectionInfo connection = (ConnectionInfo)stats;
try
{
// read data from Socket and convert it to an IndeXusMessage
COM.IndexusMessage msg = new COM.Handler.NetworkMessage().ProcessNetworkMessage(connection.Socket);
if (msg != null)
{
this.ProcessMessage(msg, connection);
}
else
{
Console.WriteLine("Error appears due processing network message!");
COM.Handler.LogHandler.Error("Error appears due processing network message!");
}
}
catch (OutOfMemoryException ex)
{
}
catch (Exception ex)
{
}
finally
{
connection.Socket.Close();
lock (this.connections)
this.connections.Remove(connection);
}
}
这里只调用了COM.Handler.NetworkMessage().ProcessNetworkMessage()方法将客户端发过来的内容还原为IndexusMessage 对象,然后交给ProcessMessage()方法处理:
{
// check status first [Request || Response]
switch (msg.Status)
{
case COM.IndexusMessage.StatusValue.Request:
{
request case#region request case
COM.IndexusMessage replicationMessage = null;
if (ServiceLogic.NetworkDistribution.ReplicationEnabled)
{
// create a new object of received msg. to broadcast on servers.
replicationMessage = new COM.IndexusMessage(new Random().Next(), msg.Action, COM.IndexusMessage.StatusValue.Request,new KeyValuePair<string,byte[]>(msg.Key, msg.Payload));
}
// switch over cache actions
switch (msg.Action)
{
case COM.IndexusMessage.ActionValue.Ping:
{
}
case COM.IndexusMessage.ActionValue.Add:
{
}
case COM.IndexusMessage.ActionValue.Get:
{
}
case COM.IndexusMessage.ActionValue.Remove:
{
}
case COM.IndexusMessage.ActionValue.RemoveAll:
{
}
case COM.IndexusMessage.ActionValue.GetAllKeys:
{
}
case COM.IndexusMessage.ActionValue.Statistic:
{
}
}
break;
#endregion request case
}
case COM.IndexusMessage.StatusValue.Response:
{
}
}
}
这里分类对消息进行处理,如添加到缓存或移除等。下面仔细看下Add分支:
{
Add Case#region Add Case
if (writeStats)
Console.WriteLine("Message Action: {0}", msg.Action.ToString());
Console.WriteLine(@"Adding new Item with Key: {0}", msg.Key);
lock (bulkObject)
{
LocalCache.Add(new KeyValuePair<string, byte[]>(msg.Key, msg.Payload));
// if the given msg expires is not MaxValue
// it will be listed sub-process which clean
// up in iterations the cache.
// QuickFix
if (msg.Expires != DateTime.MaxValue)
{
if (this.expire != null)
{
this.expire.Expire.DumpCacheItemAt(msg.Key, msg.Expires);
}
}
// update cleanup list with new object
CacheCleanup.Update(msg);
}
msg.Action = COM.IndexusMessage.ActionValue.Successful;
rschuetz: MODIFIED: 21-07-2007: distribute object over wire to other installations#region rschuetz: MODIFIED: 21-07-2007: distribute object over wire to other installations
// Question is if the client needs to wait until this happens,
// or should the client first get an answer and just then it
// will distribute it.
if (ServiceLogic.NetworkDistribution.ReplicationEnabled)
{
ServiceLogic.NetworkDistribution.Replicate(replicationMessage);
}
#endregion
// send object back throug the connection
connection.Socket.Send(msg.GetBytes());
// handle max size and purge issues
//DateTime startTime = DateTime.Now;
if (this.cacheAmountOfObjects != -1 && this.cacheAmountOfObjects <= LocalCache.CalculatedCacheSize && !CacheCleanup.PurgeIsRunning)
{
Console.WriteLine(@"Current Size of Cache: {0} ; {1} ", LocalCache.CalculatedCacheSize, LocalCache.CalculatedCacheSize <= 0 ? 0 : LocalCache.CalculatedCacheSize / (1024 * 1024));
List<string> remove = CacheCleanup.Purge(LocalCache.CalculatedCacheSize);
if (remove != null)
{
lock (bulkObject)
{
foreach (string s in remove)
{
LocalCache.Remove(s);
}
}
}
}
break;
#endregion Add Case
}
主要完成了4个功能:1.将数据保存到LocalCache中;2.如果要缓存的对象设置有过期时间,则在expire中保存一份存根;3.如果配置有复制服务器,则分发到各兄弟节点;4.如果缓存对象超出了最大值,则使用配置的策略靖仓。LocalCache是MergeSystem.Indexus.WinServiceCommon.Cache类型的静态成员,expire是WinServiceCommon.CacheExpire类型的静态成员。
分析到这里,ShartCache的大概思路已经很清晰了。
ShartCache的核心是MergeSystem.Indexus.WinServiceCommon.Cache和MergeSystem.Indexus.WinServiceCommon.CacheExpire这二个类。WinServiceCommon.Cache中使用一个字典对象readonly Dictionary<string, byte[]> dict;来存储需要缓存的数据,因为要缓存的数据都已经序列化通过TCP传输过来,这里就直接保存byte[]类型的了。WinServiceCommon.CacheExpire类中也有一个字典对象private static Dictionary<string, DateTime> expireTable = null;所有设置有过期时间的缓存对象,都在这里有个存根,过期的缓存有WinServiceCommon.CacheExpire负责清除。
来源:wuchang.cnblogs.com
转载于:https://www.cnblogs.com/wuchang/archive/2008/02/07/1065843.html
SharedCache分析:服务端程序相关推荐
- 服务端程序的初步实现
文章目录 1 服务端程序的初步实现 1.1 设计实现 1.2 代码实现 1 服务端程序的初步实现 1.1 设计实现 服务端设计初步: 设计要素分析: 一般情况下,聊天服务端只负责消息传递. 客户端的连 ...
- 也谈如何构建高性能服务端程序
引子:我接触过很多编程语言,接触过各种各样的服务器端开发,Java,Go,Ruby,Javascript等语言,Spring,Node.js,Rails等等常见服务器端框架和编程模型都有接触.这里谈一 ...
- 基于半同步/半反应堆线程池实现的HTTP解析服务端程序
简介: 半同步/半反应堆线程池是通过一个线程往工作队列添加任务T,然后工作线程竞争工作队列获得任务T.HTTP请求解析服务端程序:逐行解析客户端发送来的HTTP请求然后作出HTTP回答.采用线程池就是 ...
- 三角定位法java代码_GitHub - megagao/IndoorPos: 这是一个采用蓝牙4.0--iBeacon技术的室内定位服务端程序。...
IndoorPos 这是一个采用iBeacon技术的室内定位服务端程序,里面包含了三种定位算法,三边定位算法.加权三边定位算法和三角形加权质心定位算法.程序采用Spring框架,简化了JDBC和RMI ...
- winform服务器消息推送,winform项目——仿QQ即时通讯程序12:服务端程序补充及优化...
原标题:winform项目--仿QQ即时通讯程序12:服务端程序补充及优化 上一篇文章大概完成了服务端程序,今天继续做项目的时候发现还有一些功能没有做,还有几处地方不够完善.不做好就会影响客户端程序的 ...
- TCP服务端程序开发
TCP服务端程序开发 1. 开发 TCP 服务端程序开发步骤回顾 创建服务端端套接字对象 绑定端口号 设置监听 等待接受客户端的连接请求 接收数据 发送数据 关闭套接字 2. socket 类的介绍 ...
- 【技术分享】linux各种一句话反弹shell总结——攻击者指定服务端,受害者主机(无公网IP)主动连接攻击者的服务端程序(CC server),开启一个shell交互,就叫反弹shell。...
反弹shell背景: 想要搞清楚这个问题,首先要搞清楚什么是反弹,为什么要反弹. 假设我们攻击了一台机器,打开了该机器的一个端口,攻击者在自己的机器去连接目标机器(目标ip:目标机器端口),这是比较常 ...
- 如何在golang http服务端程序中读取2次Request Body?(转)
转自知乎:如何在golang http服务端程序中读取2次Request Body? - 知乎 提问: 在golang http服务端程序中,我想在真正处理Request Body之前将Body中的内 ...
- 《精通并发与Netty》学习笔记(02 - 服务端程序编写)
上节我们介绍了开发netty项目所必需的开发环境及工具的使用,这节我们来写第一个netty项目 开发步骤 第一步:打开https://search.maven.org 找到netty依赖库 第二步:打 ...
最新文章
- VS Code 常用快捷键
- 8月20日全球六大国际域名解析量变化情况统计报告
- JavaScript学习(四十一)—字面量形式创建对象的注意事项和不足
- spyder里import tensorflow报错显示没有tensorflow模块解决
- 将查询出来的数据按照一个字段分组且排序过程中,遇到的一些有关group的问题(分组排序应该使用partition by)
- java课时,java学习笔记_课时一
- C#测绘兰勃特墨卡托投影
- Unity3D for iOS初级教程:Part 3/3
- android eclipse ddms 查看线程,android ddms中查看线程释疑
- linux mint 环境配置jimi
- 如何用Flutter实现跨平台移动开发
- shell脚本里的#!/bin/bash是什么意思
- halide 资源整理
- 图像的基本操作--反转、放大等
- k8s部署微服务组件eureka
- 首批实现QQ在RTOS端应用部署 展锐持续深耕穿戴生态
- shell脚本编程学习笔记8(XDL)——流程控制和循环
- 俄罗斯、乌克兰程序员薪资大曝光!
- u盘ios刻录_无需越狱易操作 iPhone/iPad也能当U盘
- html格式简历如何转换,我用HTML写简历
热门文章
- 3d打印机 form3_桌面SLA卖掉5万台后,Formlabs发布新机型Form3和Form 3L
- android 沙箱 逆向,【原创】沙箱Sandboxie v3.40 逆向完整源码
- 一般信道容量迭代算法c语言,离散信道容量的迭代算法
- android 绘图流程,Android View绘制流程
- Python用正则表达式匹配ABAC和AABB的词语
- 运行python的两种方式磁盘式_python计算机基础-Day1
- 20200218:不同路径(leetcode62)
- php后端路由,laravel实现前后台路由分离的方法
- Androidstudio设置Ctrl+W关闭当前编辑的页面窗口
- AlertDialog源码解析之一