概述

  对于稍微熟悉这两个优秀的项目来说,每个内容单独介绍都不为过,本文只是简介并探讨如何将两部分内容合并起来,使其在某些场景下更适合、更高效。

  NetMQ:ZeroMQ的.Net版本,ZeroMQ简单来说就是局域网内的消息中间件(与MSMQ类似),包括了进程间通讯、点对点通讯、订阅模式通讯等等,底层用更“完美”的Socket实现,ZeroMQ实现了多语言、跨平台、高效率等诸多优势。详细介绍请参考ZeroMQ和NetMQ官方文档:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/。

  Protocol Buffer:源自与Google内部的开源项目,作为高效的RPC消息协议,相比较Json、XML协议的消息格式,Protobuf在序列化以及数据大小上都具有十分明显的优势,跨平台,协议可读性也接近于Json等等。这里也推荐一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

定义Protobuf协议

  Protocol Buffer(简称Protobuf)是以.proto的脚本形式实现的通用语义形式,类似于Json格式:

message  WeatherMessage
{ enum CommandType {Debug=0;Weather=1;Other=2;}required CommandType Command=1 [default=Weather];optional string Content=2;message Loaction {required int32 East=1;required int32 North=2;}repeated Loaction UserLocation=3;
}

  这里的Message、required(必选属性)、optional(可有可无属性)、repeated(内部嵌套的类型属性)等都是proto的关键字,具体意义以及为关键字的功能大家可以查看官方文档,这里只介绍如何应用,或者Stephen Liu的文章也不错。

  当然,光定义脚本是不能实现应用的,还需要根据特定的编码语言进行描述,这里利用Protobuf-Net来实现.Net平台的协议实现。

  首先,下载软件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)

  然后,解压并将刚才的.proto文件复制到文件夹ProtoGen下。

  最后,启动CMD并cd到ProtoGen文件夹目录下,运行命令:

  protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace

(-i指定了输入,-o指定了输出,-ns指定了生成代码的namespace)

  如果,正确的话(当然了,我给出的脚本是不会错的),就会生成一个PBWeatherMessage.cs文件,这样的话就可以将.cs文件加入到项目中当做一个纯粹的类来使用了。

代码中使用,就是类似于二进制序列化一样,只是这回序列化的是Protobuf专用的序列化方式而已。

  序列化:

                        #region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion

  反序列化:

                      var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}

  这里就简单介绍完了protobuf协议的使用,下面介绍一下NetMQ+Protobuf的使用。

NetMQ+Protobuf

  接下来我们来改造下NetMQ Sample中的Publisher-Subscriber模式:

  首先下载从GitHub上下载NetMQ Sample: https://github.com/zeromq/netmq

或者下载我的示例代码,其中包含了一个No Protobuf的工程,这个是直接摘自原作者的示例代码。

  服务端Publisher:

            using (var context = NetMQContext.Create())// NetMQ全局维护的Content上下文,建议只有一个并且使用完毕后及时回收。using (var publisher = context.CreatePublisherSocket())// 从Content上下文中创建CreatePublisherSocket,这里如果用其他四种模式之一需要Create其他类型。
            {publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。var rng = new Random();while (!stopRequested){int zipcode =  rng.Next(0, 99999);// 这里模拟一个随机命令编号(如果非10001,客户端直接丢弃此Publisher发布的消息,实现消息过滤)int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干净整洁。
                }}

  客户端Subscriber:

 using (var context = NetMQContext.Create())// 创建全局NetMQ句柄,建议唯一,使用完毕及时回收。 using (var subscriber = context.CreateSubscriberSocket())// 创建Publisher-Subscriber模式的客户端监听。
            {subscriber.Connect("tcp://127.0.0.1:5556");// 连接到指定Socketsubscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 这里创建消息内容的过滤,如果不包含“zipToSubscribeTo”值则不接收消息。for (int i = 0; i < iterations; i++){string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”开头,则会返回整条信息。Console.Write(".");// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解码。int zip = int.Parse(split[0]);if (zip != zipToSubscribeTo){throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));}totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}

  这就是四种模式之一的发布者模式,使用起来很方便,但是这仅仅传递的是基于String的字符串,还不是一个可以序列化的对象,下一步我们将把消息字符串用Protobuf进行序列化与反序列化,来优化我们的消息格式。

请参考,我的示例代码中的Publisher Pattern工程:

  服务端Publisher:

                using (var context = NetMQContext.Create())using (var publisher = context.CreatePublisherSocket()) {publisher.Bind("tcp://127.0.0.1:5556");var rng = new Random();while (!stopRequested){ int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion// publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));
                       WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));System.Threading.Thread.Sleep(100);}}

View Code

  客户端Subscriber:

 using (var context = NetMQContext.Create())using (var subscriber = context.CreateSubscriberSocket()){subscriber.Connect("tcp://127.0.0.1:5556");subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.while (true){if (curIndex > iterations) break;var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);int cmdId = int.Parse(split[0]);if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather){if (cmdId == zipToSubscribeTo){curIndex++;WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}}

  好了,其实单独来看,这两部分内容并为涉及的很深入,只是作为一个技术实践、技术储备,希望其中有问题或者有更好的应用场景,还请各位留言,不胜感谢!

  我的示例代码下载

冷静下来

这里补充一些不足:

  1. NetMQ中的过滤:默认NetMQ支持过滤,可是当我们摒弃String类型传递而转向Protobuf格式的时候NetMQ通道是无法解析其内容的,所以我们需要先解析内容,然后手写一些过滤代码,放弃了原生的支持。subscriber.SubscribeToAnyTopic()监听所有非过滤模式。
  2. NetMQ消息持久化:基于ZMQ的NetMQ设计理念中均不支持数据持久化(相比MSMQ而言,NetMQ不能接收当客户端不在线情况下的消息),所以如果需要持久化还需要做其他工作或者转战其他MQ家族。

引用

ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns

NetMQ:http://netmq.readthedocs.org/en/latest/introduction/

Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html

Protobuf-Net:https://code.google.com/p/protobuf-net/

转载于:https://www.cnblogs.com/cuiyansong/p/4326047.html

消息中间件NetMQ结合Protobuf简介相关推荐

  1. 【Android Protobuf 序列化】Protobuf 简介 ( Protobuf 项目简介 | Protobuf 优缺点分析 )

    文章目录 一.Protobuf 简介 二.Protobuf 优缺点分析 1.Protobuf 优点 2.Protobuf 缺点 三.参考资料 一.Protobuf 简介 Protobuf 是 Goog ...

  2. pythongoogle.probuf.timestamp_gRPC快速入门(一)——Protobuf简介

    gRPC快速入门(一)--Protobuf简介 一.Protobuf简介 1.Protobuf简介 Protobuf即Protocol Buffers,是Google公司开发的一种跨语言和平台的序列化 ...

  3. Protobuf—简介,优缺点

    protobuf 是Google旗下的一款平台无关.语言无关,可扩展的序列化结构数据格式.所以很适合用做数据存储和作为不同应用.不同语言之间相互通信的数据交换格式,只要实现相同的协议格式的统一prot ...

  4. 在C#中使用gRPC及protobuf简介

    简介 gRPC提供了很多的语言开发包,C#也可以很容易使用.结合使用protobuf及其编译器,很容易地生成了gRPC的服务stub和proxy. 在CSharp中使用gRPC和Protobuf,可以 ...

  5. 消息中间件原理及JMS简介之一

    摘要: 现今,越来越多的企业面临着各种各样的数据集成和系统整合,CORBA.DCOM.RMI等RPC中间件技术也应运而生,但由于采用RPC同步处理技术,在性能.健壮性.可扩展性上都存在着诸多缺点.而基 ...

  6. 消息中间件原理及JMS简介(2)

    本文首先介绍了消息中间件的原理,然后介绍了目前流行的消息中间件产品和一些开源实现.最后详细分析了SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范(JMS).   2.3 消息中间件的传递模 ...

  7. ProtoBuf简介

    简介 这篇文章浅显地介绍一下Google的数据序列化协议:ProtoBuf(Protocol Buffers),并通过一个简单的例子,展示如何使用这个协议. 数据序列化协议 数据序列化协议用于将数据结 ...

  8. Google 的开源技术protobuf 简介与例子

    2019独角兽企业重金招聘Python工程师标准>>> 今天来介绍一下"Protocol Buffers "(以下简称protobuf)这个玩意儿.本来俺在构思& ...

  9. Google 开源技术protobuf 简介与样例

    今天来介绍一下"Protocol Buffers "(以下简称protobuf)这个玩意儿.本来俺在构思"生产者/消费者模式 "系列的下一个帖子:关于生产者和消 ...

最新文章

  1. BQ27510 电量计的校准 的 C语言实现
  2. 写程序中的16张趣图。
  3. 2020中国互联网房产服务行业用户洞察报告
  4. Hibernate简易BaseDao演示单例
  5. 计算机丢失vcomp110.dll,msvcp110.dll丢失一键修复工具
  6. Java基础编程题目——找出姐妹素数
  7. Docker学习——docker入门
  8. 微带线特性阻抗计算公式_HFSS 计算 微带线 特征阻抗
  9. MySQL参数 之 innodb_buffer_pool_size
  10. 使用PYTHON图像识别实现车牌号码识别
  11. 什么是GAN(生成对抗网络)?
  12. 股票交易日志4 12.16
  13. 并发与多线程相关知识点梳理
  14. 猿创征文 | 项目整合KafkaStream实现文章热度实时计算
  15. matlab散点图加图例,将图例添加到图 - MATLAB Simulink - MathWorks 中国
  16. 牛客网刷编程题 2020年05月27日
  17. 计量经济学之时间序列分析学习笔记(单位根检验、协整检验、单整阶数判断、ECM建模)——基于R(二)
  18. 简单理解---JVM虚拟机
  19. 又一个Mac特洛伊木马被发现!苹果用户要警惕
  20. 两款不错的网络电视录制软件

热门文章

  1. windows7不支持AllocateAndGetTcpExTableFromStack
  2. GPU与CPU版本的矩阵乘法对比
  3. 记录 之 numpy查看数据类型和类型转换
  4. 概率论-1.2概率的定义及其确定方法
  5. cursoradpter自动更新
  6. Indetifier
  7. mac部署文件服务器,MAC 搭建本地服务器
  8. c语言如何让数组的两个数据调换位置_浅论数据结构
  9. Can't find dependent libraries
  10. Spark GraphX