Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging

  Json-rpc没有定义消息如何传输,因此,Json-Rpc RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主是TransportMessage,如下代码,这里的Content请求时为RcpRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空。

/// <summary>/// 传输消息模型。/// </summary>public class TransportMessage{public TransportMessage(){}[MethodImpl(MethodImplOptions.AggressiveInlining)]public TransportMessage(object content,object headers){if (content == null)throw new ArgumentNullException(nameof(content));Content = content;Headers = headers;ContentType = content.GetType().FullName;}[MethodImpl(MethodImplOptions.AggressiveInlining)]public TransportMessage(object content, object headers, string fullName){if (content == null)throw new ArgumentNullException(nameof(content));Headers = headers;Content = content;ContentType = fullName;}/// <summary>/// 消息Id。/// </summary>public string Id { get; set; }/// <summary>/// 消息内容。/// </summary>public object Content { get; set; }/// <summary>/// 消息传输Header/// </summary>public object Headers { get; set; }/// <summary>/// 内容类型。/// </summary>public string ContentType { get; set; }/// <summary>/// 是否调用消息。/// </summary>/// <returns>如果是则返回true,否则返回false。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]public bool IsInvokeMessage(){return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;}/// <summary>/// 是否是调用结果消息。/// </summary>/// <returns>如果是则返回true,否则返回false。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]public bool IsInvokeResultMessage(){return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;}/// <summary>/// 获取内容。/// </summary>/// <typeparam name="T">内容类型。</typeparam>/// <returns>内容实例。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]public T GetContent<T>(){return (T)Content;}/// <summary>/// 获取Header。/// </summary>/// <typeparam name="T">Header类型。</typeparam>/// <returns>Header实例。</returns>
        [MethodImpl(MethodImplOptions.AggressiveInlining)]public T GetHeaders<T>(){return (T)Headers;}/// <summary>/// 创建一个调用传输消息。/// </summary>/// <param name="invokeMessage">调用实例。</param>/// <returns>调用传输消息。</returns>  public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection){return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName){Id = Guid.NewGuid().ToString("N")};}/// <summary>/// 创建一个调用结果传输消息。/// </summary>/// <param name="id">消息Id。</param>/// <param name="invokeResultMessage">调用结果实例。</param>/// <returns>调用结果传输消息。</returns>  public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection){return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName){Id = id};}}

  TransportMessage需要在dotnetty中传输,则需要对TransportMessage进行编码解码

消息编解码器

public interface ITransportMessageEncoder{byte[] Encode(TransportMessage message);
}
public interface ITransportMessageDecoder{TransportMessage Decode(byte[] data);}

Json编解码

  平时编码中经常用的方式

public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder{#region Implementation of ITransportMessageEncoderpublic byte[] Encode(TransportMessage message){var content = JsonConvert.SerializeObject(message);return Encoding.UTF8.GetBytes(content);}#endregion Implementation of ITransportMessageEncoder
}public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder{#region Implementation of ITransportMessageDecoderpublic TransportMessage Decode(byte[] data){var content = Encoding.UTF8.GetString(data);var message = JsonConvert.DeserializeObject<TransportMessage>(content);if (message.IsInvokeMessage()){message.Content = JsonConvert.DeserializeObject<JsonRequest>(message.Content.ToString());}if (message.IsInvokeResultMessage()){message.Content = JsonConvert.DeserializeObject<JsonResponse>(message.Content.ToString());}return message;}#endregion Implementation of ITransportMessageDecoder}

MessagePack

  官网地址:https://msgpack.org/

  贴出代码,不过多的解释

[MessagePackObject]public class MessagePackTransportMessage{public MessagePackTransportMessage(TransportMessage transportMessage){Id = transportMessage.Id;ContentType = transportMessage.ContentType;object contentObject;if (transportMessage.IsInvokeMessage()){contentObject = new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>());}else if (transportMessage.IsInvokeResultMessage()){contentObject = new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());}else{throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");}Content = SerializerUtilitys.Serialize(contentObject);var headersObject = transportMessage.GetHeaders<NameValueCollection>();Headers = SerializerUtilitys.Serialize(JsonConvert.SerializeObject(MessagePackTransportMessageType.NvcToDictionary(headersObject)));}public MessagePackTransportMessage(){}[Key(0)]public string Id { get; set; }[Key(1)]public byte[] Content { get; set; }[Key(2)]public byte[] Headers { get; set; }[Key(3)]public string ContentType { get; set; }[MethodImpl(MethodImplOptions.AggressiveInlining)]public bool IsInvokeMessage(){return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;}[MethodImpl(MethodImplOptions.AggressiveInlining)]public bool IsInvokeResultMessage(){return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;}public TransportMessage GetTransportMessage(){var message = new TransportMessage{ContentType = ContentType,Id = Id,Content = null,Headers = null,};object contentObject;if (IsInvokeMessage()){contentObject =SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest();}else if (IsInvokeResultMessage()){contentObject =SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content).GetJsonResponse();}else{throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");}message.Content = contentObject;var headers = SerializerUtilitys.Deserialize<string>(Headers);message.Headers = JsonConvert.DeserializeObject(headers);return message;}
}public sealed class MessagePackTransportMessageEncoder:ITransportMessageEncoder{#region Implementation of ITransportMessageEncoder[MethodImpl(MethodImplOptions.AggressiveInlining)]public byte[] Encode(TransportMessage message){var transportMessage = new MessagePackTransportMessage(message){Id = message.Id,ContentType = message.ContentType,};return SerializerUtilitys.Serialize(transportMessage);}#endregion Implementation of ITransportMessageEncoder}public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder{#region Implementation of ITransportMessageDecoder[MethodImpl(MethodImplOptions.AggressiveInlining)]public TransportMessage Decode(byte[] data){var message = SerializerUtilitys.Deserialize<MessagePackTransportMessage>(data);return message.GetTransportMessage();}#endregion Implementation of ITransportMessageDecoder}

ProtoBuffer

  这个应该听得比较多

[ProtoContract]public class ProtoBufferTransportMessage{public ProtoBufferTransportMessage(TransportMessage transportMessage){Id = transportMessage.Id;ContentType = transportMessage.ContentType;object contentObject;if (transportMessage.IsInvokeMessage()){contentObject = new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>());}else if (transportMessage.IsInvokeResultMessage()){contentObject = new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());}else{throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");}Content = SerializerUtilitys.Serialize(contentObject);Headers = SerializerUtilitys.Serialize(transportMessage.GetHeaders<NameValueCollection>());}public ProtoBufferTransportMessage(){}[ProtoMember(1)]public string Id { get; set; }[ProtoMember(2)]public byte[] Content { get; set; }[ProtoMember(3)]public byte[] Headers { get; set; }[ProtoMember(4)]public string ContentType { get; set; }public bool IsInvokeMessage(){return ContentType == MessagePackTransportMessageType.jsonRequestTypeName;}public bool IsInvokeResultMessage(){return ContentType == MessagePackTransportMessageType.jsonResponseTypeName;}public TransportMessage GetTransportMessage(){var message = new TransportMessage{ContentType = ContentType,Id = Id,Content = null,Headers = null,};object contentObject;if (IsInvokeMessage()){contentObject =SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest();}else if (IsInvokeResultMessage()){contentObject =SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content).GetJsonResponse();}else{throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");}message.Content = contentObject;message.Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers);return message;}
}public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder{#region Implementation of ITransportMessageEncoderpublic byte[] Encode(TransportMessage message){var transportMessage = new ProtoBufferTransportMessage(message){Id = message.Id,ContentType = message.ContentType,};return SerializerUtilitys.Serialize(transportMessage);}#endregion Implementation of ITransportMessageEncoder}public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder{#region Implementation of ITransportMessageDecoderpublic TransportMessage Decode(byte[] data){var message = SerializerUtilitys.Deserialize<ProtoBufferTransportMessage>(data);return message.GetTransportMessage();}#endregion Implementation of ITransportMessageDecoder}

转载于:https://www.cnblogs.com/spritekuang/p/10805754.html

企业级工作流解决方案(七)--微服务Tcp消息传输模型之消息编解码相关推荐

  1. 详解OSI七层网络模型 TCP/IP四层模型

    @TOC这里对OSI七层网络模型和TCP/IP四层模型及其下的各层和TCP协议的设计核心做出总结供大家参考,如有错误欢迎指出讨论! OSI七层网络模型 & TCP/IP四层模型 OSI七层模型 ...

  2. tcp/ip协议中消息传输对帧消息的操作

    2019独角兽企业重金招聘Python工程师标准>>> 接口:Framer.java: package com.tcpip;import java.io.IOException; i ...

  3. 企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理

    服务端启动 服务端启动主要做几件事情,1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置),2. 注册编解码器工厂,3. 启动dotnetty监听端口,4. 读取配置文件,解析全局消息处理模 ...

  4. springcloud微服务架构开发实战:分布式消息总线

    消息总线的定义 前面在1.4.2节中强调过,在微服务架构中,经常会使用REST 服务或基于消息的通信机制. 在3.6节中也详细介绍了消息通信的实现方式.消息总线就是一种基于消息的通信机制. 消息总线是 ...

  5. 企业级工作流解决方案(十六)--工作流--工作流实体模型

    DDD思想强调先有领域实体模型定义,再有数据库设计,数据库只是做为领域模型的一种持久化介质,但是在工作中,还是习惯性的先做数据库设计,再翻译成领域实体模型.所有我还是以数据库设计为起点,讲解工作流整体 ...

  6. .NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

    Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含 ...

  7. 微服务解决方案_微服务为您提供正确的解决方案

    微服务解决方案 I have been writing about Microservices for quite a few years, both its benefits and its dow ...

  8. 微服务架构开发实战:分布式消息总线,实现配置信息的自动更新

    实现配置信息的自动更新 在上一篇文章中节演示了集成Spring Cloud Bus 的过程.在示例中,当微服务实例启动的时候,可以去加载最新的配置信息.当时这种做法有一定的局限性,即只有在应用启动的过 ...

  9. OSI七层与TCP/IP四层模型

    2.层次关系 一.物理层:    目的:保证原始数据比特流的无误传输.    任务:物理层定义电压.接口.线缆标准.传输距离等.物理层协议定义接口的四个基本功能特性:     机械特性:说明了接口所用 ...

最新文章

  1. [转] Java快速教程
  2. 项目进展情况如何更好地管理?
  3. VCSA 6.5 HA配置 之三 :准备工作
  4. 让FFMPEG支持实时流“伴随”转码
  5. python基础教程:装饰器
  6. 只有一个显示器但是显示两个显示器_小米34寸曲面显示器深度体验 办公体验极佳 但是还有个大弱点...
  7. 网校系统推荐eduline_网校系统的各项功能的用处
  8. 就算忘了自己也忘不了你
  9. 有趣的算法(四)最通俗易懂的KMP算法解析
  10. 如何理解互斥锁、条件锁、读写锁以及自旋锁?(转载)
  11. python基础学习
  12. html js实现分页代码,js分页代码示例
  13. 苹果cmsv10黑色炫酷自适应在线视频网站简约模板源码
  14. 解决微信调整字体大小导致页面样式混乱的问题
  15. OpenStack基金会更名,开源基础设施开启新十年
  16. 计算机科学导论第一章练习解答
  17. 【网易微专业】算法原理与实践 2
  18. 一个元素位于另一个元素之上,点击上面的元素引发下面元素事件操作
  19. Zabbix 配置钉钉告警
  20. 个人申请微信H5支付接口(个人免签约支付平台)

热门文章

  1. 测试面试题,自动化测试与性能测试篇(附答案)
  2. 2019服务器部署文件管理,在 Windows Server 2019 中部署文件共享见证 | Microsoft Docs
  3. 自动化测试——接口测试——增删改查
  4. C语言创建学生姓名分数链表,C语言编程 编写程序,建立一个学生数据链表,学生的数据包括学号、姓名、成绩。...
  5. 2019年参加迅雷链宣讲会日记
  6. 2022春季“金三银四”跳槽必备:软件测试面试题(附带答案)
  7. 技术提升为管理,最重要的能力是什么?
  8. 从【银行销冠】化身测试工程师,小哥这样实现了人生的逆转
  9. java socket 浏览器_java实现websocket(图文)
  10. python编程从入门到实践书中出错的地方_读书笔记「Python编程:从入门到实践」_10.文件和异常...