一、One-way MEP V.S. Responsible Service

我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Message Exchange Pattern)进行通信。Client和Service之间采用One-way MEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的Exception。下图简单表述了基于MSMQ的WCF Service中Client和Service的交互。


但是在有些场景 中,这是无法容忍的。再拿我在上一篇文章的Order Delivery的例子来说。Client向Service提交了Order,却无法确认该Order是否被Service正确处理,这显然是不能接受的。我们今天就来讨论一下,如何创建一个Responsive Service来解决这个问题:Client不再是对Service的执行情况一无所知,它可以获知Order是否被Service正确处理了。

二、 Solution

虽然我们的目的很简单:当Client向Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从Message Queue中获取出来、并正确处理之后,能够向Order的递交者发送一个Acknowledge Message。为了简单起见,这个Acknowledge Message包含两组信息:

  • Order No.: 被处理的Order的一个能够为一标志它的ID。
  • Exception: 如果处理失败的Exception,如果成功处理为null。

要在WCF中实现这样的目的,对于Request/Reply MEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到Order之后直接将处理结果 返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。

我们的解决方案是:在每个Client Domain也创建一个基于MSMQ的本地的WCF Service,用于接收来自Order处理端发送的Acknowledge Message。对于处理Order 的Service来说,在正确处理Order之后,想对应的Client发送Acknowledge Message。下图简单演示整个过程:


三、Implementation

了解了上面的Solution之后,我们来看看该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送Acknowledge Message的时候,它必须要知道该Order对应的Client的Response Service的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了address的Context的信息 封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response 的Message Queue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解决方式:

方式一、修改Service Contract,把OrderResponseContext当成是Operation的一个参数

这是我们最容易想到的,比如我们原来的Operation这样定义:

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    [ServiceKnownType(typeof(Order))]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void Submit(Order order);
    }
}

现在变成:

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    [ServiceKnownType(typeof(Order))]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void Submit(Order order, OrderResponseContext responseContext);
    }
}

虽然这种方式看起来不错,但是却不值得推荐。在一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从Service Contract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和Service Contract无关的解决方式:

方式二、将OrderResponseContext放到Soap Message 的Header中

其实我们要解决的问题很简单,就是要把OrderResponseContext的信息置于Soap Message中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基于WCF的编程模式很容易地帮助我们实现对Soap Header的插入和获取:

我们可以通过下面的方式获得当前Operation Context的Incoming Message Headers和Outgoing Message Headers

OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders

如果我们要把一个OrderResponseContext 对象插入到当前Operation Context的Outgoing Message Headers中,我们可以通过下面的代码来实现:

OrderResponseContext context = new OrderResponseContext();
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace"));

相应的,我们可以通过下面的代码从Outgoing Message Headers OrderResponseContext的数据获取的内容:

OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name", "namespace"));

四、Sample

我们照例给出一个完整的Sample,下面是整个Solution的结构:


除了一贯使用的4层结构(Contract-Service-Hosting-Client),还为ResponseService增加了下面两层:

  • Localservice: 作为Client Domain的ResponseService。
  • LocalHosting:Host Localservice。

1.Contract:  Artech.ResponsiveQueuedService.Contract

Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    [ServiceKnownType(typeof(Order))]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void Submit(Order order);
    }
}

Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{
    [ServiceContract]
    public interface  IOrderRessponse
    {
        [OperationContract(IsOneWay =true)]
        void SubmitOrderResponse(Guid orderNo,FaultException exception);
    }
}

接收来自Order processing端的Response:Order No.和Exception。

Data Contract: Artech.ResponsiveQueuedService.Contract.Order

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;

namespace Artech.ResponsiveQueuedService.Contract
{
    [DataContract]
    public class Order
    {
        Private Fields#region Private Fields
        private Guid _orderNo;
        private DateTime _orderDate;
        private Guid _supplierID;
        private string _supplierName;
        #endregion

        Constructors#region Constructors
        public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
        {
            this._orderNo = orderNo;
            this._orderDate = orderDate;
            this._supplierID = supplierID;
            this._supplierName = supplierName;
        }

        #endregion

        Public Properties#region Public Properties
        [DataMember]
        public Guid OrderNo
        {
            get { return _orderNo; }
            set { _orderNo = value; }
        }

        [DataMember]
        public DateTime OrderDate
        {
            get { return _orderDate; }
            set { _orderDate = value; }
        }

        [DataMember]
        public Guid SupplierID
        {
            get { return _supplierID; }
            set { _supplierID = value; }
        }

        [DataMember]
        public string SupplierName
        {
            get { return _supplierName; }
            set { _supplierName = value; }
        }
        #endregion

        Public Methods#region Public Methods
        public override string ToString()
        {
            string description = string.Format("Order No./t: {0}/n/tOrder Date/t: {1}/n/tSupplier No./t: {2}/n/tSupplier Name/t: {3}", 
                this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
            return description;
        }
        #endregion
    }
}

对Order的封装。

Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{    
    [DataContract]
    public class OrderResponseContext
    {
        private Uri _responseAddress;

        [DataMember]
        public Uri ResponseAddress
        {
            get { return _responseAddress; }
            set { _responseAddress = value; }
        }

        public static OrderResponseContext Current
        {
            get
            {
                if (OperationContext.Current == null)
                {
                    return null;
                }

                return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract");
            }
            set
            {
                MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
                OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"));
            }
        }
    }
}

ResponseAddress代表Host在Client Domain的Response Service的Address。同过Current把OrderResponseContext插入到Outgoing Message Headers中、以及从Ingoing Message Headers取出OrderResponseContext对象。

2.Order Processing Service:Artech.ResponsiveQueuedService.Service

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
using System.Net.Security;

namespace Artech.ResponsiveQueuedService.Service
{
    public class OrderProcessorService:IOrderProcessor
    {
        private void ProcessOrder(Order order)
        {

            if (order.OrderDate < DateTime.Today)
            {
                throw new Exception();
            }
        }

        IOrderProcessor Members#region IOrderProcessor Members

        public void Submit(Order order)
        {
            Console.WriteLine("Begin to process the order of the order No.: {0}", order.OrderNo);
            FaultException exception= null;
            if (order.OrderDate < DateTime.Today)
            {
                exception = new FaultException(new FaultReason("The order has expried"), new FaultCode("sender"));
                Console.WriteLine("It's fail to process the order./n/tOrder No.: {0}/n/tReason:{1}", order.OrderNo, "The order has expried");
            }
            else
            {
                Console.WriteLine("It's successful to process the order./n/tOrder No.: {0}", order.OrderNo);
            }

            NetMsmqBinding binding = new NetMsmqBinding();
            binding.ExactlyOnce = false;
            binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
            binding.Security.Transport.MsmqProtectionLevel = ProtectionLevel.None;
            ChannelFactory<IOrderRessponse> channelFacotry = new ChannelFactory<IOrderRessponse>(binding);
            OrderResponseContext responseContext = OrderResponseContext.Current;
            IOrderRessponse channel = channelFacotry.CreateChannel(new EndpointAddress(responseContext.ResponseAddress));

            using (OperationContextScope contextScope = new OperationContextScope(channel as IContextChannel))
            {
                channel.SubmitOrderResponse(order.OrderNo, exception);
            }
        }
        #endregion
    }
}

在这里我们模拟了这样的场景:先通过Order Date判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过OrderResponseContext.Current从Incoming Message Header中获取封装在OrderResponseContext对象中的Response Address,创建Binding并调用Response Service.

3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="msmqPath" value="./private$/orderprocessor"/>
  </appSettings>
  <system.serviceModel>
    <bindings>
      <netMsmqBinding>
        <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
          <security>
            <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          </security>
        </binding>
      </netMsmqBinding>
    </bindings>
    <services>
      <service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
        <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
            bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Service;
using System.ServiceModel;
using System.Configuration;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.Hosting
{
    class Program
    {
        static void Main(string[] args)
        {
            string path = ConfigurationManager.AppSettings["msmqPath"];
            if (!MessageQueue.Exists(path))
            {
                MessageQueue.Create(path);
            }

            using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
            {
                host.Opened += delegate
                {
                    Console.WriteLine("The Order Processor service has begun to listen");
                };

                host.Open();

                Console.Read();
            }
        }
    }
}

4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.LocalService
{
    public class OrderRessponseService : IOrderRessponse
    {
        IOrderRessponse Members#region IOrderRessponse Members

        public void SubmitOrderResponse(Guid orderNo, FaultException exception)
        {
            if (exception == null)
            {
                Console.WriteLine("It's successful to process the order!/n/tOrder No.: {0}",orderNo);
            }
            else
            {
                Console.WriteLine("It's fail to process the order!/n/tOrder No.: {0}/n/tReason: {1}", orderNo, exception.Message);
            }
        }

        #endregion
    }
}

5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="msmqPath" value="./private$/orderresponse"/>
  </appSettings>
  <system.serviceModel>
    <bindings>
      <netMsmqBinding>
        <binding name="msmqBinding" exactlyOnce="false">
          <security>
            <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          </security>
        </binding>
      </netMsmqBinding>
    </bindings>
    <services>
      <service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
        <endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
            bindingConfiguration="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.LocalService;
using System.Configuration;
using System.ServiceModel;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.LocalhHosting
{
    class Program
    {
        static void Main(string[] args)
        {
            string path = ConfigurationManager.AppSettings["msmqPath"];
            if (!MessageQueue.Exists(path))
            {
                MessageQueue.Create(path);
            }

            using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
            {
                host.Opened += delegate
                {
                    Console.WriteLine("The Order Response service has begun to listen");
                };

                host.Open();

                Console.Read();
            }
        }
    }
}

6. Client: Artech.ResponsiveQueuedService.Client

Configuration:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
  </appSettings>
  <system.serviceModel>
    <bindings>
      <netMsmqBinding>
        <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
          <security>
            <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
          </security>
        </binding>
      </netMsmqBinding>
    </bindings>
    <client>
      <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
            bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
    </client>
  </system.serviceModel>
</configuration>

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Configuration;
using System.ServiceModel;
using Artech.ResponsiveQueuedService.Contract;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.Clinet
{
    class Program
    {
        static void Main(string[] args)
        {
            Order order1 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
            Order order2 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");

            string path = ConfigurationManager.AppSettings["msmqPath"];
            Uri address = new Uri(path);
            OrderResponseContext context = new OrderResponseContext();
            context.ResponseAddress = address;

            ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
            IOrderProcessor orderProcessor = channelFactory.CreateChannel();

            using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
            {
                Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo);
                OrderResponseContext.Current = context;
                orderProcessor.Submit(order1);
            }

            using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
            {
                Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo);
                OrderResponseContext.Current = context;
                orderProcessor.Submit(order2);
            }

            Console.Read();
        }
    }
}

我创建了两个Order对象, 其中一个已经过期。从Configuration中取出Response Address并购建一个OrderResponseContext,然后分两次将这两个Order向Order Processing Service递交。在调用Order Processing Order的Operation Context Scope中,通过OrderResponseContext.Current将OrderResponseContext对象插入Outcoming Message Header中。

我们现在运行一下整个程序,看看最终的输出结果:

Client:


Order Processing:


Order Response:

Reference:
Build a Queued WCF Response Service

作者:Artech
出处:http://artech.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

我的WCF之旅(13):创建基于MSMQ的Responsive Service相关推荐

  1. 我的WCF之旅 (11): 再谈WCF的双向通讯-基于Http的双向通讯 V.S. 基于TCP的双向通讯...

    在一个基于面向服务的分布式环境中,借助一个标准的.平台无关的Communication Infrastructure,各个Service通过SOAP Message实现相互之间的交互.这个交互的过程实 ...

  2. 我的WCF之旅(7):面向服务架构(SOA)和面向对象编程(OOP)的结合——如何实现Service Contract的继承...

    当今的IT领域,SOA已经成为了一个非常时髦的词,对SOA风靡的程度已经让很多人对SOA,对面向服务产生误解.其中很大一部分人甚至认为面向服务将是面向对象的终结,现在的面向对象将会被面向服务完全代替. ...

  3. 我的WCF之旅(6):在Winform Application中调用Duplex Service出现TimeoutException的原因和解决方案...

    几个星期之前写了一篇关于如何通过WCF进行 双向通信的文章([原创]我的WCF之旅(3):在WCF中实现双向通信(Bi-directional Communication) ),在文章中我提供了一个如 ...

  4. 《我的WCF之旅》博文系列汇总

    WCF是构建和运行互联系统的一系列技术的总称,它是建立在Web Service架构上的一个全新的通信平台.你可以把它看成是.NET平台上的新一代的Web Service.WCF为我们提供了安全.可靠的 ...

  5. 我的WCF之旅(10):如何在WCF进行Exception Handling

    在任何Application的开发中,对不可预知的异常进行troubleshooting时,异常处理显得尤为重要.对于一般的.NET系统来说,我们简单地借助try/catch可以很容易地实现这一功能. ...

  6. 我的WCF之旅(12):使用MSMQ进行Reliable Messaging(转载)

    一.为什么要使用MSMQ 在一个分布式的环境中,我们往往需要根据具体的 情况采用不同的方式进行数据的传输.比如在一个Intranet内,我们一般通过TCP进行高效的数据通信:而在一个Internet的 ...

  7. [转载]我的WCF之旅(3):在WCF中实现双工通信

    http://www.cnblogs.com/artech/archive/2007/03/02/661969.html 双工(Duplex)模式的消息交换方式体现在消息交换过程中,参与的双方均可以向 ...

  8. 13款基于jQuery Mobile的布局插件和示例

    现在已经进入了移动互联网时代,因此将你的网站迁移到移动设备上就显得比较重要的.问题是,如何在移动设备的小屏幕中呈现你的网站中的所有内容呢? 本文介绍13款基于jQuery Mobile的布局插件和示例 ...

  9. FMS3系列(三):创建基于FMS的流媒体播放程序

    本文主要介绍怎么去创建基于FMS的流媒体播放程序,Flash客户端通过网络加载FMS服务器上的视频流文件(.flv,.mp4等),实现视频流的播放. 要实现媒体流文件的播放是非常简单的,只要在FMS服 ...

最新文章

  1. 独家 | 7个新手数据讲述者犯下的致命错误
  2. 原相机水印怎么改字_抖音/自媒体做影视二次剪辑,如何下载高清无水印视频?...
  3. Markdown 编辑器才是yyds|CSDN编辑器测评
  4. 对花不准(错版、套版不准、错花)的原因
  5. 面向终端计算机网络的应用原理,山东科技大学计算机网络原理及应用复习提纲.doc...
  6. 真香啊,20张高清数据分析全知识地图,要学的东西全都概况了
  7. 产品读书《粉丝经济:传统企业转型互联网的突破口》
  8. vue 项目中高德地图 API 使用流程
  9. 什么是数字孪生技术?
  10. 常用 vm 参数分析
  11. 桂电深信服CTF之MSC真假压缩包
  12. Zynq-PS-SDK(4) 之 GIC 配置
  13. win7系统蓝屏故障以及常见的解决方案
  14. echarts中对整个图形旋转90°(实现div旋转90度)
  15. php str splice,PHP array_splice()函数使用方法
  16. Git本地实现服务器搭建
  17. 《鹿鼎记》的第一遍读后感作文3600字
  18. 【算法笔记】全域哈希表
  19. 斐逊K2 刷breed
  20. mysql routeros_转-RouterOS流量控制方案

热门文章

  1. oracle ob 使用基础之基础
  2. NumPy 百题大冲关,冲鸭!
  3. 解决ssh7.4升级8.5后环境变量失效和无法登录问题
  4. Nginx负载均衡的原理及流程分析
  5. Nginx的官方简介
  6. 选择器Selector
  7. JMM中有哪些方法建立happen-before规则
  8. 项目中applicaiton.yml配置文件详细讲解
  9. AOP日志-前置通知操作
  10. SpringMVC拦截器之介绍和搭建环境