SharedCache由3个主要的项目组成MergeSystem.Indexus.WinServiceCommon、MergeSystem.Indexus.WinService和MergeSystem.Indexus.Notify。WinService可以以Windows服务方式加载,也可以以控制台方式运行,如果注册为Windows服务,则可以通过MergeSystem.Indexus.Notify程序来了解其状态,若是以控制台方式运行,则运行时的信息会在控制台窗口中显示。当然也可以配置通过NLog.dll记录日志。

大部分的功能都封装在WinServiceCommon项目中,WinService项目只负责监听和数据中转,这个项目中只有几个文件,如图1。

Indexus是个windows服务,也是程序的入口点,其类图如图2。

图1

图2

图3 ShareCache服务端程序主流程

Indexus可以接受几个命令行参数/install /i /uninstall /u来安装或卸载windows服务,若想以控制台方式启动程序,可以使用/local参数,如:
MergeSystem.Indexus.WinService.exe /local

或直接使用相同目录中的几个批处理文件来执行。来看一下main函数:

public static void Main(string[] args)
{
    Access Log#region Access Log
    COM.Handler.LogHandler.Tracking(
        "Access Method: " + ((object)MethodBase.GetCurrentMethod()).ToString() + " ;"
    );
    #endregion Access Log

    string optionalArgs = string.Empty;
    if (args.Length > 0)
    {
        optionalArgs = args[0];
    }
    args Handling#region args Handling
    if (!string.IsNullOrEmpty(optionalArgs))
    {
        if("/?".Equals(optionalArgs.ToLower()))
        {
            Console.WriteLine("Help Menu");
            Console.ReadLine();
            return;
        }
        else if("/local".Equals(optionalArgs.ToLower()))
        {
            Console.Title = "Indexus.Net Shared Cache - Server";
            Console.BackgroundColor = ConsoleColor.DarkBlue;
            Console.ForegroundColor = ConsoleColor.White;
            // running as cmd appliacation
            Indexus SrvIndexus = new Indexus();
            SrvIndexus.StartService();
            Console.ReadLine();
            SrvIndexus.StopService();
            return;
        }
        else if(@"/verbose".Equals(optionalArgs.ToLower()))
        {
            // Console.SetOut(this);
            // Console.SetIn(Console.Out);
            // Console.ReadLine();
            return;
        }
        TransactedInstaller ti = new TransactedInstaller();
        IndexusInstaller ii = new IndexusInstaller();
        ti.Installers.Add(ti);
        string path = string.Format("/assemblypath={0}", System.Reflection.Assembly.GetExecutingAssembly().Location);
        string[] cmd = { path };
        InstallContext context = new InstallContext(string.Empty, cmd);
        ti.Context = context;

        if ("/install".Equals(optionalArgs.ToLower()) || "/i".Equals(optionalArgs.ToLower()))
        {
            ti.Install(new Hashtable());
        }
        else if ("/uninstall".Equals(optionalArgs.ToLower()) || "/u".Equals(optionalArgs.ToLower()))
        {
            ti.Uninstall(null);
        }
        else
        {
            StringBuilder sb = new StringBuilder();
            sb.Append(@"Your provided Argument is not available." + Environment.NewLine);
            sb.Append(@"Use one of the following once:" + Environment.NewLine);
            sb.AppendFormat(@"To Install the service '{0}': '/install' or '/i'" + Environment.NewLine, @"IndeXus.Net");
            sb.AppendFormat(@"To Un-Install the service'{0}': '/uninstall' or '/u'" + Environment.NewLine, @"IndeXus.Net");
            Console.WriteLine(sb.ToString());
        }
    }
    else
    {
        // nothing received as input argument
        ServiceBase[] servicesToRun;
        servicesToRun = new ServiceBase[] { new Indexus() };
        ServiceBase.Run(servicesToRun);
    }
    #endregion args Handling
}

这里可以看到,加/local参数启动时,直接调用StartService()方法,然后就是和安装windows服务后启动服务一样了。

StartService中调用ServiceLogic.Init()方法进行初始化。代码注释比较少,但已经够我们理解这段代码了。

/**//// <summary>
/// Inits this instance. This method used at startup to initialize
/// all required server components
/// </summary>
public void Init()
{
    Access Log#region Access Log
    COM.Handler.LogHandler.Tracking(
        "Access Method: " + this.GetType().ToString()+ "->" + ((object)MethodBase.GetCurrentMethod()).ToString() + " ;"
    );
    #endregion Access Log

    COM.Handler.LogHandler.Force("Initializing Settings" + COM.Enums.LogCategory.ServiceStart.ToString());
    Console.WriteLine(@"Welcome to indeXus.Net Shared Cache");
    Console.WriteLine();
    Console.WriteLine(COM.Handler.Config.DisplayAppSettings());
    COM.Handler.LogHandler.Info(COM.Handler.Config.DisplayAppSettings());
    // needs to be instantiated before TCP, it needs an instance of CachExpire
    cacheExpireInstance = new CacheExpire();
    // TCP needs an instance of CacheExpire
    tcpInstance = new TcpServer(cacheExpireInstance);

    COM.Handler.LogHandler.Force("Init and Start Thread Tcp");
    COM.Handler.LogHandler.Force("Init and Start Thread CacheExpire");
    // Init all extenders
    // an extender is a class which initializes its own logic around
    // specific issue within its own thread;
    /**////
    this.workerTcp = new Thread(this.tcpInstance.Init);
    this.workerTcp.Name = "TCP Handler";
    this.workerTcp.IsBackground = true;
    this.workerTcp.Priority = ThreadPriority.Normal;
    /**////
    this.workerCacheExpire = new Thread(this.cacheExpireInstance.Init);
    this.workerCacheExpire.Name = "Cache Expire Handler";
    this.workerCacheExpire.IsBackground = true;
    this.workerCacheExpire.Priority = ThreadPriority.Lowest;
    /**////
    this.workerCacheExpire.Start();
    this.workerTcp.Start();

    // enable the search of replicaiton servers
    if (this.enableServiceFamilyMode)
    {
        NetworkDistribution.Init();
    } 

    string msgThreadInfo = Environment.NewLine + 
        "Main Thread Id: " + Thread.CurrentThread.ManagedThreadId.ToString() + Environment.NewLine +
        "this.workerTcp: " + this.workerTcp.ManagedThreadId.ToString() + Environment.NewLine +
        /**//*"this.workerTimer: " + this.workerTimer.ManagedThreadId.ToString() + Environment.NewLine +*/
        "this.workerCacheExpire: " + this.workerCacheExpire.ManagedThreadId.ToString();

    Console.WriteLine(msgThreadInfo + Environment.NewLine);
    Console.WriteLine("+ + + + + + + + + + + + + + + + + + + + + + + + + + + + ");
    Console.WriteLine("server is ready to receive data.");

    COM.Handler.LogHandler.Force("IndeXus.Net Service Started " + COM.Enums.LogCategory.ServiceStart.ToString());
}

这里启动2个Thread,一个负责执行CacheExpire.Init()方法,负责定时轮查缓Cache中设置有过期策略的对象,如果有到期的,就从Cache中清除。另一个Thread负责监听TCP端口,然后每有一个新的客户端连接过来,又启动一个新的线程处理。

接下来转到MergeSystem.Indexus.WinService.TcpServer.Init()方法:

public void Init()
{
   

    IPEndPoint endpoint = COM.Handler.Network.GetServerAnyIPEndPoint(this.cacheIpPort);

    serverSocket = new Socket(endpoint.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    serverSocket.Bind(endpoint);
    serverSocket.Listen((int)SocketOptionName.MaxConnections);

   

    // setup listener issues
    acceptThread = new Thread(AcceptConnections);
    acceptThread.IsBackground = true;
    acceptThread.Priority = ThreadPriority.Normal;
    acceptThread.Start();

   

}

这里启动一个新线程监听TCP 48888(默认值)端口,跟到AcceptConnections方法:

private void AcceptConnections()
{
   

    while (true)
    {
        // Accept a connection
        Socket socket = serverSocket.Accept();
        ConnectionInfo connection = new ConnectionInfo();
        connection.Socket = socket;

        // Modified:06-01-2008 Merge System GmbH, rschuetz : for more information checkout the following link> http://netrsc.blogspot.com/2007/02/another-extract-from-scalenet-threading.html
        ThreadPool.QueueUserWorkItem(this.ProcessConnection, connection);

        // Store the socket in the open connections
        lock (this.connections)
        {
            this.connections.Add(connection);
        } 

    }
}

这个比较简单,只是在一个死循环中等待客户端连接,等到后要线程池中执行ProcessConnection方法来处理,此线程继续等待下一个请求。跟到ProcessConnection方法:

private void ProcessConnection(object stats)
{
     

    ConnectionInfo connection = (ConnectionInfo)stats;

    try
    {
       

        // read data from Socket and convert it to an IndeXusMessage
        COM.IndexusMessage msg = new COM.Handler.NetworkMessage().ProcessNetworkMessage(connection.Socket);

        if (msg != null)
        {
            this.ProcessMessage(msg, connection);
        }
        else
        {
            Console.WriteLine("Error appears due processing network message!");
            COM.Handler.LogHandler.Error("Error appears due processing network message!");
        }
    }
    catch (OutOfMemoryException ex)
    {
       
    }
    catch (Exception ex)
    {
       

    }
    finally
    {
        connection.Socket.Close();
        lock (this.connections)
            this.connections.Remove(connection);
    }
}

这里只调用了COM.Handler.NetworkMessage().ProcessNetworkMessage()方法将客户端发过来的内容还原为IndexusMessage 对象,然后交给ProcessMessage()方法处理:

private void ProcessMessage(COM.IndexusMessage msg, ConnectionInfo connection)
{
   

    // check status first [Request || Response]
    switch (msg.Status)
    {
        case COM.IndexusMessage.StatusValue.Request:
            {
                request case#region request case
                COM.IndexusMessage replicationMessage = null;

                if (ServiceLogic.NetworkDistribution.ReplicationEnabled)
                {
                    // create a new object of received msg. to broadcast on servers.
                    replicationMessage = new COM.IndexusMessage(new Random().Next(), msg.Action, COM.IndexusMessage.StatusValue.Request,new KeyValuePair<string,byte[]>(msg.Key, msg.Payload));
                } 

               
                // switch over cache actions
                switch (msg.Action)
                {
                    case COM.IndexusMessage.ActionValue.Ping:
                        {
                           
                        }
                    case COM.IndexusMessage.ActionValue.Add:
                        {
                           
                        }
                    case COM.IndexusMessage.ActionValue.Get:
                        {
                          
                        }
                    case COM.IndexusMessage.ActionValue.Remove:
                        {
                           
                        }
                    case COM.IndexusMessage.ActionValue.RemoveAll:
                        {
                           
                        }
                    case COM.IndexusMessage.ActionValue.GetAllKeys:
                        {
                           
                        }
                    case COM.IndexusMessage.ActionValue.Statistic:
                        {
                           
                        }
                }
                break;
                #endregion request case
            }
        case COM.IndexusMessage.StatusValue.Response:
            {
               
            }
    } 

}

这里分类对消息进行处理,如添加到缓存或移除等。下面仔细看下Add分支:

case COM.IndexusMessage.ActionValue.Add:
    {
        Add Case#region Add Case
        if (writeStats)
            Console.WriteLine("Message Action: {0}", msg.Action.ToString());

        Console.WriteLine(@"Adding new Item with Key: {0}", msg.Key);
        lock (bulkObject)
        {
            LocalCache.Add(new KeyValuePair<string, byte[]>(msg.Key, msg.Payload));
            // if the given msg expires is not MaxValue
            // it will be listed sub-process which clean
            // up in iterations the cache.
            // QuickFix
            if (msg.Expires != DateTime.MaxValue)
            {
                if (this.expire != null)
                {
                    this.expire.Expire.DumpCacheItemAt(msg.Key, msg.Expires);
                }                                           
            }
            // update cleanup list with new object
            CacheCleanup.Update(msg);
        } 

        msg.Action = COM.IndexusMessage.ActionValue.Successful;

        rschuetz: MODIFIED: 21-07-2007: distribute object over wire to other installations#region rschuetz: MODIFIED: 21-07-2007: distribute object over wire to other installations
        // Question is if the client needs to wait until this happens,
        // or should the client first get an answer and just then it
        // will distribute it.
        if (ServiceLogic.NetworkDistribution.ReplicationEnabled)
        {
            ServiceLogic.NetworkDistribution.Replicate(replicationMessage);
        }
        #endregion 

        // send object back throug the connection
        connection.Socket.Send(msg.GetBytes());

        // handle max size and purge issues
        //DateTime startTime = DateTime.Now;
        if (this.cacheAmountOfObjects != -1 && this.cacheAmountOfObjects <= LocalCache.CalculatedCacheSize && !CacheCleanup.PurgeIsRunning)
        {
            Console.WriteLine(@"Current Size of Cache: {0} ; {1} ", LocalCache.CalculatedCacheSize, LocalCache.CalculatedCacheSize <= 0 ? 0 : LocalCache.CalculatedCacheSize / (1024 * 1024));
            List<string> remove = CacheCleanup.Purge(LocalCache.CalculatedCacheSize);
            if (remove != null)
            {
                lock (bulkObject)
                {
                    foreach (string s in remove)
                    {
                        LocalCache.Remove(s);
                    }
                }
            }
        }
        break;
        #endregion Add Case
    }

主要完成了4个功能:1.将数据保存到LocalCache中;2.如果要缓存的对象设置有过期时间,则在expire中保存一份存根;3.如果配置有复制服务器,则分发到各兄弟节点;4.如果缓存对象超出了最大值,则使用配置的策略靖仓。LocalCache是MergeSystem.Indexus.WinServiceCommon.Cache类型的静态成员,expire是WinServiceCommon.CacheExpire类型的静态成员。

分析到这里,ShartCache的大概思路已经很清晰了。

ShartCache的核心是MergeSystem.Indexus.WinServiceCommon.Cache和MergeSystem.Indexus.WinServiceCommon.CacheExpire这二个类。WinServiceCommon.Cache中使用一个字典对象readonly Dictionary<string, byte[]> dict;来存储需要缓存的数据,因为要缓存的数据都已经序列化通过TCP传输过来,这里就直接保存byte[]类型的了。WinServiceCommon.CacheExpire类中也有一个字典对象private static Dictionary<string, DateTime> expireTable = null;所有设置有过期时间的缓存对象,都在这里有个存根,过期的缓存有WinServiceCommon.CacheExpire负责清除。

来源:wuchang.cnblogs.com

转载于:https://www.cnblogs.com/wuchang/archive/2008/02/07/1065843.html

SharedCache分析:服务端程序相关推荐

  1. 服务端程序的初步实现

    文章目录 1 服务端程序的初步实现 1.1 设计实现 1.2 代码实现 1 服务端程序的初步实现 1.1 设计实现 服务端设计初步: 设计要素分析: 一般情况下,聊天服务端只负责消息传递. 客户端的连 ...

  2. 也谈如何构建高性能服务端程序

    引子:我接触过很多编程语言,接触过各种各样的服务器端开发,Java,Go,Ruby,Javascript等语言,Spring,Node.js,Rails等等常见服务器端框架和编程模型都有接触.这里谈一 ...

  3. 基于半同步/半反应堆线程池实现的HTTP解析服务端程序

    简介: 半同步/半反应堆线程池是通过一个线程往工作队列添加任务T,然后工作线程竞争工作队列获得任务T.HTTP请求解析服务端程序:逐行解析客户端发送来的HTTP请求然后作出HTTP回答.采用线程池就是 ...

  4. 三角定位法java代码_GitHub - megagao/IndoorPos: 这是一个采用蓝牙4.0--iBeacon技术的室内定位服务端程序。...

    IndoorPos 这是一个采用iBeacon技术的室内定位服务端程序,里面包含了三种定位算法,三边定位算法.加权三边定位算法和三角形加权质心定位算法.程序采用Spring框架,简化了JDBC和RMI ...

  5. winform服务器消息推送,winform项目——仿QQ即时通讯程序12:服务端程序补充及优化...

    原标题:winform项目--仿QQ即时通讯程序12:服务端程序补充及优化 上一篇文章大概完成了服务端程序,今天继续做项目的时候发现还有一些功能没有做,还有几处地方不够完善.不做好就会影响客户端程序的 ...

  6. TCP服务端程序开发

    TCP服务端程序开发 1. 开发 TCP 服务端程序开发步骤回顾 创建服务端端套接字对象 绑定端口号 设置监听 等待接受客户端的连接请求 接收数据 发送数据 关闭套接字 2. socket 类的介绍 ...

  7. 【技术分享】linux各种一句话反弹shell总结——攻击者指定服务端,受害者主机(无公网IP)主动连接攻击者的服务端程序(CC server),开启一个shell交互,就叫反弹shell。...

    反弹shell背景: 想要搞清楚这个问题,首先要搞清楚什么是反弹,为什么要反弹. 假设我们攻击了一台机器,打开了该机器的一个端口,攻击者在自己的机器去连接目标机器(目标ip:目标机器端口),这是比较常 ...

  8. 如何在golang http服务端程序中读取2次Request Body?(转)

    转自知乎:如何在golang http服务端程序中读取2次Request Body? - 知乎 提问: 在golang http服务端程序中,我想在真正处理Request Body之前将Body中的内 ...

  9. 《精通并发与Netty》学习笔记(02 - 服务端程序编写)

    上节我们介绍了开发netty项目所必需的开发环境及工具的使用,这节我们来写第一个netty项目 开发步骤 第一步:打开https://search.maven.org 找到netty依赖库 第二步:打 ...

最新文章

  1. VS Code 常用快捷键
  2. 8月20日全球六大国际域名解析量变化情况统计报告
  3. JavaScript学习(四十一)—字面量形式创建对象的注意事项和不足
  4. spyder里import tensorflow报错显示没有tensorflow模块解决
  5. 将查询出来的数据按照一个字段分组且排序过程中,遇到的一些有关group的问题(分组排序应该使用partition by)
  6. java课时,java学习笔记_课时一
  7. C#测绘兰勃特墨卡托投影
  8. Unity3D for iOS初级教程:Part 3/3
  9. android eclipse ddms 查看线程,android ddms中查看线程释疑
  10. linux mint 环境配置jimi
  11. 如何用Flutter实现跨平台移动开发
  12. shell脚本里的#!/bin/bash是什么意思
  13. halide 资源整理
  14. 图像的基本操作--反转、放大等
  15. k8s部署微服务组件eureka
  16. 首批实现QQ在RTOS端应用部署 展锐持续深耕穿戴生态
  17. shell脚本编程学习笔记8(XDL)——流程控制和循环
  18. 俄罗斯、乌克兰程序员薪资大曝光!
  19. u盘ios刻录_无需越狱易操作 iPhone/iPad也能当U盘
  20. html格式简历如何转换,我用HTML写简历

热门文章

  1. 3d打印机 form3_桌面SLA卖掉5万台后,Formlabs发布新机型Form3和Form 3L
  2. android 沙箱 逆向,【原创】沙箱Sandboxie v3.40 逆向完整源码
  3. 一般信道容量迭代算法c语言,离散信道容量的迭代算法
  4. android 绘图流程,Android View绘制流程
  5. Python用正则表达式匹配ABAC和AABB的词语
  6. 运行python的两种方式磁盘式_python计算机基础-Day1
  7. 20200218:不同路径(leetcode62)
  8. php后端路由,laravel实现前后台路由分离的方法
  9. Androidstudio设置Ctrl+W关闭当前编辑的页面窗口
  10. AlertDialog源码解析之一