第七章:C#響應(yīng)式編程System.Reactive
第七章:C#響應(yīng)式編程System.Reactive
響應(yīng)式編程(Reactive Programming)是一種通過響應(yīng)事件來編寫代碼的編程方式,特別適合處理用戶界面或任何需要應(yīng)對外部事件的程序。它避免了輪詢的開銷,允許系統(tǒng)在事件發(fā)生時自動觸發(fā)代碼。
更多詳細(xì)內(nèi)容參考:Introduction to Rx.NET
7.1 為什么選擇響應(yīng)式編程?
在現(xiàn)代應(yīng)用程序中,異步操作和事件驅(qū)動的需求變得越來越普遍。無論是響應(yīng)用戶界面的交互、處理實時數(shù)據(jù)流,還是處理 I/O 操作,程序都需要能夠高效地處理隨時可能發(fā)生的事件。傳統(tǒng)的編程方式有時顯得笨重、復(fù)雜,難以處理這些情況。而 Rx.NET(Reactive Extensions for .NET) 提供了一種聲明式、簡潔的方式來處理異步事件流。
1. 事件流的重要性
事件流(streams of events)在許多應(yīng)用中是核心部分,例如:
- 用戶交互:按鈕點擊、輸入變化等。
- 實時數(shù)據(jù):股票行情、傳感器數(shù)據(jù)等。
- 異步操作:網(wǎng)絡(luò)請求、文件讀寫等。
傳統(tǒng)上,這些操作通常通過回調(diào)、事件處理或異步編程模式來實現(xiàn),但這些方式容易導(dǎo)致代碼復(fù)雜、難以維護。
2. Rx.NET 的優(yōu)勢
Rx.NET 通過提供一個統(tǒng)一的方式來處理事件流,解決了傳統(tǒng)方法的許多問題。它的優(yōu)點包括:
-
聲明式的事件流處理:通過 Rx.NET,開發(fā)者可以像操作集合(如數(shù)組、列表)一樣處理事件流。你可以用熟悉的 LINQ 風(fēng)格的方法來過濾、轉(zhuǎn)換和組合事件。
-
簡化異步編程:Rx.NET 內(nèi)置了對異步事件的處理,避免了復(fù)雜的回調(diào)地獄。它提供了強大的工具來處理并發(fā)和異步任務(wù)。
-
處理復(fù)雜的事件交互:Rx.NET 可以輕松處理多個事件源的組合、合并和分割。例如,你可以將兩個不同的事件流合并,然后根據(jù)需要對它們進行處理。
-
更好的錯誤處理:Rx.NET 通過其流式操作,提供了一個一致的錯誤處理機制,避免了散亂的 try-catch 塊。
-
可測試性:Rx.NET 提供了內(nèi)置的工具,可以模擬事件流,幫助開發(fā)者輕松測試異步操作。
3. Rx.NET 的適用場景
Rx.NET 適用于處理異步事件流的場景,包括但不限于:
- 用戶界面編程:響應(yīng)用戶輸入,處理復(fù)雜的界面交互。
- 實時數(shù)據(jù)處理:處理實時的市場數(shù)據(jù)、傳感器數(shù)據(jù)等。
- 并發(fā)處理:協(xié)調(diào)多個異步操作,優(yōu)化系統(tǒng)性能。
- 響應(yīng)式系統(tǒng):例如聊天應(yīng)用、社交媒體更新、流媒體播放等。
4. Rx.NET 的核心思想
Rx.NET 的核心思想是將一切看作一個流。不管是鍵盤按鍵、鼠標(biāo)點擊,還是復(fù)雜的異步操作,Rx.NET 都將其抽象為 IObservable<T>,允許開發(fā)者通過 IObserver<T> 訂閱并響應(yīng)這些事件。
這種序列化思維(thinking in sequences)是 Rx.NET 最強大的概念之一。它讓開發(fā)者可以將傳統(tǒng)的順序操作模式,轉(zhuǎn)變?yōu)閷κ录牧魇讲僮鳎喕藦?fù)雜業(yè)務(wù)邏輯的表達(dá)。
小結(jié)
Rx.NET 通過其聲明式的事件流處理方式,簡化了異步編程,尤其適合需要應(yīng)對異步事件和并發(fā)操作的場景。它不僅減少了代碼的復(fù)雜度,還提供了強大的工具來處理事件流的組合、轉(zhuǎn)換和錯誤處理。通過 Rx.NET,你可以更輕松地構(gòu)建高效、可擴展的響應(yīng)式系統(tǒng)。
7.2 主要概念和類型
在深入了解如何將 .NET 事件轉(zhuǎn)換為可觀察的流之前,我們需要先掌握一些 Reactive Extensions (Rx) 的核心概念和類型。理解這些基礎(chǔ)知識將幫助我們在后續(xù)章節(jié)中更輕松地使用 Rx.NET 處理事件流和異步數(shù)據(jù)流。
若要使用System.Reactive,需要在應(yīng)用程序中安裝用于System.Reactive的NuGet包。
Rx.NET和System.Reactive是同一個東西,只是不同的叫法而已。只不過Rx.NET更多是在上下文中用作簡稱,而System.Reactive則是具體的包名。
1. IObservable<T>
IObservable<T> 是 Rx 的核心接口,它代表一個推送(push)數(shù)據(jù)流的源。它與傳統(tǒng)的 IEnumerable<T> 不同,IEnumerable<T> 是拉取(pull)數(shù)據(jù)的方式,而 IObservable<T> 是推送數(shù)據(jù)的方式。IObservable<T> 可以用來表示一個事件流、異步操作、或其他隨時間變化的數(shù)據(jù)源。
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
如何使用:如果你有一個 IObservable<T>,你可以通過調(diào)用 Subscribe 方法來訂閱它,從而接收數(shù)據(jù)流中的事件。Subscribe 方法返回一個 IDisposable,允許你取消訂閱以停止接收數(shù)據(jù)。
2. IObserver<T>
IObserver<T> 是與 IObservable<T> 相對應(yīng)的接口,它定義了如何處理來自 IObservable<T> 的事件流。IObserver<T> 有三個方法:
OnNext(T value):當(dāng)有新數(shù)據(jù)推送時調(diào)用。OnError(Exception error):當(dāng)發(fā)生錯誤時調(diào)用。OnCompleted():當(dāng)數(shù)據(jù)流完成時調(diào)用。
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
如何使用:當(dāng)你訂閱一個 IObservable<T> 時,你通常會提供一個 IObserver<T>,該 IObserver<T> 定義了如何處理每一個數(shù)據(jù)事件、錯誤以及完成通知。
3. Push vs Pull(推 vs 拉)
-
Pull 模式:在傳統(tǒng)的
IEnumerable<T>中,消費者通過foreach從集合中拉取數(shù)據(jù),數(shù)據(jù)的產(chǎn)生是由消費者控制的。 -
Push 模式:在
IObservable<T>中,數(shù)據(jù)是主動推送給消費者的,消費者只需要訂閱,不需要主動請求數(shù)據(jù)。數(shù)據(jù)的產(chǎn)生是由IObservable<T>控制的,消費者被動接收。
4. 熱(Hot)和冷(Cold)Observable
IObservable<T> 可以分為熱和冷兩種類型:
-
冷 Observable:數(shù)據(jù)流在訂閱時才開始產(chǎn)生事件。每個訂閱者會從頭開始接收數(shù)據(jù)。例如,讀取文件或從集合中推送數(shù)據(jù)。
-
熱 Observable:數(shù)據(jù)流在創(chuàng)建時就開始產(chǎn)生事件。訂閱者只能接收到在它訂閱之后產(chǎn)生的事件,訂閱之前發(fā)生的事件將無法捕獲。例如,鼠標(biāo)點擊事件、傳感器數(shù)據(jù)等實時事件。
5. 訂閱與取消訂閱
當(dāng)我們訂閱一個 IObservable<T> 時,實際上是注冊了一個 IObserver<T>,以便接收事件。這個訂閱可以通過 IDisposable 接口來取消,以終止繼續(xù)接收事件。
var subscription = observable.Subscribe(
onNext: value => Console.WriteLine($"Received: {value}"),
onError: error => Console.WriteLine($"Error: {error.Message}"),
onCompleted: () => Console.WriteLine("Completed")
);
// 取消訂閱
subscription.Dispose();
6. 常用操作符
Rx.NET 提供了大量操作符,用于轉(zhuǎn)換、過濾和組合事件流。這些操作符類似于 LINQ,允許我們以聲明式的方式處理數(shù)據(jù)流。常用的操作符包括:
Select:類似于 LINQ 的Select操作符,用于映射數(shù)據(jù)流中的每個元素。Where:用于過濾數(shù)據(jù)流,只保留符合條件的元素。Merge:合并多個IObservable<T>數(shù)據(jù)流。Throttle:對事件流進行節(jié)流,忽略短時間內(nèi)重復(fù)的事件。
observable
.Where(value => value > 10)
.Select(value => value * 2)
.Subscribe(value => Console.WriteLine($"Processed value: {value}"));
7. 調(diào)度器(Schedulers)
Rx.NET 中的調(diào)度器用于控制代碼在特定線程或上下文中執(zhí)行。常見的調(diào)度器有:
Scheduler.CurrentThread:在當(dāng)前線程上執(zhí)行。Scheduler.NewThread:在新線程上執(zhí)行。Scheduler.Default:在線程池中執(zhí)行。DispatcherScheduler:用于 WPF 或 WinForms 應(yīng)用程序的 UI 線程調(diào)度。
你可以使用調(diào)度器來控制數(shù)據(jù)流的訂閱和觀察行為:
observable
.ObserveOn(Scheduler.NewThread) // 在新線程上觀察數(shù)據(jù)
.SubscribeOn(Scheduler.CurrentThread) // 在當(dāng)前線程上訂閱
.Subscribe(value => Console.WriteLine($"Received on new thread: {value}"));
8. 總結(jié)
在 Rx.NET 中,IObservable<T> 和 IObserver<T> 是最基本的構(gòu)建塊。IObservable<T> 用于表示事件流,而 IObserver<T> 則用于處理這些事件。通過訂閱一個 IObservable<T>,我們可以獲得事件的推送,并使用各種 LINQ 風(fēng)格的操作符來對事件流進行處理。理解這些基礎(chǔ)概念有助于我們更好地在后續(xù)章節(jié)中處理 .NET 事件并將其轉(zhuǎn)換為響應(yīng)式的 IObservable<T> 數(shù)據(jù)流。
7.3 創(chuàng)建可觀察序列
在 Rx.NET 中,可觀察序列(Observable Sequence) 是事件流的核心。一個可觀察序列可以是任何類型的異步數(shù)據(jù)源,例如按鈕點擊、傳感器數(shù)據(jù)、或者網(wǎng)絡(luò)請求的結(jié)果。在這一小節(jié)中,我們將討論幾種常用的創(chuàng)建可觀察序列的方法。
1. 基本的 Observable 創(chuàng)建方法
Rx.NET 提供了多種創(chuàng)建可觀察序列的方式。最常用的方式之一是使用 Observable.Create,它允許我們手動定義事件流的行為。
示例:使用 Observable.Create
IObservable<int> observable = Observable.Create<int>(observer =>
{
// 模擬數(shù)據(jù)流
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
// 完成流
observer.OnCompleted();
// 返回 IDisposable,用于取消訂閱
return Disposable.Empty;
});
在這個例子中,我們創(chuàng)建了一個簡單的 IObservable<int>,它推送了三個整數(shù)并調(diào)用了 OnCompleted 來結(jié)束數(shù)據(jù)流。我們還返回了 Disposable.Empty,表示沒有特定的取消訂閱邏輯。
2. 使用現(xiàn)成的工廠方法
Rx.NET 提供了許多工廠方法來快捷創(chuàng)建常見的可觀察序列。這些方法可以幫助我們快速創(chuàng)建序列,而不需要手動實現(xiàn) IObservable 接口。
2.1 Observable.Return
Observable.Return 用于創(chuàng)建一個只發(fā)出單個值的簡單序列。
IObservable<int> singleValue = Observable.Return(42);
singleValue.Subscribe(value => Console.WriteLine($"Received: {value}"));
這個序列只會發(fā)出一個值 42,然后立即結(jié)束。
2.2 Observable.Range
Observable.Range 用于創(chuàng)建一個發(fā)出整數(shù)序列的可觀察流。
IObservable<int> range = Observable.Range(1, 5);
range.Subscribe(value => Console.WriteLine($"Received: {value}"));
這個序列會發(fā)出從 1 到 5 的整數(shù),并在最后調(diào)用 OnCompleted。
2.3 Observable.Empty
Observable.Empty 創(chuàng)建一個立即完成的空序列。
IObservable<int> empty = Observable.Empty<int>();
empty.Subscribe(
onNext: value => Console.WriteLine($"Received: {value}"),
onCompleted: () => Console.WriteLine("Completed")
);
此序列不會發(fā)出任何值,它只會調(diào)用 OnCompleted。
2.4 Observable.Never
Observable.Never 創(chuàng)建一個永不發(fā)出任何事件的序列。它既不會觸發(fā) OnNext,也不會觸發(fā) OnCompleted 或 OnError。
IObservable<int> never = Observable.Never<int>();
never.Subscribe(
onNext: value => Console.WriteLine($"Received: {value}"),
onCompleted: () => Console.WriteLine("Completed")
);
這個序列永遠(yuǎn)不會結(jié)束,也不會發(fā)出任何事件。
2.5 Observable.Throw
Observable.Throw 創(chuàng)建一個立即發(fā)出錯誤的序列。
IObservable<int> error = Observable.Throw<int>(new Exception("An error occurred"));
error.Subscribe(
onNext: value => Console.WriteLine($"Received: {value}"),
onError: ex => Console.WriteLine($"Error: {ex.Message}")
);
此序列不會發(fā)出任何值,而是直接調(diào)用 OnError 傳遞異常。
3. 異步序列
我們還可以創(chuàng)建異步的可觀察序列,它們可以用于處理異步操作或定時事件。
3.1 Observable.Timer
Observable.Timer 創(chuàng)建一個在指定時間后觸發(fā)的序列,它可以用于延遲事件。
IObservable<long> timer = Observable.Timer(TimeSpan.FromSeconds(2));
timer.Subscribe(value => Console.WriteLine($"Timer fired: {value}"));
在這個例子中,2 秒之后序列會發(fā)出一個值,并調(diào)用 OnCompleted。
3.2 Observable.Interval
Observable.Interval 創(chuàng)建一個定時觸發(fā)的序列,按照指定的時間間隔重復(fù)發(fā)出值。
IObservable<long> interval = Observable.Interval(TimeSpan.FromSeconds(1));
interval.Subscribe(value => Console.WriteLine($"Tick: {value}"));
這個例子中,Observable.Interval 每 1 秒發(fā)出一個值(從 0 開始遞增)。
4. 將現(xiàn)有的數(shù)據(jù)源轉(zhuǎn)換為 Observable
除了手動創(chuàng)建 Observable 之外,Rx.NET 還提供了一些工具,用來將現(xiàn)有的數(shù)據(jù)源(如任務(wù)、事件等)轉(zhuǎn)換為 Observable。
4.1 使用 Task 創(chuàng)建 Observable
Rx.NET 提供了將 Task 轉(zhuǎn)換為 IObservable 的方法。
Task<int> task = Task.FromResult(42);
IObservable<int> taskObservable = task.ToObservable();
taskObservable.Subscribe(value => Console.WriteLine($"Task result: {value}"));
這個例子展示了如何將一個 Task 轉(zhuǎn)換為 Observable,并在任務(wù)完成時發(fā)出結(jié)果。
4.2 使用 FromEventPattern 將事件轉(zhuǎn)換為 Observable
我們可以使用 FromEventPattern 將標(biāo)準(zhǔn)的 .NET 事件轉(zhuǎn)換為 Observable。這個部分將在下一小節(jié)詳細(xì)討論。
5. 總結(jié)
- 創(chuàng)建可觀察序列 是使用 Rx.NET 的第一步。我們可以通過手動創(chuàng)建、使用現(xiàn)成的工廠方法、或?qū)F(xiàn)有的數(shù)據(jù)源轉(zhuǎn)換來創(chuàng)建
IObservable<T>。 - Rx 提供了多種簡單的工廠方法,如
Return、Range、Empty等,幫助我們快速創(chuàng)建各種數(shù)據(jù)流。 - 我們還可以使用
Observable.Timer和Observable.Interval來處理定時事件流。 - 最后,Rx.NET 可以將
Task和事件等異步源輕松轉(zhuǎn)換為 Observable。
在理解了如何創(chuàng)建可觀察序列之后,接下來我們將討論如何將 .NET 事件轉(zhuǎn)換為可觀察序列,在下一小節(jié) 7.4 轉(zhuǎn)換 .NET 事件 中會詳細(xì)介紹。
7.4 轉(zhuǎn)換 .NET 事件
問題背景
在 .NET 中,事件是處理異步操作的常見方式,而在 Reactive Extensions (Rx) 中,我們使用 IObservable<T> 來處理數(shù)據(jù)流。為了讓傳統(tǒng)的 .NET 事件能與 Rx 的響應(yīng)式編程模型兼容,我們需要將事件轉(zhuǎn)換為 IObservable<T>。這個過程可以通過 Observable.FromEvent 或 Observable.FromEventPattern 來實現(xiàn)。
事件轉(zhuǎn)換的核心
FromEvent:適用于不符合標(biāo)準(zhǔn)事件模式的事件。FromEventPattern:適用于標(biāo)準(zhǔn)的 .NET 事件,特別是使用EventHandler<T>的事件。例如,ProgressChanged和Elapsed事件。
示例 1:將事件轉(zhuǎn)換為 Observable
假設(shè)我們有一個按鈕點擊的事件 Click,我們想將它轉(zhuǎn)換成一個 IObservable,并在每次點擊時執(zhí)行對應(yīng)的響應(yīng)動作。
var button = new Button();
// 將 Click 事件轉(zhuǎn)換為 Observable
IObservable<EventPattern<EventArgs>> clicks =
Observable.FromEventPattern<EventHandler, EventArgs>(
handler => button.Click += handler,
handler => button.Click -= handler
);
// 訂閱事件,處理點擊行為
clicks.Subscribe(click => Console.WriteLine("Button clicked!"));
在這個例子中,FromEventPattern 將按鈕的 Click 事件轉(zhuǎn)換為 IObservable<EventPattern<EventArgs>>。每當(dāng)按鈕被點擊時,OnNext 會被觸發(fā),輸出 "Button clicked!"。
示例 2:處理帶有數(shù)據(jù)的事件
假設(shè)我們有一個進度條,每次進度更新時會觸發(fā) ProgressChanged 事件。我們可以使用 FromEventPattern 將該事件轉(zhuǎn)換成 Observable,并在每次進度變化時處理數(shù)據(jù)。
var progress = new Progress<int>();
// 將 ProgressChanged 事件轉(zhuǎn)換為 Observable
IObservable<EventPattern<int>> progressReports =
Observable.FromEventPattern<EventHandler<int>, int>(
handler => progress.ProgressChanged += handler,
handler => progress.ProgressChanged -= handler
);
// 打印每次進度變化的值
progressReports.Subscribe(report => Console.WriteLine("Progress: " + report.EventArgs));
在這個例子中,ProgressChanged 是一個標(biāo)準(zhǔn)的 EventHandler<T> 類型事件,因此我們可以簡單地使用 FromEventPattern 來包裝它。每當(dāng)進度更新時,OnNext 會被觸發(fā),并打印當(dāng)前的進度值。
示例 3:處理自定義事件
如果我們遇到自定義的事件類型,它可能不符合 EventHandler<T> 的標(biāo)準(zhǔn)模式。在這種情況下,我們可以使用 FromEvent。假設(shè)有一個自定義的事件 OnTemperatureChanged,我們可以這樣處理:
public class Thermometer
{
public event Action<double> OnTemperatureChanged;
public void SimulateTemperatureChange(double newTemp)
{
OnTemperatureChanged?.Invoke(newTemp);
}
}
var thermometer = new Thermometer();
// 將自定義事件轉(zhuǎn)換為 Observable
IObservable<double> temperatureChanges =
Observable.FromEvent<double>(
handler => thermometer.OnTemperatureChanged += handler,
handler => thermometer.OnTemperatureChanged -= handler
);
// 訂閱溫度變化事件
temperatureChanges.Subscribe(temp => Console.WriteLine($"Temperature changed to: {temp}°C"));
// 模擬溫度變化
thermometer.SimulateTemperatureChange(23.5);
thermometer.SimulateTemperatureChange(24.0);
在這個例子中,OnTemperatureChanged 是一個自定義的 Action<double> 委托。我們使用 FromEvent 將其轉(zhuǎn)化為 IObservable<double>,并在每次溫度變化時輸出新的溫度值。
異常處理
有些事件可能會在執(zhí)行中拋出異常。例如,WebClient 的 DownloadStringCompleted 事件可能會因為網(wǎng)絡(luò)問題而在 EventArgs 中包含錯誤。Rx 默認(rèn)將這些錯誤視為數(shù)據(jù),而不是異常。這時,我們需要手動處理這些錯誤。
var client = new WebClient();
// 將 DownloadStringCompleted 事件轉(zhuǎn)換為 Observable
IObservable<EventPattern<DownloadStringCompletedEventArgs>> downloadedStrings =
Observable.FromEventPattern<DownloadStringCompletedEventArgs>(
handler => client.DownloadStringCompleted += handler,
handler => client.DownloadStringCompleted -= handler
);
// 處理下載結(jié)果或錯誤
downloadedStrings.Subscribe(
data =>
{
if (data.EventArgs.Error != null)
Console.WriteLine("Download failed: " + data.EventArgs.Error.Message);
else
Console.WriteLine("Downloaded: " + data.EventArgs.Result);
}
);
// 發(fā)起異步下載
client.DownloadStringAsync(new Uri("http://example.com"));
在這個例子中,DownloadStringCompletedEventArgs 包含了下載結(jié)果或錯誤信息。我們通過檢查 eventArgs.Error 來判斷是否發(fā)生了錯誤,并在控制臺中輸出相應(yīng)的信息。
線程上下文問題
在某些情況下,事件的訂閱和取消訂閱必須在特定的上下文中執(zhí)行。例如,UI 事件必須在 UI 線程上訂閱。System.Reactive 提供了 SubscribeOn 操作符來控制訂閱的線程上下文:
IObservable<EventPattern<EventArgs>> clicks =
Observable.FromEventPattern<EventHandler, EventArgs>(
handler => button.Click += handler,
handler => button.Click -= handler
);
// 使用 SubscribeOn 指定事件處理需要在 UI 線程上執(zhí)行
clicks
.SubscribeOn(Scheduler.CurrentThread)
.Subscribe(click => Console.WriteLine("Button clicked on UI thread"));
在這個例子中,我們使用 SubscribeOn 來確保事件的訂閱在 UI 線程上執(zhí)行。
總結(jié)
FromEventPattern適用于標(biāo)準(zhǔn)的EventHandler<T>事件。FromEvent適用于自定義的或不符合標(biāo)準(zhǔn)的事件。- 當(dāng)事件被轉(zhuǎn)換為
IObservable<T>之后,Rx 的各種操作符(如過濾、轉(zhuǎn)換、合并等)就可以輕松應(yīng)用到事件流上,幫助我們簡化異步編程。
7.5 向上下文發(fā)送通知
問題背景
在 Rx.NET 中,事件通知(如 OnNext)可以從任何線程發(fā)出,特別是當(dāng)你使用像 Observable.Interval 這類基于定時器的操作符時,通知可能來自不同的線程池線程。這種行為通常對后臺處理沒有問題,但在某些場景下,尤其是涉及 UI 的場景時,線程問題就變得至關(guān)重要。例如,許多 UI 框架要求 UI 更新必須在主線程(UI 線程)上進行。如果通知來自后臺線程而你試圖更新 UI 元素,就會拋出異常。因此,我們需要確保所有相關(guān)的通知能在正確的上下文中處理。
解決方案:使用 ObserveOn
ObserveOn 是 Rx.NET 提供的一個運算符,它可以將事件通知(如 OnNext、OnCompleted、OnError)切換到指定的調(diào)度器或線程上下文中。通過 ObserveOn,我們可以將通知從后臺線程切換到 UI 線程,確保 UI 更新在正確的線程中進行。
注意:
ObserveOn控制的是可觀察通知的執(zhí)行上下文。不要將它與SubscribeOn混淆,后者控制的是訂閱(即添加/移除事件處理程序)的代碼所在的上下文。簡而言之:
ObserveOn:決定通知在哪個上下文或線程上發(fā)出。SubscribeOn:決定訂閱邏輯在哪個上下文或線程上執(zhí)行。
示例 1:切換到 UI 線程
假設(shè)我們有一個按鈕點擊事件,每次點擊后我們啟動一個 Observable.Interval,每秒發(fā)出一次 OnNext 通知。由于 Interval 默認(rèn)使用線程池線程,我們需要將這些通知切換到 UI 線程來處理,確保 UI 更新在正確的線程上進行。
private void Button_Click(object sender, RoutedEventArgs e)
{
// 獲取當(dāng)前的 UI 線程上下文
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
// 創(chuàng)建基于時間間隔的 Observable
Observable.Interval(TimeSpan.FromSeconds(1))
// 切換到 UI 線程上下文處理通知
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}
輸出示例:
UI thread is 9
Interval 0 on thread 9
Interval 1 on thread 9
Interval 2 on thread 9
在這個例子中,Observable.Interval 每秒發(fā)出一個值。由于我們使用了 ObserveOn 運算符,并傳遞了當(dāng)前的 SynchronizationContext,所有通知都會在 UI 線程(線程 9)上執(zhí)行。即使 Interval 默認(rèn)使用后臺線程,ObserveOn 也確保了通知會切換到 UI 線程處理。
示例 2:從 UI 線程切換到后臺線程處理復(fù)雜計算
在某些情況下,你可能希望從 UI 線程切換到后臺線程來處理一些耗時的計算任務(wù)。例如,當(dāng)鼠標(biāo)移動時,我們可能需要進行 CPU 密集型的計算。我們可以使用 ObserveOn 將計算任務(wù)移到后臺線程上進行處理,避免阻塞 UI 線程,然后再將結(jié)果切換回 UI 線程顯示。
private void SetupMouseMoveProcessing()
{
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => this.MouseMove += handler,
handler => this.MouseMove -= handler)
.Select(evt => evt.EventArgs.GetPosition(this))
// 切換到后臺線程進行計算
.ObserveOn(Scheduler.Default)
.Select(position =>
{
// 模擬復(fù)雜計算
Thread.Sleep(100); // 假設(shè)計算需要花費一些時間
var result = position.X + position.Y;
var thread = Environment.CurrentManagedThreadId;
Trace.WriteLine($"Calculated result {result} on thread {thread}");
return result;
})
// 將結(jié)果切換回 UI 線程
.ObserveOn(uiContext)
.Subscribe(result => Trace.WriteLine(
$"Result {result} on thread {Environment.CurrentManagedThreadId}"));
}
過程分析:
- 我們從
MouseMove事件創(chuàng)建了一個Observable,每次鼠標(biāo)移動時都將捕獲鼠標(biāo)位置。 - 使用
ObserveOn(Scheduler.Default)將事件流切換到后臺線程,進行耗時的計算(模擬了 100 毫秒的延遲)。 - 計算完成后,使用
ObserveOn(uiContext)將結(jié)果切換回 UI 線程,以便安全地更新 UI 或其他需要在 UI 線程執(zhí)行的操作。
輸出示例:
UI thread is 9
Calculated result 150 on thread 10
Result 150 on thread 9
Calculated result 200 on thread 10
Result 200 on thread 9
解釋:
- 鼠標(biāo)移動事件最初在 UI 線程上觸發(fā),
Observable.FromEventPattern捕獲這些事件。 - 計算任務(wù)被切換到后臺線程(線程 10),避免阻塞 UI 線程。
- 計算完成后,結(jié)果被切換回 UI 線程(線程 9)進行處理。
延遲與隊列問題
在這個例子中,由于鼠標(biāo)移動頻率比計算速度快(每次計算需要 100 毫秒),計算和結(jié)果會出現(xiàn)延遲,因為事件會排隊等待處理。這意味著鼠標(biāo)移動事件在后臺線程中會被排隊處理,而不是實時計算最新的鼠標(biāo)位置。
為了解決這種延遲問題,Rx.NET 提供了很多運算符,比如節(jié)流(Throttle),來減少事件的頻率。你可以在高頻繁的事件流中使用這些運算符來減輕負(fù)載。
總結(jié)
- Rx.NET 默認(rèn)不區(qū)分線程:事件通知可以來自任何線程,特別是像
Observable.Interval等操作符使用的線程池線程。 ObserveOn運算符:允許我們將事件流切換到指定的線程或調(diào)度器上。對于 UI 操作,通常需要從后臺線程切換回 UI 線程。SubscribeOn運算符:與ObserveOn不同,SubscribeOn決定的是訂閱邏輯(即添加和移除事件處理程序)在哪個線程上執(zhí)行。- 處理復(fù)雜計算的場景:我們可以使用
ObserveOn(Scheduler.Default)將計算任務(wù)移到后臺線程,避免阻塞 UI 線程,然后通過ObserveOn切換回 UI 線程處理結(jié)果。
通過 ObserveOn,我們可以在不同的線程或上下文間靈活切換,確保所有操作都在合適的線程中完成。
7.6 使用窗口和緩沖來分組事件數(shù)據(jù)
問題背景
在處理事件流時,經(jīng)常會遇到這樣一種需求:我們需要對事件進行分組處理。比如,你需要每兩個事件成對處理,或者在特定的時間窗口內(nèi)處理收到的所有事件。為了解決這些問題,Rx.NET 提供了兩個強大的運算符:Buffer 和 Window。
- Buffer(緩沖):收集一組事件,并在該組完成后,將這些事件作為一個集合發(fā)出。
- Window(窗口):按邏輯分組事件,但在事件到達(dá)時就直接傳遞出去。
Window會返回一個IObservable<IObservable<T>>,即“事件流的事件流”。
解決方案:Buffer 和 Window 運算符
Buffer 和 Window 可以根據(jù)事件的數(shù)量或時間來對事件進行分組。下面,我們將使用一些具體的示例來說明它們的工作方式。
示例 1:使用 Buffer 按數(shù)量分組事件
Buffer 會累積一定數(shù)量的事件,當(dāng)達(dá)到指定的數(shù)量后,將這些事件作為一個集合發(fā)出。
例如,使用 Observable.Interval 每秒發(fā)出一個 OnNext 通知,我們可以使用 Buffer(2) 每次將兩個事件組成一個集合。
Observable.Interval(TimeSpan.FromSeconds(1))
.Buffer(2) // 每兩個事件分組
.Subscribe(bufferedItems =>
{
Trace.WriteLine($"{DateTime.Now.Second}: Got {bufferedItems[0]} and {bufferedItems[1]}");
});
輸出示例:
13: Got 0 and 1
15: Got 2 and 3
17: Got 4 and 5
19: Got 6 and 7
21: Got 8 and 9
在這個例子中,Buffer(2) 將每次收到的兩個事件組成一個 IList<T> 集合,并一起發(fā)出。每秒一個事件,因此每兩秒我們可以看到一對事件被處理。
示例 2:使用 Window 按數(shù)量分組事件
Window 的工作方式與 Buffer 類似,但它不會等待所有事件都到達(dá)后再發(fā)出集合,而是立即發(fā)出一個 IObservable<T>(即一個新的“事件流”),并且在這個新的事件流中逐一發(fā)出事件。
下面是一個類似的示例,使用 Window(2) 每次分組兩個事件:
Observable.Interval(TimeSpan.FromSeconds(1))
.Window(2) // 每兩個事件分組
.Subscribe(window =>
{
Trace.WriteLine($"{DateTime.Now.Second}: Starting new group");
window.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"),
() => Trace.WriteLine($"{DateTime.Now.Second}: Ending group"));
});
輸出示例:
17: Starting new group
18: Saw 0
19: Saw 1
19: Ending group
19: Starting new group
20: Saw 2
21: Saw 3
21: Ending group
21: Starting new group
22: Saw 4
23: Saw 5
23: Ending group
在這個例子中,Window(2) 每兩個事件分配一個新的窗口(即 IObservable<T>),并在該窗口內(nèi)逐個發(fā)出事件。當(dāng)窗口的事件接收完畢后,窗口會觸發(fā) OnCompleted,并結(jié)束當(dāng)前的分組。
示例 3:使用 Buffer 按時間分組事件
除了按事件數(shù)量分組,Buffer 也可以按照時間窗口來分組。比如,我們可以在每 1 秒內(nèi)收集所有事件,并將它們作為一個集合發(fā)出。這在處理高頻率的事件流時非常有用,比如鼠標(biāo)移動事件。
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => this.MouseMove += handler,
handler => this.MouseMove -= handler)
.Buffer(TimeSpan.FromSeconds(1)) // 每1秒緩沖一次
.Subscribe(events =>
{
Trace.WriteLine($"{DateTime.Now.Second}: Saw {events.Count} items.");
});
}
輸出示例:
10: Saw 5 items.
11: Saw 3 items.
12: Saw 7 items.
在這個例子中,Buffer(TimeSpan.FromSeconds(1)) 會每隔一秒收集該秒內(nèi)的所有鼠標(biāo)移動事件,并將這些事件作為一個集合發(fā)出。輸出的事件數(shù)量取決于用戶在該秒內(nèi)移動鼠標(biāo)的頻率。
示例 4:使用 Window 按時間分組事件
類似地,Window 也可以按照時間窗口來分組,但與 Buffer 不同,它會立即發(fā)出窗口(即 IObservable<T>),并在該窗口內(nèi)逐個發(fā)出事件。
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => this.MouseMove += handler,
handler => this.MouseMove -= handler)
.Window(TimeSpan.FromSeconds(1)) // 每1秒創(chuàng)建一個新窗口
.Subscribe(window =>
{
Trace.WriteLine($"{DateTime.Now.Second}: New window started");
window.Subscribe(
evt => Trace.WriteLine($"{DateTime.Now.Second}: Mouse moved"),
() => Trace.WriteLine($"{DateTime.Now.Second}: Window closed"));
});
}
輸出示例:
10: New window started
10: Mouse moved
10: Mouse moved
11: Window closed
11: New window started
11: Mouse moved
12: Window closed
在這個例子中,Window(TimeSpan.FromSeconds(1)) 每秒開啟一個新的窗口,每次鼠標(biāo)移動時,事件會被立即發(fā)出,同時每秒的窗口結(jié)束時會觸發(fā) OnCompleted,關(guān)閉當(dāng)前窗口。
Buffer 和 Window 的區(qū)別
-
Buffer:將事件收集到一個集合中,直到分組條件滿足(如達(dá)到指定數(shù)量或時間窗口結(jié)束),然后一次性發(fā)出整個集合。返回類型是IObservable<IList<T>>,即事件集合的可觀察流。 -
Window:按分組條件(如數(shù)量或時間)創(chuàng)建一個新的窗口(即IObservable<T>),并在事件到達(dá)時立即發(fā)出。返回類型是IObservable<IObservable<T>>,即“事件流的事件流”。
總結(jié)
Buffer和Window是 Rx.NET 中常用的運算符,用于對事件流進行分組處理。Buffer會等待事件組完成后再發(fā)出整個集合,而Window會立即發(fā)出新的窗口并在其中逐個發(fā)出事件。- 這兩個運算符都支持按事件數(shù)量或時間分組,適用于不同的場景。
通過使用 Buffer 和 Window,我們可以更高效地處理批量事件或時間敏感的事件流,尤其是在需要對事件進行分組、批處理或窗口化時。
7.7 超時
問題背景
在某些情況下,你可能希望事件在一定的時間內(nèi)到達(dá)。如果事件未能在規(guī)定的時間內(nèi)到達(dá),程序仍需要能夠及時響應(yīng)。這種需求在處理異步操作時非常常見,比如等待來自 Web 服務(wù)的響應(yīng)。如果響應(yīng)過慢,程序應(yīng)該超時并采取相應(yīng)的措施,而不是無限期地等待。
解決方案:Timeout 運算符
Timeout 運算符為事件流創(chuàng)建了一個滑動的超時窗口。每當(dāng)有新事件到來時,超時窗口會被重置。如果超時窗口內(nèi)沒有收到新事件,則超時窗口過期,并且 Timeout 運算符會通過 OnError 通知,發(fā)出一個包含 TimeoutException 的終止信號。
示例 1:對 Web 請求應(yīng)用超時
以下示例向一個示例域名發(fā)起 Web 請求,并為該請求設(shè)置了 1 秒的超時時間。如果超過 1 秒還沒有得到響應(yīng),則會拋出 TimeoutException。
void GetWithTimeout(HttpClient client)
{
client.GetStringAsync("http://exampleurl").ToObservable()
.Timeout(TimeSpan.FromSeconds(1)) // 設(shè)置1秒超時
.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.Length}"),
ex => Trace.WriteLine(ex)); // 當(dāng)超時發(fā)生時,輸出異常信息
}
在這個例子中,如果 Web 請求在 1 秒內(nèi)沒有完成,Timeout 運算符會自動終止流,并通過 OnError 發(fā)出 TimeoutException。這使得程序可以對超時情況做出響應(yīng),而不是無限期等待。
Timeout 的常見應(yīng)用場景
- Web 請求:在等待 Web 服務(wù)響應(yīng)時,防止請求長時間掛起。
- 異步任務(wù):限制異步操作的執(zhí)行時間,確保程序能夠及時超時并采取相應(yīng)行動。
示例 2:為鼠標(biāo)移動事件設(shè)置超時
Timeout 可以應(yīng)用于任何事件流,除了異步操作,它也可以用于用戶輸入、傳感器數(shù)據(jù)等事件流。以下示例為鼠標(biāo)移動事件設(shè)置了 1 秒的超時時間。如果 1 秒內(nèi)沒有收到鼠標(biāo)移動事件,程序會拋出 TimeoutException。
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition(this))
.Timeout(TimeSpan.FromSeconds(1)) // 設(shè)置1秒超時
.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"),
ex => Trace.WriteLine(ex)); // 超時后輸出異常信息
}
輸出示例:
16: Saw 180
16: Saw 178
16: Saw 177
16: Saw 176
System.TimeoutException: The operation has timed out.
在這個例子中,鼠標(biāo)移動了幾次后停止,1 秒內(nèi)沒有新的鼠標(biāo)移動事件,因此 Timeout 運算符觸發(fā)了 TimeoutException,并終止了事件流。
使用 Timeout 的重載方法
有時,你可能不希望在超時發(fā)生時立即終止事件流。Timeout 運算符提供了一個重載版本,允許你在超時發(fā)生時切換到另一個事件流,而不是通過異常終止當(dāng)前流。
示例 3:在超時后切換到鼠標(biāo)點擊事件流
以下示例在超時之前監(jiān)聽鼠標(biāo)移動事件。如果超時發(fā)生后,還沒有新的鼠標(biāo)移動事件,則切換到監(jiān)聽鼠標(biāo)點擊事件:
private void Button_Click(object sender, RoutedEventArgs e)
{
// 鼠標(biāo)點擊事件流
IObservable<Point> clicks =
Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseDown += handler,
handler => MouseDown -= handler)
.Select(x => x.EventArgs.GetPosition(this));
// 鼠標(biāo)移動事件流,超時后切換到點擊事件流
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition(this))
.Timeout(TimeSpan.FromSeconds(1), clicks) // 超時后切換到 clicks 流
.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X},{x.Y}"),
ex => Trace.WriteLine(ex)); // 輸出異常信息
}
輸出示例:
49: Saw 95,39
49: Saw 94,39
49: Saw 94,38
49: Saw 94,37
53: Saw 130,141
55: Saw 469,4
在這個例子中,程序開始時監(jiān)聽鼠標(biāo)移動事件。當(dāng)鼠標(biāo)靜止超過 1 秒后,程序切換到監(jiān)聽鼠標(biāo)點擊事件。鼠標(biāo)移動事件流超時后,程序捕獲了兩次鼠標(biāo)點擊事件。
總結(jié) Timeout 的使用方式
- 默認(rèn)行為:當(dāng)事件流中沒有在指定的時間內(nèi)收到事件,
Timeout運算符會發(fā)出TimeoutException,并通過OnError終止流。 - 可選行為:通過
Timeout的重載方法,可以在超時發(fā)生時切換到另一個事件流,而不是終止當(dāng)前流。
討論
對于一些關(guān)鍵應(yīng)用來說,Timeout 是一個必不可少的運算符,因為它確保了應(yīng)用程序在任何情況下都能及時響應(yīng)。對于異步操作,尤其是 Web 請求,Timeout 可以防止系統(tǒng)長時間等待,進而導(dǎo)致資源浪費。而在用戶輸入流中,Timeout 也可以用于處理用戶長時間不活動的情況。
需要注意的是,使用 Timeout 并不會取消底層的操作(例如 HTTP 請求)。當(dāng) Timeout 觸發(fā)時,底層操作仍然會繼續(xù)執(zhí)行,直到成功或失敗。這意味著,程序可能會啟動一些不再關(guān)心的異步操作,開發(fā)者需要考慮如何處理這些操作的結(jié)果。
總結(jié)
Timeout運算符:用于確保事件流能夠在指定時間內(nèi)響應(yīng)。如果事件流在超時窗口內(nèi)沒有事件到達(dá),Timeout會觸發(fā)TimeoutException,終止流。- 適用場景:
Timeout常用于異步操作(如 Web 請求)和用戶輸入流,確保程序不會長時間等待。 - 重載行為:
Timeout可以在超時發(fā)生后,切換到另一個事件流,而不是直接拋出異常終止流。

浙公網(wǎng)安備 33010602011771號