WCF進階:將編碼后的字節流壓縮傳輸
在前面兩篇文章WCF進階:將消息正文Base64編碼和WCF進階:為每個操作附加身份信息中講述了如何通過攔截消息的方式來記錄或者修改消息,這種方式在特定條件下可以改變消息編碼格式,但實現方式并不規范,而且使用范圍也有限制。 WCF缺省提供了三種編碼器(MessageEncoder):TextMessageEncoder,BinaryMessageEncoder,MtomMessageEncoder。事實上也是基于XML可以有三種格式:Text,Binary,MTOM,而XmlDictionaryWriter也提供了三種創建Writer的方法,CreateTextWriter,CreateBinaryWriter,CreateMtomWriter他們分別用于將XML以文本,二進制,MTOM保存。三種保存形式各有利弊,Text便于理解,更通用,Binary體積小,MTOM是優化之后的二進制,適用于較大的二進制傳輸。但無論使用哪種,最終在網絡上傳輸的都是字節流或者叫字節數組。在Binding中處于最后一個的總是TransportBindingElement,也就是說當要傳遞的數據到達TransportBindingElement之后,其實已經是字節數組了。在TransportBindingElement之上,可以有事務處理器,會話處理器,消息安全處理器,編碼器,傳輸安全處理器等,在編碼器之前,主要的處理對象是Message,而之后,主要的處理對象就是Stream(Byte[]),從這點我們也就清楚了之前在學習安全體系的時候,總是習慣將安全劃分為消息級別的安全和傳輸級別安全兩種了。我們這次要實現的其實是將已經經過編碼器編碼好的字節流壓縮后傳輸,而不是傳統意義上的消息編碼。這點也需要大家深層次的理解。
雖然我們實現的是傳輸層面上的壓縮編碼,但實現機理和自定義MessageEncoder是一樣的。MessageEncoder是所有編碼器的基類,它有幾個非常重要的方法和屬性:
public abstract Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType); public ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager); // Properties public abstract string ContentType { get; } public abstract string MediaType { get; } public abstract MessageVersion MessageVersion { get; }
ReadMessage和WriteMessage是完成消息 <->字節數組轉換的,這兩個方法(還有幾個重載),在實現自定義編碼器的時候是最為重要的,我們主要是通過重寫它們來完成自定義轉換。上面我們也說過了,我們要實現的是傳輸層的壓縮編碼,那么需要有一個缺省編碼,為此我們設計了名為CompressEncoder的自定義編碼類:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel.Channels;
using System.IO;

namespace RobinLib
{
    /// <summary>
    /// Message encoder that delegates Message &lt;-&gt; bytes conversion to an inner encoder and
    /// compresses (on write) or decompresses (on read) the resulting bytes with the
    /// configured <see cref="CompressAlgorithm"/>.
    /// </summary>
    public class CompressEncoder : MessageEncoder
    {
        CompressEncoderFactory factory;
        MessageEncoder innserEncoder;
        private CompressAlgorithm algorithm;

        /// <summary>
        /// Creates the encoder and obtains the inner encoder from the factory's inner
        /// message-encoding binding element.
        /// </summary>
        /// <param name="encoderFactory">Factory that owns this encoder; must not be null.</param>
        /// <param name="algorithm">Compression algorithm applied to the encoded bytes.</param>
        /// <exception cref="ArgumentNullException"><paramref name="encoderFactory"/> is null.</exception>
        public CompressEncoder(CompressEncoderFactory encoderFactory, CompressAlgorithm algorithm)
        {
            if (encoderFactory == null)
            {
                throw new ArgumentNullException("encoderFactory");
            }

            factory = encoderFactory;
            this.algorithm = algorithm;
            innserEncoder = factory.InnerMessageEncodingBindingElement.CreateMessageEncoderFactory().Encoder;
        }

        /// <summary>Content type reported by the inner encoder (compression does not change it).</summary>
        public override string ContentType
        {
            get { return innserEncoder.ContentType; }
        }

        /// <summary>Media type reported by the inner encoder.</summary>
        public override string MediaType
        {
            get { return innserEncoder.MediaType; }
        }

        /// <summary>Message version reported by the inner encoder.</summary>
        public override MessageVersion MessageVersion
        {
            get { return innserEncoder.MessageVersion; }
        }

        /// <summary>Asks the inner encoder whether it supports <paramref name="contentType"/>.</summary>
        public override bool IsContentTypeSupported(string contentType)
        {
            return innserEncoder.IsContentTypeSupported(contentType);
        }

        /// <summary>Forwards property lookups to the inner encoder.</summary>
        public override T GetProperty<T>()
        {
            return innserEncoder.GetProperty<T>();
        }

        /// <summary>
        /// Decompresses a buffered message and lets the inner encoder parse the result.
        /// </summary>
        /// <param name="buffer">Compressed bytes received from the layer below.</param>
        /// <param name="bufferManager">Pool the decompressed buffer is taken from.</param>
        /// <param name="contentType">Content type passed through to the inner encoder.</param>
        /// <returns>The message produced by the inner encoder.</returns>
        public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType)
        {
            ArraySegment<byte> bytes = new Compressor(algorithm).DeCompress(ToZeroOffset(buffer));

            byte[] totalBytes = bufferManager.TakeBuffer(bytes.Count);
            Array.Copy(bytes.Array, bytes.Offset, totalBytes, 0, bytes.Count);
            ArraySegment<byte> byteArray = new ArraySegment<byte>(totalBytes, 0, bytes.Count);

            // The compressed input is no longer needed, so it is the buffer that goes back
            // to the pool. The decompressed buffer must stay checked out because the inner
            // encoder reads from it below.
            bufferManager.ReturnBuffer(buffer.Array);

            return innserEncoder.ReadMessage(byteArray, bufferManager, contentType);
        }

        /// <summary>
        /// Decompresses a streamed message and lets the inner encoder parse the result.
        /// </summary>
        /// <param name="stream">Stream carrying the compressed bytes.</param>
        /// <param name="maxSizeOfHeaders">Header size limit passed through to the inner encoder.</param>
        /// <param name="contentType">Content type passed through to the inner encoder.</param>
        /// <returns>The message produced by the inner encoder.</returns>
        public override Message ReadMessage(System.IO.Stream stream, int maxSizeOfHeaders, string contentType)
        {
            // The incoming stream is compressed and has to be decompressed before parsing.
            Stream ms = new Compressor(algorithm).DeCompress(stream);
            return innserEncoder.ReadMessage(ms, maxSizeOfHeaders, contentType);
        }

        /// <summary>
        /// Encodes <paramref name="message"/> with the inner encoder, compresses the bytes and
        /// returns them in a pooled buffer that leaves <paramref name="messageOffset"/> bytes
        /// free at the front.
        /// </summary>
        /// <param name="message">Message to encode.</param>
        /// <param name="maxMessageSize">Size limit passed to the inner encoder.</param>
        /// <param name="bufferManager">Pool used for the intermediate and the returned buffer.</param>
        /// <param name="messageOffset">Number of bytes to leave unused before the payload.</param>
        /// <returns>Segment covering the compressed payload inside the pooled buffer.</returns>
        public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset)
        {
            ArraySegment<byte> bytes = innserEncoder.WriteMessage(message, maxMessageSize, bufferManager);
            ArraySegment<byte> buffer = new Compressor(algorithm).Compress(ToZeroOffset(bytes));

            // The uncompressed buffer came from the pool and is not used any further.
            bufferManager.ReturnBuffer(bytes.Array);

            int totalLength = buffer.Count + messageOffset;
            byte[] totalBytes = bufferManager.TakeBuffer(totalLength);
            Array.Copy(buffer.Array, buffer.Offset, totalBytes, messageOffset, buffer.Count);
            ArraySegment<byte> byteArray = new ArraySegment<byte>(totalBytes, messageOffset, buffer.Count);

            Console.WriteLine("算法:"+algorithm+",原來字節流大小:"+bytes.Count+",壓縮后字節流大小:"+byteArray.Count);
            return byteArray;
        }

        /// <summary>
        /// Encodes <paramref name="message"/> with the inner encoder and writes the compressed
        /// bytes to <paramref name="stream"/>.
        /// </summary>
        /// <param name="message">Message to encode.</param>
        /// <param name="stream">Destination that receives the compressed bytes.</param>
        public override void WriteMessage(Message message, System.IO.Stream stream)
        {
            using (MemoryStream ms = new MemoryStream())
            {
                innserEncoder.WriteMessage(message, ms);

                // Rewind so the compressor reads the encoded message from its beginning.
                ms.Seek(0, SeekOrigin.Begin);

                using (Stream compressed = new Compressor(algorithm).Compress(ms))
                {
                    if (compressed.CanSeek)
                    {
                        compressed.Seek(0, SeekOrigin.Begin);
                    }

                    // Assigning to the 'stream' parameter would only rebind the local name;
                    // the bytes have to be written into the caller's stream.
                    byte[] chunk = new byte[4096];
                    int read;
                    while ((read = compressed.Read(chunk, 0, chunk.Length)) > 0)
                    {
                        stream.Write(chunk, 0, read);
                    }
                }
            }
        }

        /// <summary>
        /// Returns a segment with the same content that starts at index 0 of its array, so
        /// the result does not depend on how the compressor treats ArraySegment.Offset.
        /// </summary>
        private static ArraySegment<byte> ToZeroOffset(ArraySegment<byte> segment)
        {
            if (segment.Offset == 0)
            {
                return segment;
            }

            byte[] copy = new byte[segment.Count];
            Array.Copy(segment.Array, segment.Offset, copy, 0, segment.Count);
            return new ArraySegment<byte>(copy);
        }
    }
}
在這個類中需要知道上層編碼器是什么,我們用MessageEncoder innserEncoder來指定,在WriteMessage時候,將消息用內置編碼器轉換為字節數組,然后用壓縮算法壓縮這個數組,形成壓縮后字節數組傳遞給到下一層,而在讀取Message的時候,首先將收到的字節數組解壓縮,最后將解壓縮后字節數組用內置編碼器轉換為Message對象。其中Compressor是一個功能類,用于將字節數組壓縮或者解壓縮,代碼為:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.IO.Compression;

namespace RobinLib
{
    /// <summary>
    /// Compresses and decompresses byte arrays and streams with GZip or Deflate,
    /// depending on the <see cref="CompressAlgorithm"/> given to the constructor.
    /// </summary>
    public class Compressor
    {
        private const int CopyBufferSize = 4096;

        private CompressAlgorithm algorithm;

        /// <summary>Creates a compressor for the given algorithm.</summary>
        /// <param name="algorithm">
        /// <c>CompressAlgorithm.GZip</c> selects GZip; any other value selects Deflate.
        /// </param>
        public Compressor(CompressAlgorithm algorithm)
        {
            this.algorithm = algorithm;
        }

        /// <summary>Compresses the bytes covered by <paramref name="data"/>.</summary>
        /// <param name="data">Segment to compress; its Offset and Count are honoured.</param>
        /// <returns>A new zero-offset segment holding the compressed bytes.</returns>
        public ArraySegment<byte> Compress(ArraySegment<byte> data)
        {
            MemoryStream ms = new MemoryStream();

            // leaveOpen = true: the compression stream must be closed to flush its trailer,
            // but the MemoryStream is still needed afterwards.
            using (Stream compressStream = CreateCompressionStream(ms, CompressionMode.Compress, true))
            {
                if (data.Count > 0)
                {
                    compressStream.Write(data.Array, data.Offset, data.Count);
                }
            }

            return new ArraySegment<byte>(ms.ToArray());
        }

        /// <summary>
        /// Compresses everything from the current position of <paramref name="stream"/> to its end.
        /// </summary>
        /// <param name="stream">Source of the uncompressed bytes; it is not closed.</param>
        /// <returns>A MemoryStream with the compressed bytes, positioned at its beginning.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        public Stream Compress(Stream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            MemoryStream ms = new MemoryStream();
            using (Stream compressStream = CreateCompressionStream(ms, CompressionMode.Compress, true))
            {
                // Read in a loop: a single Read call may return fewer bytes than requested,
                // and Length is not available on every stream.
                CopyStream(stream, compressStream);
            }

            // Rewind so callers can read the result straight away.
            ms.Seek(0, SeekOrigin.Begin);
            return ms;
        }

        /// <summary>Decompresses the bytes covered by <paramref name="data"/>.</summary>
        /// <param name="data">Segment with compressed bytes; its Offset and Count are honoured.</param>
        /// <returns>A new zero-offset segment holding the decompressed bytes.</returns>
        public ArraySegment<byte> DeCompress(ArraySegment<byte> data)
        {
            using (MemoryStream ms = new MemoryStream(data.Array, data.Offset, data.Count, false))
            using (Stream compressStream = CreateCompressionStream(ms, CompressionMode.Decompress, false))
            {
                byte[] newByteArray = RetrieveBytesFromStream(compressStream, 1);
                return new ArraySegment<byte>(newByteArray);
            }
        }

        /// <summary>Decompresses a stream into memory.</summary>
        /// <param name="stream">
        /// Stream with compressed bytes. It is rewound first when it supports seeking, and it
        /// is closed together with the decompression stream once it has been read.
        /// </param>
        /// <returns>A MemoryStream over the decompressed bytes.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        public Stream DeCompress(Stream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            // Streams that cannot seek (e.g. forward-only ones) throw on Seek, so only
            // rewind when that is supported.
            if (stream.CanSeek)
            {
                stream.Seek(0, SeekOrigin.Begin);
            }

            using (Stream compressStream = CreateCompressionStream(stream, CompressionMode.Decompress, false))
            {
                byte[] newByteArray = RetrieveBytesFromStream(compressStream, 1);
                return new MemoryStream(newByteArray);
            }
        }

        /// <summary>Reads <paramref name="stream"/> to its end and returns all bytes read.</summary>
        /// <param name="stream">Stream to drain; it is not closed.</param>
        /// <param name="bytesblock">Not used; kept so existing callers keep compiling.</param>
        /// <returns>The bytes read, or an empty array when the stream yields none.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        public static byte[] RetrieveBytesFromStream(Stream stream, int bytesblock)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            using (MemoryStream collected = new MemoryStream())
            {
                CopyStream(stream, collected);
                return collected.ToArray();
            }
        }

        /// <summary>Wraps <paramref name="target"/> in a GZip or Deflate stream for the configured algorithm.</summary>
        private Stream CreateCompressionStream(Stream target, CompressionMode mode, bool leaveOpen)
        {
            if (algorithm == CompressAlgorithm.GZip)
            {
                return new GZipStream(target, mode, leaveOpen);
            }

            return new DeflateStream(target, mode, leaveOpen);
        }

        /// <summary>Copies everything that can still be read from <paramref name="source"/> into <paramref name="destination"/>.</summary>
        private static void CopyStream(Stream source, Stream destination)
        {
            byte[] chunk = new byte[CopyBufferSize];
            int bytesRead;
            while ((bytesRead = source.Read(chunk, 0, chunk.Length)) > 0)
            {
                destination.Write(chunk, 0, bytesRead);
            }
        }
    }
}
到此,其實我們的自定義編碼器應該編寫好了,接下來如何使用它成為我們最為關心的事情。每一個MessageEncoder都對應一個MessageEncoderFactory,在MessageEncodingBindingElement中能返回這個MessageEncoderFactory,然后通過在自定義BindingElement創建監聽通道(BuildChannelListener)和通道工廠(BuildChannelFactory)的時候,將BindingElement添加到BindingContext,這樣就能最終消費我們上面實現的CompressEncoder。
CompressEncoderFactory的代碼實現為:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel.Channels;

namespace RobinLib
{
    /// <summary>
    /// Encoder factory that hands out a single <see cref="CompressEncoder"/> built around
    /// the inner message-encoding binding element it was given.
    /// </summary>
    public class CompressEncoderFactory : MessageEncoderFactory
    {
        private MessageEncodingBindingElement innerBindingElement;
        private CompressAlgorithm algorithm;
        private CompressEncoder encoder;

        /// <summary>
        /// Stores the inner binding element and algorithm, then creates the encoder
        /// that <see cref="Encoder"/> returns.
        /// </summary>
        /// <param name="innerMessageEncodingBindingElement">Binding element of the wrapped encoding.</param>
        /// <param name="algorithm">Compression algorithm handed to the encoder.</param>
        public CompressEncoderFactory(MessageEncodingBindingElement innerMessageEncodingBindingElement, CompressAlgorithm algorithm)
        {
            // The inner element has to be assigned before the encoder is constructed:
            // the encoder's constructor reads it back through this factory.
            this.innerBindingElement = innerMessageEncodingBindingElement;
            this.algorithm = algorithm;
            this.encoder = new CompressEncoder(this, algorithm);
        }

        /// <summary>Binding element of the wrapped (inner) message encoding.</summary>
        public MessageEncodingBindingElement InnerMessageEncodingBindingElement
        {
            get
            {
                return this.innerBindingElement;
            }
        }

        /// <summary>The compressing encoder created in the constructor.</summary>
        public override MessageEncoder Encoder
        {
            get
            {
                return this.encoder;
            }
        }

        /// <summary>Message version of the inner binding element.</summary>
        public override MessageVersion MessageVersion
        {
            get
            {
                return this.innerBindingElement.MessageVersion;
            }
        }

        /// <summary>Defers to the base class implementation.</summary>
        public override MessageEncoder CreateSessionEncoder()
        {
            return base.CreateSessionEncoder();
        }
    }
}
自定義的MessageEncoderBindingElement代碼為:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel.Channels;
using System.ServiceModel;
using System.Xml;

namespace RobinLib
{
    /// <summary>
    /// Message-encoding binding element that wraps another message-encoding binding element
    /// and produces a <see cref="CompressEncoderFactory"/> for the chosen algorithm.
    /// </summary>
    public sealed class CompressEncodingBindingElement : MessageEncodingBindingElement
    {
        private XmlDictionaryReaderQuotas readerQuotas;
        private MessageEncodingBindingElement innerMessageEncodingBindingElement;
        private CompressAlgorithm algorithm;

        /// <summary>The wrapped (inner) message-encoding binding element.</summary>
        public MessageEncodingBindingElement InnerMessageEncodingBindingElement
        {
            get { return innerMessageEncodingBindingElement; }
        }

        /// <summary>The compression algorithm passed to the encoder factory.</summary>
        public CompressAlgorithm CompressAlgorithm
        {
            get { return algorithm; }
        }

        /// <summary>Creates the binding element.</summary>
        /// <param name="innerMessageEncodingBindingElement">Encoding to wrap; must not be null.</param>
        /// <param name="algorithm">Compression algorithm to apply to the encoded bytes.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="innerMessageEncodingBindingElement"/> is null.
        /// </exception>
        public CompressEncodingBindingElement(MessageEncodingBindingElement innerMessageEncodingBindingElement, CompressAlgorithm algorithm)
        {
            if (innerMessageEncodingBindingElement == null)
            {
                throw new ArgumentNullException("innerMessageEncodingBindingElement");
            }

            this.readerQuotas = new XmlDictionaryReaderQuotas();
            this.algorithm = algorithm;
            this.innerMessageEncodingBindingElement = innerMessageEncodingBindingElement;
        }

        /// <summary>Registers this element as a binding parameter and builds the inner channel factory.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException("context");
            }

            context.BindingParameters.Add(this);
            return context.BuildInnerChannelFactory<TChannel>();
        }

        /// <summary>Registers this element as a binding parameter and builds the inner channel listener.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException("context");
            }

            context.BindingParameters.Add(this);
            return context.BuildInnerChannelListener<TChannel>();
        }

        /// <summary>Registers this element as a binding parameter and asks the inner stack whether it can build a factory.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException("context");
            }

            context.BindingParameters.Add(this);
            return context.CanBuildInnerChannelFactory<TChannel>();
        }

        /// <summary>Registers this element as a binding parameter and asks the inner stack whether it can build a listener.</summary>
        /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
        public override bool CanBuildChannelListener<TChannel>(BindingContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException("context");
            }

            context.BindingParameters.Add(this);
            return context.CanBuildInnerChannelListener<TChannel>();
        }

        /// <summary>Creates a factory for compressing encoders around the inner encoding.</summary>
        public override MessageEncoderFactory CreateMessageEncoderFactory()
        {
            return new CompressEncoderFactory(innerMessageEncodingBindingElement, algorithm);
        }

        /// <summary>
        /// Returns the requested property. Reader quotas are taken from the inner binding
        /// element, because the inner encoder is the component that parses the XML; the
        /// element's own default quotas are only a fallback.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
        public override T GetProperty<T>(BindingContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException("context");
            }

            if (typeof(T) == typeof(XmlDictionaryReaderQuotas))
            {
                T innerQuotas = innerMessageEncodingBindingElement.GetProperty<T>(context);
                if (innerQuotas != null)
                {
                    return innerQuotas;
                }

                return this.readerQuotas as T;
            }

            return base.GetProperty<T>(context);
        }

        /// <summary>Message version of the inner binding element; setting it updates the inner element.</summary>
        public override MessageVersion MessageVersion
        {
            get { return innerMessageEncodingBindingElement.MessageVersion; }
            set { innerMessageEncodingBindingElement.MessageVersion = value; }
        }

        /// <summary>
        /// Creates a copy of this element. The inner binding element is cloned as well, so
        /// a change made through one copy (for example via <see cref="MessageVersion"/>)
        /// does not leak into the other.
        /// </summary>
        public override BindingElement Clone()
        {
            MessageEncodingBindingElement innerCopy = (MessageEncodingBindingElement)innerMessageEncodingBindingElement.Clone();
            return new CompressEncodingBindingElement(innerCopy, algorithm);
        }
    }
}
最終,我們可以使用CustomBinding創建宿主和客戶端。
服務端:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using Robin_Wcf_CustomMessageEncoder_SvcLib;
using System.ServiceModel.Channels;
using RobinLib;

namespace Robin_Wcf_CustomMessageEncoder_Host
{
    /// <summary>
    /// Console host that exposes Service1 over HTTP with a custom binding whose message
    /// encoding is wrapped by the compressing binding element.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the binding, opens the service host and keeps it running until a key is
        /// read from the console.
        /// </summary>
        static void Main(string[] args)
        {
            // Service address
            Uri baseAddress = new Uri("http://127.0.0.1:8081/Robin_Wcf_Formatter");
            ServiceHost host = new ServiceHost(typeof(Service1), new Uri[] { baseAddress });

            // Service binding: compressing encoder (text + Deflate) on top of HTTP transport.
            // The transport element is added last.
            ICollection<BindingElement> bindingElements = new List<BindingElement>();
            HttpTransportBindingElement httpBindingElement = new HttpTransportBindingElement();
            CompressEncodingBindingElement textBindingElement = new CompressEncodingBindingElement(new TextMessageEncodingBindingElement(), CompressAlgorithm.Deflate);
            bindingElements.Add(textBindingElement);
            bindingElements.Add(httpBindingElement);
            CustomBinding bind = new CustomBinding(bindingElements);
            host.AddServiceEndpoint(typeof(IService1), bind, "");

            // Publish metadata over HTTP GET unless a metadata behavior is already configured.
            if (host.Description.Behaviors.Find<System.ServiceModel.Description.ServiceMetadataBehavior>() == null)
            {
                System.ServiceModel.Description.ServiceMetadataBehavior svcMetaBehavior = new System.ServiceModel.Description.ServiceMetadataBehavior();
                svcMetaBehavior.HttpGetEnabled = true;
                svcMetaBehavior.HttpGetUrl = new Uri("http://127.0.0.1:8001/Mex");
                host.Description.Behaviors.Add(svcMetaBehavior);
            }

            host.Opened += new EventHandler(delegate(object obj, EventArgs e)
            {
                Console.WriteLine("服務已經啟動!");
            });

            try
            {
                host.Open();
                Console.Read();

                // Shut down gracefully so the listener is released.
                host.Close();
            }
            catch
            {
                // Close() can throw on a faulted host; Abort() releases it immediately.
                host.Abort();
                throw;
            }
        }
    }
}
客戶端:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RobinLib;
using System.ServiceModel.Channels;
using Robin_Wcf_CustomMessageEncoder_ClientApp.ServiceReference1;

namespace Robin_Wcf_CustomMessageEncoder_ClientApp
{
    /// <summary>
    /// Console client that calls the service through a custom binding whose message
    /// encoding is wrapped by the compressing binding element.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the binding, calls GetData, GetDataUsingDataContract and GetStream once
        /// each, then waits for a key press.
        /// </summary>
        static void Main(string[] args)
        {
            // Presumably gives the host process time to start listening - confirm.
            System.Threading.Thread.Sleep(5300);

            // The inner encoder and the compression algorithm must be the same on both
            // sides of the wire; text + Deflate is what the host listing configures.
            ICollection<BindingElement> bindingElements = new List<BindingElement>();
            HttpTransportBindingElement httpBindingElement = new HttpTransportBindingElement();
            CompressEncodingBindingElement textBindingElement = new CompressEncodingBindingElement(new TextMessageEncodingBindingElement(), CompressAlgorithm.Deflate);
            bindingElements.Add(textBindingElement);
            bindingElements.Add(httpBindingElement);
            CustomBinding bind = new CustomBinding(bindingElements);

            // Assumes Service1Client is the usual generated ClientBase-derived proxy,
            // which provides Close() and Abort() - confirm against the service reference.
            ServiceReference1.Service1Client client = new ServiceReference1.Service1Client(bind, new System.ServiceModel.EndpointAddress("http://127.0.0.1:8081/Robin_Wcf_Formatter"));
            try
            {
                ServiceReference1.IService1 svc = client;

                string pres = svc.GetData(10);
                Console.WriteLine(pres);

                CompositeType ct = svc.GetDataUsingDataContract(new CompositeType());

                // One million 4-byte integers as a larger payload.
                System.IO.MemoryStream ms = new System.IO.MemoryStream();
                for (int i = 0; i < 1000000; i++)
                {
                    byte[] buffer = BitConverter.GetBytes(i);
                    ms.Write(buffer, 0, buffer.Length);
                }

                // NOTE(review): ms is positioned at its end after the writes above, so
                // the call may transmit no payload bytes. Rewinding would send about
                // 4 MB, which probably needs larger quotas on both sides - confirm.
                System.IO.Stream stream = svc.GetStream(ms);

                Console.Read();
                client.Close();
            }
            catch
            {
                // Close() can throw on a faulted channel; Abort() releases it immediately.
                client.Abort();
                throw;
            }
        }
    }
}
我們可以更改CompressEncodingBindingElement的構造參數(例如 CompressEncodingBindingElement textBindingElement = new CompressEncodingBindingElement(new BinaryMessageEncodingBindingElement(), CompressAlgorithm.GZip);),指定內置MessageEncoder和壓縮算法。需要注意的是,服務端和客戶端必須使用相同的內置MessageEncoder和壓縮算法,否則雙方無法正確解析對方的消息。
最后附帶一句,并不是所有的字節數組壓縮后體積都會變小,只有文本類型的數據壓縮后,效果才比較明顯。運行程序,當內置TextMessageEncodingBindingElement的時候,我們得到的效果為:
此時說明壓縮效果非常明顯,
而當內置BinaryMessageEncodingBindingElement的時候,壓縮效果不再突出,甚至起到反作用。
如果有朋友需要使用壓縮傳輸,可以直接下載項目,引用其中的RobinLib.dll,然后使用自定義Binding。
項目文件:/jillzhang/Robin_Wcf_CustomMessageEncoder.rar
下文我們將演示實現對稱加密傳輸。
出處:http://jillzhang.cnblogs.com/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

浙公網安備 33010602011771號