我的WCF之旅(13):創建基于MSMQ的Responsive Service
我們知道MSMQ天生就具有異步的特性,它只能以One-way的MEP(Message Exchange Pattern)進行通信。Client和Service之間采用One-way MEP的話就意味著Client調用Service之后立即返回,它無法獲得Service的執行結果,也無法捕捉Service運行的Exception。下圖簡單表述了基于MSMQ的WCF Service中Client和Service的交互。

但是在有些場景 中,這是無法容忍的。再拿我在上一篇文章的Order Delivery的例子來說。Client向Service提交了Order,卻無法確認該Order是否被Service正確處理,這顯然是不能接受的。我們今天就來討論一下,如何創建一個Responsive Service來解決這個問題:Client不再是對Service的執行情況一無所知,它可以獲知Order是否被Service正確處理了。
二、 Solution
雖然我們的目的很簡單:當Client向Service遞交了Order之后,能以某種方式獲知Order的執行結果;對于Service端來說,在正確把Order從Message Queue中獲取出來、并正確處理之后,能夠向Order的遞交者發送一個Acknowledge Message。為了簡單起見,這個Acknowledge Message包含兩組信息:
-
Order No.: 被處理的Order的一個能夠為一標志它的ID。
-
Exception: 如果處理失敗的Exception,如果成功處理為null。
要在WCF中實現這樣的目的,對于Request/Reply MEP來說是簡單而直接的:Client向Service遞交Order,并等待Service的Response,Service在處理接收到Order之后直接將處理結果 返回給Client就可以了。但是我們說過MSMQ天生就是異步的,我們只有采取一種間接的方式實現“曲線救國”。
我們的解決方案是:在每個Client Domain也創建一個基于MSMQ的本地的WCF Service,用于接收來自Order處理端發送的Acknowledge Message。對于處理Order 的Service來說,在正確處理Order之后,想對應的Client發送Acknowledge Message。下圖簡單演示整個過程:

三、Implementation
了解了上面的Solution之后,我們來看看該Solution在真正實現過程中有什么樣的困難。對于處理Order的Service來說,在向Client端發送Acknowledge Message的時候,它必須要知道該Order對應的Client的Response Service的MSMQ的Address以及其他和Operation相關的Context信息(在這里我們不需要,不過考慮到擴展性,我們把包括了address的Context的信息 封裝到一個了Class中,在這里叫做:OrderResponseContext)。而這些Context卻不能在Configuration中進行配置,因為他可以同時面臨著很多個Client:比如每個Client用于接收Response 的Message Queue的address都不一樣。所以這個OrderResponseContext必須通過對應的Client來提供。基于此,我們具有兩面兩種解決方式:
方式一、修改Service Contract,把OrderResponseContext當成是Operation的一個參數
這是我們最容易想到的,比如我們原來的Operation這樣定義:
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order);
}
}
現在變成:
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order, OrderResponseContext responseContext);
}
}
雖然這種方式看起來不錯,但是卻不值得推薦。在一般情況下,我們的Contract需要是很穩定的,一經確定就不能輕易更改,因為Contract是被交互的多方共同支持的,牽一發動全身;此外,從Service Contract代表的是Service的一個Interface,他是對業務邏輯的抽象、和具體實現無關,而對于我們的例子來說,我們僅僅是定義一個遞交Order的Operation,從業務邏輯來看,OrderResponseContext和抽象的業務邏輯毫無關系。基于此,我們需要尋求一種和Service Contract無關的解決方式:
方式二、將OrderResponseContext放到Soap Message 的Header中
其實我們要解決的問題很簡單,就是要把OrderResponseContext的信息置于Soap Message中發送到Service。而我們知道,Soap的Header具有極強的可伸縮性,原則上,我們可以把任何控制信息置于Header中。基于WCF的編程模式很容易地幫助我們實現對Soap Header的插入和獲取:
我們可以通過下面的方式獲得當前Operation Context的Incoming Message Headers和Outgoing Message Headers
OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders
如果我們要把一個OrderResponseContext 對象插入到當前Operation Context的Outgoing Message Headers中,我們可以通過下面的代碼來實現:
OrderResponseContext context = new OrderResponseContext();
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace"));
相應的,我們可以通過下面的代碼從Outgoing Message Headers OrderResponseContext的數據獲取的內容:
OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name", "namespace"));四、Sample
我們照例給出一個完整的Sample,下面是整個Solution的結構:

除了一貫使用的4層結構(Contract-Service-Hosting-Client),還為ResponseService增加了下面兩層:
-
Localservice: 作為Client Domain的ResponseService。
-
LocalHosting:Host Localservice。
1.Contract: Artech.ResponsiveQueuedService.Contract
Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor
using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order);
}
}
Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse
using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
public interface IOrderRessponse
{
[OperationContract(IsOneWay =true)]
void SubmitOrderResponse(Guid orderNo,FaultException exception);
}
}
接收來自Order processing端的Response:Order No.和Exception。
Data Contract: Artech.ResponsiveQueuedService.Contract.Order
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.ResponsiveQueuedService.Contract
{
[DataContract]
public class Order
{
Private Fields
Constructors
Public Properties
Public Methods
}
}
對Order的封裝。
Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
{
[DataContract]
public class OrderResponseContext
{
private Uri _responseAddress;
[DataMember]
public Uri ResponseAddress
{
get { return _responseAddress; }
set { _responseAddress = value; }
}
public static OrderResponseContext Current
{
get
{
if (OperationContext.Current == null)
{
return null;
}
return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract");
}
set
{
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"));
}
}
}
}
ResponseAddress代表Host在Client Domain的Response Service的Address。同過Current把OrderResponseContext插入到Outgoing Message Headers中、以及從Ingoing Message Headers取出OrderResponseContext對象。
2.Order Processing Service:Artech.ResponsiveQueuedService.Service
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
using System.Net.Security;
namespace Artech.ResponsiveQueuedService.Service
{
public class OrderProcessorService:IOrderProcessor
{
private void ProcessOrder(Order order)
{
if (order.OrderDate < DateTime.Today)
{
throw new Exception();
}
}
IOrderProcessor Members
}
}
在這里我們模擬了這樣的場景:先通過Order Date判斷Order是否過期,如果過期創建一個FaultException,否則正確處理該Order,然后通過OrderResponseContext.Current從Incoming Message Header中獲取封裝在OrderResponseContext對象中的Response Address,創建Binding并調用Response Service.
3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="msmqPath" value=".\private$\orderprocessor"/>
</appSettings>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<services>
<service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
<endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
</service>
</services>
</system.serviceModel>
</configuration>
Program
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Service;
using System.ServiceModel;
using System.Configuration;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.Hosting
{
class Program
{
static void Main(string[] args)
{
string path = ConfigurationManager.AppSettings["msmqPath"];
if (!MessageQueue.Exists(path))
{
MessageQueue.Create(path);
}
using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
{
host.Opened += delegate
{
Console.WriteLine("The Order Processor service has begun to listen
");
};
host.Open();
Console.Read();
}
}
}
}
4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.LocalService
{
public class OrderRessponseService : IOrderRessponse
{
IOrderRessponse Members
}
}
5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="msmqPath" value=".\private$\orderresponse"/>
</appSettings>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="msmqBinding" exactlyOnce="false">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<services>
<service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
<endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
bindingConfiguration="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
</service>
</services>
</system.serviceModel>
</configuration>
Program
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.LocalService;
using System.Configuration;
using System.ServiceModel;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.LocalhHosting
{
class Program
{
static void Main(string[] args)
{
string path = ConfigurationManager.AppSettings["msmqPath"];
if (!MessageQueue.Exists(path))
{
MessageQueue.Create(path);
}
using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
{
host.Opened += delegate
{
Console.WriteLine("The Order Response service has begun to listen
");
};
host.Open();
Console.Read();
}
}
}
}
6. Client: Artech.ResponsiveQueuedService.Client
Configuration:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
</appSettings>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<client>
<endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
</client>
</system.serviceModel>
</configuration>
Program:
using System;
using System.Collections.Generic;
using System.Text;
using System.Configuration;
using System.ServiceModel;
using Artech.ResponsiveQueuedService.Contract;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.Clinet
{
class Program
{
static void Main(string[] args)
{
Order order1 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
Order order2 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");
string path = ConfigurationManager.AppSettings["msmqPath"];
Uri address = new Uri(path);
OrderResponseContext context = new OrderResponseContext();
context.ResponseAddress = address;
ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
IOrderProcessor orderProcessor = channelFactory.CreateChannel();
using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
{
Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo);
OrderResponseContext.Current = context;
orderProcessor.Submit(order1);
}
using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
{
Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo);
OrderResponseContext.Current = context;
orderProcessor.Submit(order2);
}
Console.Read();
}
}
}
我創建了兩個Order對象, 其中一個已經過期。從Configuration中取出Response Address并購建一個OrderResponseContext,然后分兩次將這兩個Order向Order Processing Service遞交。在調用Order Processing Order的Operation Context Scope中,通過OrderResponseContext.Current將OrderResponseContext對象插入Outcoming Message Header中。
我們現在運行一下整個程序,看看最終的輸出結果:
Client:

Order Processing:

Order Response:
Reference:
Build a Queued WCF Response Service
WCF相關內容:
[原創]我的WCF之旅(1):創建一個簡單的WCF程序
[原創]我的WCF之旅(2):Endpoint Overview
[原創]我的WCF之旅(3):在WCF中實現雙向通信(Bi-directional Communication)
[原創]我的WCF之旅(4):WCF中的序列化(Serialization)- Part I
[原創]我的WCF之旅(4):WCF中的序列化(Serialization)- Part II
[原創]我的WCF之旅(5):Service Contract中的重載(Overloading)
[原創]我的WCF之旅(6):在Winform Application中調用Duplex Service出現TimeoutException的原因和解決方案
[原創]我的WCF之旅(7):面向服務架構(SOA)和面向對象編程(OOP)的結合——如何實現Service Contract的繼承
[原創]我的WCF之旅(8):WCF中的Session和Instancing Management
[原創]我的WCF之旅(9):如何在WCF中使用tcpTrace來進行Soap Trace
[原創]我的WCF之旅(10): 如何在WCF進行Exception Handling
[原創]我的WCF之旅(11):再談WCF的雙向通訊-基于Http的雙向通訊 V.S. 基于TCP的雙向通訊
[原創]我的WCF之旅(12):使用MSMQ進行Reliable Messaging
[原創]我的WCF之旅(13):創建基于MSMQ的Responsive Service


我們知道MSMQ天生就具有異步的特性,它只能以One-way的MEP(Message Exchange Pattern)進行通信。Client和Service之間采用One-way MEP的話就意味著Client調用Service之后立即返回,它無法獲得Service的執行結果,也無法捕捉Service運行的Exception。
但是在有些場景 中,這是無法容忍的。再拿我在上一篇文章的Order Delivery的例子來說。Client向Service提交了Order,卻無法確認該Order是否被Service正確處理,這顯然是不能接受的。我們今天就來討論一下,如何創建一個Responsive Service來解決這個問題:Client不再是對Service的執行情況一無所知,它可以獲知Order是否被Service正確處理了。

浙公網安備 33010602011771號