一、为什么要使用MSMQ

在一个分布式的环境中,我们往往需要根据具体的 情况采用不同的方式进行数据的传输。比如在一个Intranet内,我们一般通过TCP进行高效的数据通信;而在一个Internet的环境中,我们则通 常使用Http进行跨平台的数据交换。而这些通信方式具有一个显著的特点,那就是他们是基于Connection的,也就是说,交互双方在进行通信的时候 必须保证有一个可用的Connection存在于他们之间。而在某些时候,比如那些使用拨号连接的用户、以及使用便携式计算机的用户,我们不能保证在他们 和需要访问的Server之间有一个的可靠的连接,在这种情况下,基于Messaging Queue的连接就显得尤为重要了。我们今天就来谈谈在WCF中如何使用MSMQ。

MSMQ不仅仅是作为支持客户端连接工具而存在,合理的使用MSMQ可以在很大程度上提升系统的Performance和Scalability。我们先来看看MSMQ能给我们带来怎样的好处:

1.MSMQ是基于Disconnection

MSMQ通过Message Queue进行通信,这种通信方式为离线工作成为了可能。比如在介绍MSMQ时都会提到的Order Delivery的例子:在一个基于B2C的系统中,订单从各种各样的客户传来,由于 客户的各异性,不能保证每个客户在每时每刻都和用于接收订单的Server保持一个可靠的连接,我们有时候甚至允许客户即使在离线的 情况下也可以递交订单(虽然订单不能发送到订单的接收方,但是我们可以通过某种机制保证先在本地保存该订单,一旦连接建立,则马上向接收方递交订单),而 MSMQ则有效地提供了这样的机制:Server端建立一个Message Queue来接收来个客户的订单,客户端通过向该Message Queue发送承载了订单数据的Message实现订单的递交。如果在客户离线的情况下,他仍然可以通过客户端程序进行订单递交的操作,存储着订单数据的 Message会被暂时保存在本地的Message Queue中,一旦客户联机,MSMQ将Message从中取出,发送到真正的接收方,而这个动作对于用户的透明的。

2.MSMQ天生是One-way、异步的

在MSMQ中,Message始终以One-way的方式进行发送,所以MSMQ具有天生的异步特性。所以MSMQ使用于那些对于用户的请求,Server端无需立即响应的场景。也就是说Server对数据的处理无需和Client的数据的发送进行同步,它可以独自地按照自己的Schedule进行工作。这可以避免峰值负载。比如Server端可以在一个相对低负载的时段(比如深夜)来对接收到的Order进行批处理,而无需一天24小时一直进行Order的监听、接收和处理。

3.MSMQ能够提供高质量的Reliable Messaging

我们知道,在一般的情况下,如果Client端 以异步的方式对Service进行调用就意味着:Client无法获知Message是否成功抵达Service端;也不会获得Service端执行的结 果和出错信息。但是我们仍然说MSMQ为我们提供了可靠的传输(Reliable Messaging),这主要是因为MSMQ为我们提供一些列Reliable Messaging的机制:

  • 超时机制(Timeout):可以设置发送和接收的时间,超出该时间则被认为操作失败。
  • 确认机制(Acknowledgement):当Message成功抵达Destination Queue,或者被成功接收,向发送端发送一个Acknowledgement message用以确认操作的状态。
  • 日志机制(Journaling):当Message被发送或接收后,被Copy一份存放在Journal Queue中。

此外,MSMQ还提供了死信队列(Dead letter Queue)用以保存发送失败的message。这一切保证了保证了Reliable Messaging。

二、 MSMQ在WCF的运用

在WCF中,MSMQ提供的数据传输功能被封装在一个Binding中,提供WCF Endpoint之间、以及Endpoint和现有的基于MSMQ的Application进行通信的实现。为此WCF为我们提供了两种不同的built-in binding:

  • NetMsmqBinding: 从提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding没有什么区别: 在两个Endpoint之间实现了数据的通信,所不同的是,它提供的是基于MSMQ的Reliable Messaging。从变成模式上看,和一般的binding完全一样。
  • MsmqIntegrationBinding: 从命名上我们可以看出,MsmqIntegrationBinding主要用于需要将我们的WCF Application和现有的基于MSMQ的Application集成的情况。MsmqIntegrationBinding实现了WCF Endpoint和某个Message Queue进行数据的通信,具体来说,就是实现了单一的向某个Message Queue 发送Message,和从某个Message Queue中接收Message的功能。从编程模式上看,也有所不同,比如Operation只接收一个MsmqMessage<T>的参数。

这是Client和Service通信的图示:

三、MSMQ和Transaction

MSMQ提供对Transaction的支持。在一般的情况下,MSMQ通过Message Queue Transaction实现对Transaction的原生的支持,借助Message Queue Transaction,可以把基于一个或多个Message Queue的相关操作纳入同一个Transaction中。

Message Queue Transaction仅仅限于基于Message Queue的操作,倘若操作涉及到另外一些资源,比如SQL Server, 则可以使用基于DTC的分布式Transaction

对于WCF中MSMQ,由于Client和Service的相对独立(可能Client发送Message到Service处理Message会相隔很长一段时间),所以Client和Service的操作只能纳入不同的Transaction中,如下图。

四、Sample1:NetMsmqBinding

我们首先做一个基于NetMsmqBinding Sample,实现的功能就是我们开篇所提出的Order Delivery。我们说过,NetMsmqBinding和一般的binding在实现的功能和变成模式上完全一样。下面是我们熟悉的4层结构:


1.Contract

DataContract:Order & OrderItem

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;

namespace Artech.QueuedService.Contract
{
    [DataContract]
    [KnownType(typeof(OrderItem))]
    public class Order
    {
        Private Fields

        Constructors

        Public Properties

        Public Methods
    }
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;

namespace Artech.QueuedService.Contract
{
    [DataContract]
    public class OrderItem
    {
        Private Fields

        Constructors

        Public Properties
    }
}

ServiceContract: IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.QueuedService.Contract
{
    [ServiceContract] 
    [ServiceKnownType(typeof(Order))]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void Submit(Order order);
    }
}

2.Service:IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;

namespace Artech.QueuedService.Service
{
    public class OrderProcessorService:IOrderProcessor
    {
        ISubmitOrder Members
    }
}
using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;

namespace Artech.QueuedService.Service
{
    public static class Orders
    {
        private static IDictionary<Guid, Order> _orderList = new Dictionary<Guid, Order>();

        public static void Add(Order order)
        {
            _orderList.Add(order.OrderNo, order);
        }

        public static Order GetOrder(Guid orderNo)
        {
            return _orderList[orderNo];
        }
    }
}

3.Hosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <system.serviceModel>
        <bindings>
            <netMsmqBinding>
                <binding name="msmqBinding">
                    <security>
                        <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
                        <message clientCredentialType="None" />
                    </security>
                </binding>
            </netMsmqBinding>
        </bindings>
        <services>
            <service name="Artech.QueuedService.Service. OrderProcessorService">
                <endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
                    bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor" />
            </service>
        </services>
    </system.serviceModel>
</configuration>

在默认的情况下,netMsmqBinding 的msmqAuthenticationModeWindowsDomain,由于基于WindowsDomain必须安装AD,利于在本机模拟,我把msmqAuthenticationMode改为None,相应的ProtectionLevel和clientCredentialType改为None。

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.ServiceModel;
using Artech.QueuedService.Service;

namespace Artech.QueuedService.Hosting
{
    class Program
    {
        static void Main(string[] args)
        {
            string path = @".\private$\orders";
            if(!MessageQueue.Exists(path))
            {
                MessageQueue.Create(path,true);
            }

            using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
            {
                host.Opened += delegate
                {
                    Console.WriteLine("Service has begun to listen\n\n");
                };

                host.Open();

                Console.Read();
            }
        }
    }
}

在Host Service之前,通过MessageQueue.Create创建一个Message Queue,第二个参数为表明Queue是否支持Transaction的indicator,这里支持Transaction。

4.Client:

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <system.serviceModel>
        <bindings>
            <netMsmqBinding>
                <binding name="msmqBinding">
                    <security>
                        <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
                        <message clientCredentialType="None" />
                    </security>
                </binding>
            </netMsmqBinding>
        </bindings>
        <client>
            <endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
                bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor"
                name="defaultEndpoint" />
        </client>
    </system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;
using System.Transactions;

namespace Artech.QueuedService.Client
{
    class Program
    {
        static void Main(string[] args)
        {
            ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
            IOrderProcessor channel = channelFactory.CreateChannel();

            Order order = new Order(Guid.NewGuid(),DateTime.Today,Guid.NewGuid(),"A Company");
            order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"PC",5000,20));
            order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"Printer",7000,2));

            Console.WriteLine("Submit order to server");
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            { 
                 channel.Submit(order);
                 scope.Complete();
            }       
            Console.Read();
        }
    }
}

先后运行Host和Client,Host端有下面的输出:

转载于:https://www.cnblogs.com/zpc870921/archive/2012/09/10/2678629.html

我的WCF之旅(12):使用MSMQ进行Reliable Messaging(转载)相关推荐

  1. 我的WCF之旅(7):面向服务架构(SOA)和面向对象编程(OOP)的结合——如何实现Service Contract的继承...

    当今的IT领域,SOA已经成为了一个非常时髦的词,对SOA风靡的程度已经让很多人对SOA,对面向服务产生误解.其中很大一部分人甚至认为面向服务将是面向对象的终结,现在的面向对象将会被面向服务完全代替. ...

  2. 我的WCF之旅(6):在Winform Application中调用Duplex Service出现TimeoutException的原因和解决方案...

    几个星期之前写了一篇关于如何通过WCF进行 双向通信的文章([原创]我的WCF之旅(3):在WCF中实现双向通信(Bi-directional Communication) ),在文章中我提供了一个如 ...

  3. 我的WCF之旅 (11): 再谈WCF的双向通讯-基于Http的双向通讯 V.S. 基于TCP的双向通讯...

    在一个基于面向服务的分布式环境中,借助一个标准的.平台无关的Communication Infrastructure,各个Service通过SOAP Message实现相互之间的交互.这个交互的过程实 ...

  4. 《我的WCF之旅》博文系列汇总

    WCF是构建和运行互联系统的一系列技术的总称,它是建立在Web Service架构上的一个全新的通信平台.你可以把它看成是.NET平台上的新一代的Web Service.WCF为我们提供了安全.可靠的 ...

  5. 我的WCF之旅(10):如何在WCF进行Exception Handling

    在任何Application的开发中,对不可预知的异常进行troubleshooting时,异常处理显得尤为重要.对于一般的.NET系统来说,我们简单地借助try/catch可以很容易地实现这一功能. ...

  6. 我的WCF之旅(13):创建基于MSMQ的Responsive Service

    一.One-way MEP V.S. Responsible Service 我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Message Exchange Pattern)进 ...

  7. [转载]我的WCF之旅(3):在WCF中实现双工通信

    http://www.cnblogs.com/artech/archive/2007/03/02/661969.html 双工(Duplex)模式的消息交换方式体现在消息交换过程中,参与的双方均可以向 ...

  8. 我的WCF之旅(1):创建一个简单的WCF程序

    http://www.cnblogs.com/artech/archive/2007/02/26/656901.html 为了使读者对基于WCF的编程模型有一个直观的映像,我将带领读者一步一步地创建一 ...

  9. 我的WCF之旅(4):WCF中的序列化[下篇]

    ... ...续Part I XMLSerializer 提到XMLSerializer,我想绝大多数人都知道这是asmx采用的Serializer.首先我们还是来看一个例子,通过比较Managed ...

最新文章

  1. c语言 获得回车按键控制输入法,android调用输入软键盘回车键跟删除键
  2. 老鼠之Atlas之旅(一):Atlas官方站点
  3. Java基础(二) 程序格式
  4. mysql Decimal(M,D)解释
  5. 多台工作站搭建MPI并行环境
  6. 数百个 HTML5 例子学习 HT 图形组件 – 拓扑图篇
  7. 整理 | 程序员必读书单1.0
  8. PAT乙级题目——1002写出这个数
  9. docker容器的跨主机访问
  10. 西电计算机学硕毕业要求,关于计算机科学与技术学院2020年12月研究生学位申请工作的通知...
  11. vue 判断设备是手机端还是pc端
  12. 樊登读书会用事实说话读后感_用事实说话樊登读书笔记
  13. MATLAB期末复习
  14. Excel设置数据有效性实现单元格下拉菜单的3种方法
  15. 记一次RATEL脱壳配合Il2CppDumper解密完成的样本分析
  16. 【CUDA开发】CUDA的安装、Nvidia显卡型号及测试
  17. 辉芒微IO单片机FT60F121-RB
  18. 面向对象的三大基本特性,五大基本原则。
  19. php提取文章图片作缩略图,Wordpress自动提取文章内第一张图作为缩略图方法 | WordPress指南...
  20. proteus教程——读写AT24C02

热门文章

  1. mui框架中dialog框的实现
  2. Appium base knowledge
  3. C# .NET 根据Url链接保存Image图片到本地磁盘
  4. ibatis时间比较大小
  5. jenkins+findbugs+checkstyle+PMD静态代码检查(二)
  6. UVa 242 邮票和信封(完全背包)
  7. 类似于HttpModule的注入方式的事件机制示例
  8. Android模拟器无法上网问题
  9. MaxCompute,基于Serverless的高可用大数据服务
  10. window编程_消息分类