服务端启动

  服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模型5. 注册服务端处理对象到容器。

  JsonRpcServerModule代码如下,见备注说明

[DependsOn(typeof(AbpKernelModule))]public class JsonRpcServerModule : AbpModule{public override void PreInitialize(){// 注册客户端配置,固定从Xml文件读取SocketServiceConfiguration socketServiceConfiguration = XmlConfigProvider.GetConfig<SocketServiceConfiguration>("SocketServiceConfiguration.xml");IocManager.IocContainer.Register(Component.For<ISocketServiceConfiguration>().Instance(socketServiceConfiguration));IocManager.Register<IServiceExecutor, DefaultServiceExecutor>(Dependency.DependencyLifeStyle.Singleton);}public override void Initialize(){IocManager.RegisterAssemblyByConvention(typeof(JsonRpcServerModule).GetAssembly());var socketServiceConfiguration = Configuration.Modules.RpcServiceConfig();switch (socketServiceConfiguration.MessageCode) //  根据配置文件,编解码配置选择
            {case EMessageCode.Json:IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);break;case EMessageCode.MessagePack:IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);break;case EMessageCode.ProtoBuffer:IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);break;default:break;}RegisterDefaultProtocol();}public override void PostInitialize(){var socketServiceConfiguration = IocManager.Resolve<ISocketServiceConfiguration>();// 方法里面调用ServiceHost构造函数传入的委托,启动dotnetty监听IocManager.Resolve<IServiceHost>().StartAsync(new IpAddressModel("0.0.0.0", socketServiceConfiguration.Port).CreateEndPoint());// 从配置文件读取json-rpc服务配置,解析消息处理模型
            JsonRpcRegister.LoadFromConfig(IocManager);}private void RegisterDefaultProtocol(){var dotNettyServerMessageListener = new DotNettyServerMessageListener(Logger,IocManager.Resolve<ITransportMessageCodecFactory>(), IocManager.Resolve<ISocketServiceConfiguration>());IocManager.IocContainer.Register(Component.For<IMessageListener>().Instance(dotNettyServerMessageListener));var serviceExecutor = IocManager.Resolve<IServiceExecutor>();// 新建一个ServiceHost对象,放入容器,这个时候dotnetty还未启动,只是定义了执行方法。var serverHost = new DefaultServiceHost(async endPoint =>{await dotNettyServerMessageListener.StartAsync(endPoint); // 启动dotnetty监听return dotNettyServerMessageListener;}, serviceExecutor);IocManager.IocContainer.Register(Component.For<IServiceHost>().Instance(serverHost));}
}

  Dotnetty启动监听代码,参考dotnetty提供的实例代码,ServerHandler为自定义消息处理Chanel

/// <summary>/// 触发接收到消息事件。/// </summary>/// <param name="sender">消息发送者。</param>/// <param name="message">接收到的消息。</param>/// <returns>一个任务。</returns>public async Task OnReceived(IMessageSender sender, TransportMessage message){if (Received == null)return;await Received(sender, message);}
public async Task StartAsync(EndPoint endPoint){_logger.Debug($"准备启动服务主机,监听地址:{endPoint}。");IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1);IEventLoopGroup workerGroup = new MultithreadEventLoopGroup();//Default eventLoopCount is Environment.ProcessorCount * 2var bootstrap = new ServerBootstrap();bossGroup = new MultithreadEventLoopGroup(1);workerGroup = new MultithreadEventLoopGroup();bootstrap.Channel<TcpServerSocketChannel>();bootstrap.Option(ChannelOption.SoBacklog, _socketServiceConfiguration.Backlog).ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default).Group(bossGroup, workerGroup).ChildHandler(new ActionChannelInitializer<IChannel>(channel =>{var pipeline = channel.Pipeline;pipeline.AddLast(new LengthFieldPrepender(4));pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));pipeline.AddLast(new ServerHandler(async (contenxt, message) =>{var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt);await OnReceived(sender, message);}, _logger));}));try{_channel = await bootstrap.BindAsync(endPoint);_logger.Debug($"服务主机启动成功,监听地址:{endPoint}。");}catch{_logger.Error($"服务主机启动失败,监听地址:{endPoint}。 ");}}

  消息最终经过解码处理之后,会落到DefaultServiceExecutor类进行处理,在这里调用JsonRpcProcessor静态类的Process方法,处理Json-Rpc请求,并构造答复消息,答复客户端。

public class DefaultServiceExecutor : IServiceExecutor{private readonly ILogger _logger;public DefaultServiceExecutor(ILogger logger){_logger = logger;}public async Task ExecuteAsync(IMessageSender sender, TransportMessage message){_logger.Debug("服务提供者接收到消息");if (!message.IsInvokeMessage())return;JsonRequest jsonRequest;try{jsonRequest = message.GetContent<JsonRequest>();}catch (Exception exception){_logger.Error("将接收到的消息反序列化成 TransportMessage<JsonRequest> 时发送了错误。", exception);return;}_logger.Debug("准备执行本地逻辑。");var resultMessage = await LocalExecuteAsync(jsonRequest, message.Headers);//向客户端发送调用结果。await SendRemoteInvokeResult(sender, message.Id, JsonConvert.DeserializeObject<JsonResponse>(resultMessage));}private async Task<string> LocalExecuteAsync(JsonRequest jsonRequest,object headers){return await JsonRpcProcessor.Process(JsonConvert.SerializeObject(jsonRequest), headers);}private async Task SendRemoteInvokeResult(IMessageSender sender, string messageId, JsonResponse resultMessage){try{_logger.Debug("准备发送响应消息。");await sender.SendAndFlushAsync(TransportMessage.CreateInvokeResultMessage(messageId, resultMessage, new NameValueCollection()));_logger.Debug("响应消息发送成功。");}catch (Exception exception){_logger.Error("发送响应消息时候发生了异常。", exception);}}}

这部分内容没有太多的说明,参见surging

转载于:https://www.cnblogs.com/spritekuang/p/10805768.html

企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理相关推荐

  1. 企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码

    Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging Json-rpc没有定义消息如何传输,因此,Jso ...

  2. 企业级工作流解决方案(十六)--工作流--工作流实体模型

    DDD思想强调先有领域实体模型定义,再有数据库设计,数据库只是做为领域模型的一种持久化介质,但是在工作中,还是习惯性的先做数据库设计,再翻译成领域实体模型.所有我还是以数据库设计为起点,讲解工作流整体 ...

  3. 企业级工作流解决方案(十五)--集成Abp和ng-alain--Abp其他改造

    配置功能增强 Abp定义了各种配置接口,但是没有定义这些配置数据从哪里来,但是管理配置数据对于一个应用程序来说,是必不可少的一件事情. .net的配置数据管理,一般放在Web.config文件或者Ap ...

  4. 企业级工作流解决方案(十)--集成Abp和ng-alain--权限系统

    权限系统 应用系统离不开权限控制,权限中心不一定能抽象出所有的业务场景,这里定义的权限系统不一定能够满足所有的场景,但应该可以满足多数的业务需求. Abp的zero项目也定义了权限相关的表,但里面很多 ...

  5. 用python实现TCP协议传输功能(服务端代码)

    与客户端代码不同(客户端代码请看我的上一篇博客),服务端需要绑定端口号,设置监听服务,多了两个特殊的步骤,需要两行新的代码实现 准备:windows作为客户端,windows上安装网络调试助手,lin ...

  6. 大模型带来的MaaS(模型即服务)的商业模式将带来什么?

    目录 答:一场革命性的多行业变革 1.1开端 1.2技术基础 1.3前沿技术带来的新产品对现有生活的改变 1.4新的商业体系MaaS(模型即服务)的奠基 1.5MaaS(模型即服务)商业模式 1.6全 ...

  7. 「模型即服务AI」1分钟调用SOTA人脸检测,同时搭建时光相册小应用

    时光相册应用效果 团队模型.论文.博文.直播合集,点击此处浏览 一.物料 人脸检测:https://modelscope.cn/models/damo/cv_resnet101_face-detect ...

  8. 架构师图谱之微服务和消息队列

    概述 "架构师图谱"是一个很宏大的命题,特别是优秀的架构师自身也是"由点到面再到图",一点点成长积累起来,尝试写这篇文章的目的更多的是结合自身的一些架构.研发. ...

  9. ASP.NET XML Web 服务的工作流解决方案

    XML Web 服务通过创建组成终端对终端工作流解决方案的应用程序的方式而启用了一个强大的机制.对于在需要长期运行的情节(如那些在业务对业务的事务处理中发现的)之中,这样的解决方案是比较适宜的. Bi ...

最新文章

  1. 弱类型、强类型、动态类型、静态类型语言的区别是什么?
  2. python3爬虫入门教程-Python3爬虫学习入门教程
  3. VTK:几何对象之Pyramid
  4. Android 学习资源[转]
  5. rest php,REST介绍与REST在PHP中的应用
  6. 安装AdventureWorks2008R2示例数据库
  7. 设计师们做UI设计和交互设计、界面设计等一般会去什么网站呢?
  8. Linux ipv6设置
  9. 电子技术背后的数学本质【2】(反馈电路的分析和运算放大器)
  10. (中英)作文 —— 标题与小标题
  11. 2015070610 - 看到很多所谓的大牛
  12. CSAPP Lab2:Bomb Lab
  13. python技术学什么好呢_想自学一些实用的技术,学什么好?
  14. oracle 19c 由于MGA产生的文件扩展不足导致的ORA-04030
  15. 直播预告| CVPR专场四来了!
  16. C# 微信网页协议 代码记录
  17. (总结)密码破解之王:Ophcrack彩虹表(Rainbow Tables)原理详解(附:120G彩虹表下载)
  18. html5支持4k视频,【4K电影大礼包】目前压缩最好的五部4KHEVC(H.265)格式电影
  19. 人生有三苦人生四大乐事
  20. 【SystemVerilog基础】SystemVerilog语法之逻辑等(==)、全等(===)和匹配等(==?)

热门文章

  1. 有人要在「动物森友会」上开AI学术会议,我看你们就是在家想玩游戏吧
  2. GraphQL(二):GraphQL服务搭建
  3. golang版try..catch..
  4. 微信小程序开发系列一:微信小程序的申请和开发环境的搭建
  5. spring注解@Component、@Repository、@Service、@Controller
  6. 华为手机 开启LOGCAT
  7. [转]MySQL 5.6 my.cnf配置优化
  8. Windbg 基础命令 《第一篇》
  9. 【译】Linux系统和性能监控(2)
  10. 秋季4类疾病患者忌吃螃蟹