<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      第七章:C#響應(yīng)式編程System.Reactive

      第七章:C#響應(yīng)式編程System.Reactive

      目錄


      響應(yīng)式編程(Reactive Programming)是一種通過響應(yīng)事件來編寫代碼的編程方式,特別適合處理用戶界面或任何需要應(yīng)對外部事件的程序。它避免了輪詢的開銷,允許系統(tǒng)在事件發(fā)生時自動觸發(fā)代碼。

      更多詳細(xì)內(nèi)容參考:Introduction to Rx.NET

      7.1 為什么選擇響應(yīng)式編程?

      在現(xiàn)代應(yīng)用程序中,異步操作事件驅(qū)動的需求變得越來越普遍。無論是響應(yīng)用戶界面的交互、處理實時數(shù)據(jù)流,還是處理 I/O 操作,程序都需要能夠高效地處理隨時可能發(fā)生的事件。傳統(tǒng)的編程方式有時顯得笨重、復(fù)雜,難以處理這些情況。而 Rx.NET(Reactive Extensions for .NET) 提供了一種聲明式、簡潔的方式來處理異步事件流

      1. 事件流的重要性

      事件流(streams of events)在許多應(yīng)用中是核心部分,例如:

      • 用戶交互:按鈕點擊、輸入變化等。
      • 實時數(shù)據(jù):股票行情、傳感器數(shù)據(jù)等。
      • 異步操作:網(wǎng)絡(luò)請求、文件讀寫等。

      傳統(tǒng)上,這些操作通常通過回調(diào)事件處理異步編程模式來實現(xiàn),但這些方式容易導(dǎo)致代碼復(fù)雜、難以維護。

      2. Rx.NET 的優(yōu)勢

      Rx.NET 通過提供一個統(tǒng)一的方式來處理事件流,解決了傳統(tǒng)方法的許多問題。它的優(yōu)點包括:

      • 聲明式的事件流處理:通過 Rx.NET,開發(fā)者可以像操作集合(如數(shù)組、列表)一樣處理事件流。你可以用熟悉的 LINQ 風(fēng)格的方法來過濾、轉(zhuǎn)換和組合事件。

      • 簡化異步編程:Rx.NET 內(nèi)置了對異步事件的處理,避免了復(fù)雜的回調(diào)地獄。它提供了強大的工具來處理并發(fā)和異步任務(wù)。

      • 處理復(fù)雜的事件交互:Rx.NET 可以輕松處理多個事件源的組合、合并和分割。例如,你可以將兩個不同的事件流合并,然后根據(jù)需要對它們進行處理。

      • 更好的錯誤處理:Rx.NET 通過其流式操作,提供了一個一致的錯誤處理機制,避免了散亂的 try-catch 塊。

      • 可測試性:Rx.NET 提供了內(nèi)置的工具,可以模擬事件流,幫助開發(fā)者輕松測試異步操作。

      3. Rx.NET 的適用場景

      Rx.NET 適用于處理異步事件流的場景,包括但不限于:

      • 用戶界面編程:響應(yīng)用戶輸入,處理復(fù)雜的界面交互。
      • 實時數(shù)據(jù)處理:處理實時的市場數(shù)據(jù)、傳感器數(shù)據(jù)等。
      • 并發(fā)處理:協(xié)調(diào)多個異步操作,優(yōu)化系統(tǒng)性能。
      • 響應(yīng)式系統(tǒng):例如聊天應(yīng)用、社交媒體更新、流媒體播放等。

      4. Rx.NET 的核心思想

      Rx.NET 的核心思想是將一切看作一個流。不管是鍵盤按鍵、鼠標(biāo)點擊,還是復(fù)雜的異步操作,Rx.NET 都將其抽象為 IObservable<T>,允許開發(fā)者通過 IObserver<T> 訂閱并響應(yīng)這些事件。

      這種序列化思維(thinking in sequences)是 Rx.NET 最強大的概念之一。它讓開發(fā)者可以將傳統(tǒng)的順序操作模式,轉(zhuǎn)變?yōu)閷κ录牧魇讲僮鳎喕藦?fù)雜業(yè)務(wù)邏輯的表達(dá)。

      小結(jié)

      Rx.NET 通過其聲明式的事件流處理方式,簡化了異步編程,尤其適合需要應(yīng)對異步事件并發(fā)操作的場景。它不僅減少了代碼的復(fù)雜度,還提供了強大的工具來處理事件流的組合、轉(zhuǎn)換和錯誤處理。通過 Rx.NET,你可以更輕松地構(gòu)建高效、可擴展的響應(yīng)式系統(tǒng)。


      7.2 主要概念和類型

      在深入了解如何將 .NET 事件轉(zhuǎn)換為可觀察的流之前,我們需要先掌握一些 Reactive Extensions (Rx) 的核心概念和類型。理解這些基礎(chǔ)知識將幫助我們在后續(xù)章節(jié)中更輕松地使用 Rx.NET 處理事件流和異步數(shù)據(jù)流。

      若要使用System.Reactive,需要在應(yīng)用程序中安裝用于System.Reactive的NuGet包。

      Rx.NET和System.Reactive是同一個東西,只是不同的叫法而已。只不過Rx.NET更多是在上下文中用作簡稱,而System.Reactive則是具體的包名。

      1. IObservable<T>

      IObservable<T> 是 Rx 的核心接口,它代表一個推送(push)數(shù)據(jù)流的源。它與傳統(tǒng)的 IEnumerable<T> 不同,IEnumerable<T>拉取(pull)數(shù)據(jù)的方式,而 IObservable<T>推送數(shù)據(jù)的方式。IObservable<T> 可以用來表示一個事件流、異步操作、或其他隨時間變化的數(shù)據(jù)源。

      public interface IObservable<out T>
      {
          IDisposable Subscribe(IObserver<T> observer);
      }
      

      如何使用:如果你有一個 IObservable<T>,你可以通過調(diào)用 Subscribe 方法來訂閱它,從而接收數(shù)據(jù)流中的事件。Subscribe 方法返回一個 IDisposable,允許你取消訂閱以停止接收數(shù)據(jù)。

      2. IObserver<T>

      IObserver<T> 是與 IObservable<T> 相對應(yīng)的接口,它定義了如何處理來自 IObservable<T> 的事件流。IObserver<T> 有三個方法:

      • OnNext(T value):當(dāng)有新數(shù)據(jù)推送時調(diào)用。
      • OnError(Exception error):當(dāng)發(fā)生錯誤時調(diào)用。
      • OnCompleted():當(dāng)數(shù)據(jù)流完成時調(diào)用。
      public interface IObserver<in T>
      {
          void OnNext(T value);
          void OnError(Exception error);
          void OnCompleted();
      }
      

      如何使用:當(dāng)你訂閱一個 IObservable<T> 時,你通常會提供一個 IObserver<T>,該 IObserver<T> 定義了如何處理每一個數(shù)據(jù)事件、錯誤以及完成通知。

      3. Push vs Pull(推 vs 拉)

      • Pull 模式:在傳統(tǒng)的 IEnumerable<T> 中,消費者通過 foreach 從集合中拉取數(shù)據(jù),數(shù)據(jù)的產(chǎn)生是由消費者控制的。

      • Push 模式:在 IObservable<T> 中,數(shù)據(jù)是主動推送給消費者的,消費者只需要訂閱,不需要主動請求數(shù)據(jù)。數(shù)據(jù)的產(chǎn)生是由 IObservable<T> 控制的,消費者被動接收。

      4. 熱(Hot)和冷(Cold)Observable

      IObservable<T> 可以分為兩種類型:

      • 冷 Observable:數(shù)據(jù)流在訂閱時才開始產(chǎn)生事件。每個訂閱者會從頭開始接收數(shù)據(jù)。例如,讀取文件或從集合中推送數(shù)據(jù)。

      • 熱 Observable:數(shù)據(jù)流在創(chuàng)建時就開始產(chǎn)生事件。訂閱者只能接收到在它訂閱之后產(chǎn)生的事件,訂閱之前發(fā)生的事件將無法捕獲。例如,鼠標(biāo)點擊事件、傳感器數(shù)據(jù)等實時事件。

      5. 訂閱與取消訂閱

      當(dāng)我們訂閱一個 IObservable<T> 時,實際上是注冊了一個 IObserver<T>,以便接收事件。這個訂閱可以通過 IDisposable 接口來取消,以終止繼續(xù)接收事件。

      var subscription = observable.Subscribe(
          onNext: value => Console.WriteLine($"Received: {value}"),
          onError: error => Console.WriteLine($"Error: {error.Message}"),
          onCompleted: () => Console.WriteLine("Completed")
      );
      
      // 取消訂閱
      subscription.Dispose();
      

      6. 常用操作符

      Rx.NET 提供了大量操作符,用于轉(zhuǎn)換、過濾和組合事件流。這些操作符類似于 LINQ,允許我們以聲明式的方式處理數(shù)據(jù)流。常用的操作符包括:

      • Select:類似于 LINQ 的 Select 操作符,用于映射數(shù)據(jù)流中的每個元素。
      • Where:用于過濾數(shù)據(jù)流,只保留符合條件的元素。
      • Merge:合并多個 IObservable<T> 數(shù)據(jù)流。
      • Throttle:對事件流進行節(jié)流,忽略短時間內(nèi)重復(fù)的事件。
      observable
          .Where(value => value > 10)
          .Select(value => value * 2)
          .Subscribe(value => Console.WriteLine($"Processed value: {value}"));
      

      7. 調(diào)度器(Schedulers)

      Rx.NET 中的調(diào)度器用于控制代碼在特定線程或上下文中執(zhí)行。常見的調(diào)度器有:

      • Scheduler.CurrentThread:在當(dāng)前線程上執(zhí)行。
      • Scheduler.NewThread:在新線程上執(zhí)行。
      • Scheduler.Default:在線程池中執(zhí)行。
      • DispatcherScheduler:用于 WPF 或 WinForms 應(yīng)用程序的 UI 線程調(diào)度。

      你可以使用調(diào)度器來控制數(shù)據(jù)流的訂閱和觀察行為:

      observable
          .ObserveOn(Scheduler.NewThread)  // 在新線程上觀察數(shù)據(jù)
          .SubscribeOn(Scheduler.CurrentThread)  // 在當(dāng)前線程上訂閱
          .Subscribe(value => Console.WriteLine($"Received on new thread: {value}"));
      

      8. 總結(jié)

      在 Rx.NET 中,IObservable<T>IObserver<T> 是最基本的構(gòu)建塊。IObservable<T> 用于表示事件流,而 IObserver<T> 則用于處理這些事件。通過訂閱一個 IObservable<T>,我們可以獲得事件的推送,并使用各種 LINQ 風(fēng)格的操作符來對事件流進行處理。理解這些基礎(chǔ)概念有助于我們更好地在后續(xù)章節(jié)中處理 .NET 事件并將其轉(zhuǎn)換為響應(yīng)式的 IObservable<T> 數(shù)據(jù)流。


      7.3 創(chuàng)建可觀察序列

      在 Rx.NET 中,可觀察序列(Observable Sequence) 是事件流的核心。一個可觀察序列可以是任何類型的異步數(shù)據(jù)源,例如按鈕點擊、傳感器數(shù)據(jù)、或者網(wǎng)絡(luò)請求的結(jié)果。在這一小節(jié)中,我們將討論幾種常用的創(chuàng)建可觀察序列的方法。

      1. 基本的 Observable 創(chuàng)建方法

      Rx.NET 提供了多種創(chuàng)建可觀察序列的方式。最常用的方式之一是使用 Observable.Create,它允許我們手動定義事件流的行為。

      示例:使用 Observable.Create

      IObservable<int> observable = Observable.Create<int>(observer =>
      {
          // 模擬數(shù)據(jù)流
          observer.OnNext(1);
          observer.OnNext(2);
          observer.OnNext(3);
          
          // 完成流
          observer.OnCompleted();
      
          // 返回 IDisposable,用于取消訂閱
          return Disposable.Empty;
      });
      

      在這個例子中,我們創(chuàng)建了一個簡單的 IObservable<int>,它推送了三個整數(shù)并調(diào)用了 OnCompleted 來結(jié)束數(shù)據(jù)流。我們還返回了 Disposable.Empty,表示沒有特定的取消訂閱邏輯。

      2. 使用現(xiàn)成的工廠方法

      Rx.NET 提供了許多工廠方法來快捷創(chuàng)建常見的可觀察序列。這些方法可以幫助我們快速創(chuàng)建序列,而不需要手動實現(xiàn) IObservable 接口。

      2.1 Observable.Return

      Observable.Return 用于創(chuàng)建一個只發(fā)出單個值的簡單序列。

      IObservable<int> singleValue = Observable.Return(42);
      singleValue.Subscribe(value => Console.WriteLine($"Received: {value}"));
      

      這個序列只會發(fā)出一個值 42,然后立即結(jié)束。

      2.2 Observable.Range

      Observable.Range 用于創(chuàng)建一個發(fā)出整數(shù)序列的可觀察流。

      IObservable<int> range = Observable.Range(1, 5);
      range.Subscribe(value => Console.WriteLine($"Received: {value}"));
      

      這個序列會發(fā)出從 15 的整數(shù),并在最后調(diào)用 OnCompleted

      2.3 Observable.Empty

      Observable.Empty 創(chuàng)建一個立即完成的空序列。

      IObservable<int> empty = Observable.Empty<int>();
      empty.Subscribe(
          onNext: value => Console.WriteLine($"Received: {value}"),
          onCompleted: () => Console.WriteLine("Completed")
      );
      

      此序列不會發(fā)出任何值,它只會調(diào)用 OnCompleted

      2.4 Observable.Never

      Observable.Never 創(chuàng)建一個永不發(fā)出任何事件的序列。它既不會觸發(fā) OnNext,也不會觸發(fā) OnCompletedOnError

      IObservable<int> never = Observable.Never<int>();
      never.Subscribe(
          onNext: value => Console.WriteLine($"Received: {value}"),
          onCompleted: () => Console.WriteLine("Completed")
      );
      

      這個序列永遠(yuǎn)不會結(jié)束,也不會發(fā)出任何事件。

      2.5 Observable.Throw

      Observable.Throw 創(chuàng)建一個立即發(fā)出錯誤的序列。

      IObservable<int> error = Observable.Throw<int>(new Exception("An error occurred"));
      error.Subscribe(
          onNext: value => Console.WriteLine($"Received: {value}"),
          onError: ex => Console.WriteLine($"Error: {ex.Message}")
      );
      

      此序列不會發(fā)出任何值,而是直接調(diào)用 OnError 傳遞異常。

      3. 異步序列

      我們還可以創(chuàng)建異步的可觀察序列,它們可以用于處理異步操作或定時事件。

      3.1 Observable.Timer

      Observable.Timer 創(chuàng)建一個在指定時間后觸發(fā)的序列,它可以用于延遲事件。

      IObservable<long> timer = Observable.Timer(TimeSpan.FromSeconds(2));
      timer.Subscribe(value => Console.WriteLine($"Timer fired: {value}"));
      

      在這個例子中,2 秒之后序列會發(fā)出一個值,并調(diào)用 OnCompleted

      3.2 Observable.Interval

      Observable.Interval 創(chuàng)建一個定時觸發(fā)的序列,按照指定的時間間隔重復(fù)發(fā)出值。

      IObservable<long> interval = Observable.Interval(TimeSpan.FromSeconds(1));
      interval.Subscribe(value => Console.WriteLine($"Tick: {value}"));
      

      這個例子中,Observable.Interval 每 1 秒發(fā)出一個值(從 0 開始遞增)。

      4. 將現(xiàn)有的數(shù)據(jù)源轉(zhuǎn)換為 Observable

      除了手動創(chuàng)建 Observable 之外,Rx.NET 還提供了一些工具,用來將現(xiàn)有的數(shù)據(jù)源(如任務(wù)、事件等)轉(zhuǎn)換為 Observable。

      4.1 使用 Task 創(chuàng)建 Observable

      Rx.NET 提供了將 Task 轉(zhuǎn)換為 IObservable 的方法。

      Task<int> task = Task.FromResult(42);
      IObservable<int> taskObservable = task.ToObservable();
      
      taskObservable.Subscribe(value => Console.WriteLine($"Task result: {value}"));
      

      這個例子展示了如何將一個 Task 轉(zhuǎn)換為 Observable,并在任務(wù)完成時發(fā)出結(jié)果。

      4.2 使用 FromEventPattern 將事件轉(zhuǎn)換為 Observable

      我們可以使用 FromEventPattern 將標(biāo)準(zhǔn)的 .NET 事件轉(zhuǎn)換為 Observable。這個部分將在下一小節(jié)詳細(xì)討論。

      5. 總結(jié)

      • 創(chuàng)建可觀察序列 是使用 Rx.NET 的第一步。我們可以通過手動創(chuàng)建、使用現(xiàn)成的工廠方法、或?qū)F(xiàn)有的數(shù)據(jù)源轉(zhuǎn)換來創(chuàng)建 IObservable<T>
      • Rx 提供了多種簡單的工廠方法,如 ReturnRangeEmpty 等,幫助我們快速創(chuàng)建各種數(shù)據(jù)流。
      • 我們還可以使用 Observable.TimerObservable.Interval 來處理定時事件流。
      • 最后,Rx.NET 可以將 Task 和事件等異步源輕松轉(zhuǎn)換為 Observable。

      在理解了如何創(chuàng)建可觀察序列之后,接下來我們將討論如何將 .NET 事件轉(zhuǎn)換為可觀察序列,在下一小節(jié) 7.4 轉(zhuǎn)換 .NET 事件 中會詳細(xì)介紹。


      7.4 轉(zhuǎn)換 .NET 事件

      問題背景

      在 .NET 中,事件是處理異步操作的常見方式,而在 Reactive Extensions (Rx) 中,我們使用 IObservable<T> 來處理數(shù)據(jù)流。為了讓傳統(tǒng)的 .NET 事件能與 Rx 的響應(yīng)式編程模型兼容,我們需要將事件轉(zhuǎn)換為 IObservable<T>。這個過程可以通過 Observable.FromEventObservable.FromEventPattern 來實現(xiàn)。

      事件轉(zhuǎn)換的核心

      • FromEvent:適用于不符合標(biāo)準(zhǔn)事件模式的事件。
      • FromEventPattern:適用于標(biāo)準(zhǔn)的 .NET 事件,特別是使用 EventHandler<T> 的事件。例如,ProgressChangedElapsed 事件。

      示例 1:將事件轉(zhuǎn)換為 Observable

      假設(shè)我們有一個按鈕點擊的事件 Click,我們想將它轉(zhuǎn)換成一個 IObservable,并在每次點擊時執(zhí)行對應(yīng)的響應(yīng)動作。

      var button = new Button();
      
      // 將 Click 事件轉(zhuǎn)換為 Observable
      IObservable<EventPattern<EventArgs>> clicks =
          Observable.FromEventPattern<EventHandler, EventArgs>(
              handler => button.Click += handler,
              handler => button.Click -= handler
          );
      
      // 訂閱事件,處理點擊行為
      clicks.Subscribe(click => Console.WriteLine("Button clicked!"));
      

      在這個例子中,FromEventPattern 將按鈕的 Click 事件轉(zhuǎn)換為 IObservable<EventPattern<EventArgs>>。每當(dāng)按鈕被點擊時,OnNext 會被觸發(fā),輸出 "Button clicked!"。

      示例 2:處理帶有數(shù)據(jù)的事件

      假設(shè)我們有一個進度條,每次進度更新時會觸發(fā) ProgressChanged 事件。我們可以使用 FromEventPattern 將該事件轉(zhuǎn)換成 Observable,并在每次進度變化時處理數(shù)據(jù)。

      var progress = new Progress<int>();
      
      // 將 ProgressChanged 事件轉(zhuǎn)換為 Observable
      IObservable<EventPattern<int>> progressReports =
          Observable.FromEventPattern<EventHandler<int>, int>(
              handler => progress.ProgressChanged += handler,
              handler => progress.ProgressChanged -= handler
          );
      
      // 打印每次進度變化的值
      progressReports.Subscribe(report => Console.WriteLine("Progress: " + report.EventArgs));
      

      在這個例子中,ProgressChanged 是一個標(biāo)準(zhǔn)的 EventHandler<T> 類型事件,因此我們可以簡單地使用 FromEventPattern 來包裝它。每當(dāng)進度更新時,OnNext 會被觸發(fā),并打印當(dāng)前的進度值。

      示例 3:處理自定義事件

      如果我們遇到自定義的事件類型,它可能不符合 EventHandler<T> 的標(biāo)準(zhǔn)模式。在這種情況下,我們可以使用 FromEvent。假設(shè)有一個自定義的事件 OnTemperatureChanged,我們可以這樣處理:

      public class Thermometer
      {
          public event Action<double> OnTemperatureChanged;
      
          public void SimulateTemperatureChange(double newTemp)
          {
              OnTemperatureChanged?.Invoke(newTemp);
          }
      }
      
      var thermometer = new Thermometer();
      
      // 將自定義事件轉(zhuǎn)換為 Observable
      IObservable<double> temperatureChanges =
          Observable.FromEvent<double>(
              handler => thermometer.OnTemperatureChanged += handler,
              handler => thermometer.OnTemperatureChanged -= handler
          );
      
      // 訂閱溫度變化事件
      temperatureChanges.Subscribe(temp => Console.WriteLine($"Temperature changed to: {temp}°C"));
      
      // 模擬溫度變化
      thermometer.SimulateTemperatureChange(23.5);
      thermometer.SimulateTemperatureChange(24.0);
      

      在這個例子中,OnTemperatureChanged 是一個自定義的 Action<double> 委托。我們使用 FromEvent 將其轉(zhuǎn)化為 IObservable<double>,并在每次溫度變化時輸出新的溫度值。

      異常處理

      有些事件可能會在執(zhí)行中拋出異常。例如,WebClientDownloadStringCompleted 事件可能會因為網(wǎng)絡(luò)問題而在 EventArgs 中包含錯誤。Rx 默認(rèn)將這些錯誤視為數(shù)據(jù),而不是異常。這時,我們需要手動處理這些錯誤。

      var client = new WebClient();
      
      // 將 DownloadStringCompleted 事件轉(zhuǎn)換為 Observable
      IObservable<EventPattern<DownloadStringCompletedEventArgs>> downloadedStrings =
          Observable.FromEventPattern<DownloadStringCompletedEventArgs>(
              handler => client.DownloadStringCompleted += handler,
              handler => client.DownloadStringCompleted -= handler
          );
      
      // 處理下載結(jié)果或錯誤
      downloadedStrings.Subscribe(
          data =>
          {
              if (data.EventArgs.Error != null)
                  Console.WriteLine("Download failed: " + data.EventArgs.Error.Message);
              else
                  Console.WriteLine("Downloaded: " + data.EventArgs.Result);
          }
      );
      
      // 發(fā)起異步下載
      client.DownloadStringAsync(new Uri("http://example.com"));
      

      在這個例子中,DownloadStringCompletedEventArgs 包含了下載結(jié)果或錯誤信息。我們通過檢查 eventArgs.Error 來判斷是否發(fā)生了錯誤,并在控制臺中輸出相應(yīng)的信息。

      線程上下文問題

      在某些情況下,事件的訂閱和取消訂閱必須在特定的上下文中執(zhí)行。例如,UI 事件必須在 UI 線程上訂閱。System.Reactive 提供了 SubscribeOn 操作符來控制訂閱的線程上下文:

      IObservable<EventPattern<EventArgs>> clicks =
          Observable.FromEventPattern<EventHandler, EventArgs>(
              handler => button.Click += handler,
              handler => button.Click -= handler
          );
      
      // 使用 SubscribeOn 指定事件處理需要在 UI 線程上執(zhí)行
      clicks
          .SubscribeOn(Scheduler.CurrentThread)
          .Subscribe(click => Console.WriteLine("Button clicked on UI thread"));
      

      在這個例子中,我們使用 SubscribeOn 來確保事件的訂閱在 UI 線程上執(zhí)行。

      總結(jié)

      • FromEventPattern 適用于標(biāo)準(zhǔn)的 EventHandler<T> 事件。
      • FromEvent 適用于自定義的或不符合標(biāo)準(zhǔn)的事件。
      • 當(dāng)事件被轉(zhuǎn)換為 IObservable<T> 之后,Rx 的各種操作符(如過濾、轉(zhuǎn)換、合并等)就可以輕松應(yīng)用到事件流上,幫助我們簡化異步編程。

      7.5 向上下文發(fā)送通知

      問題背景

      在 Rx.NET 中,事件通知(如 OnNext)可以從任何線程發(fā)出,特別是當(dāng)你使用像 Observable.Interval 這類基于定時器的操作符時,通知可能來自不同的線程池線程。這種行為通常對后臺處理沒有問題,但在某些場景下,尤其是涉及 UI 的場景時,線程問題就變得至關(guān)重要。例如,許多 UI 框架要求 UI 更新必須在主線程(UI 線程)上進行。如果通知來自后臺線程而你試圖更新 UI 元素,就會拋出異常。因此,我們需要確保所有相關(guān)的通知能在正確的上下文中處理。

      解決方案:使用 ObserveOn

      ObserveOn 是 Rx.NET 提供的一個運算符,它可以將事件通知(如 OnNextOnCompletedOnError)切換到指定的調(diào)度器或線程上下文中。通過 ObserveOn,我們可以將通知從后臺線程切換到 UI 線程,確保 UI 更新在正確的線程中進行。

      注意ObserveOn 控制的是可觀察通知的執(zhí)行上下文。不要將它與 SubscribeOn 混淆,后者控制的是訂閱(即添加/移除事件處理程序)的代碼所在的上下文。簡而言之:

      • ObserveOn:決定通知在哪個上下文或線程上發(fā)出。
      • SubscribeOn:決定訂閱邏輯在哪個上下文或線程上執(zhí)行。

      示例 1:切換到 UI 線程

      假設(shè)我們有一個按鈕點擊事件,每次點擊后我們啟動一個 Observable.Interval,每秒發(fā)出一次 OnNext 通知。由于 Interval 默認(rèn)使用線程池線程,我們需要將這些通知切換到 UI 線程來處理,確保 UI 更新在正確的線程上進行。

      private void Button_Click(object sender, RoutedEventArgs e)
      {
          // 獲取當(dāng)前的 UI 線程上下文
          SynchronizationContext uiContext = SynchronizationContext.Current;
      
          Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
      
          // 創(chuàng)建基于時間間隔的 Observable
          Observable.Interval(TimeSpan.FromSeconds(1))
              // 切換到 UI 線程上下文處理通知
              .ObserveOn(uiContext)
              .Subscribe(x => Trace.WriteLine(
                  $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
      }
      

      輸出示例:

      UI thread is 9
      Interval 0 on thread 9
      Interval 1 on thread 9
      Interval 2 on thread 9
      

      在這個例子中,Observable.Interval 每秒發(fā)出一個值。由于我們使用了 ObserveOn 運算符,并傳遞了當(dāng)前的 SynchronizationContext,所有通知都會在 UI 線程(線程 9)上執(zhí)行。即使 Interval 默認(rèn)使用后臺線程,ObserveOn 也確保了通知會切換到 UI 線程處理。

      示例 2:從 UI 線程切換到后臺線程處理復(fù)雜計算

      在某些情況下,你可能希望從 UI 線程切換到后臺線程來處理一些耗時的計算任務(wù)。例如,當(dāng)鼠標(biāo)移動時,我們可能需要進行 CPU 密集型的計算。我們可以使用 ObserveOn 將計算任務(wù)移到后臺線程上進行處理,避免阻塞 UI 線程,然后再將結(jié)果切換回 UI 線程顯示。

      private void SetupMouseMoveProcessing()
      {
          SynchronizationContext uiContext = SynchronizationContext.Current;
      
          Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
      
          Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                  handler => (s, a) => handler(s, a),
                  handler => this.MouseMove += handler,
                  handler => this.MouseMove -= handler)
              .Select(evt => evt.EventArgs.GetPosition(this))
              // 切換到后臺線程進行計算
              .ObserveOn(Scheduler.Default)
              .Select(position =>
              {
                  // 模擬復(fù)雜計算
                  Thread.Sleep(100);  // 假設(shè)計算需要花費一些時間
                  var result = position.X + position.Y;
                  var thread = Environment.CurrentManagedThreadId;
                  Trace.WriteLine($"Calculated result {result} on thread {thread}");
                  return result;
              })
              // 將結(jié)果切換回 UI 線程
              .ObserveOn(uiContext)
              .Subscribe(result => Trace.WriteLine(
                  $"Result {result} on thread {Environment.CurrentManagedThreadId}"));
      }
      

      過程分析:

      1. 我們從 MouseMove 事件創(chuàng)建了一個 Observable,每次鼠標(biāo)移動時都將捕獲鼠標(biāo)位置。
      2. 使用 ObserveOn(Scheduler.Default) 將事件流切換到后臺線程,進行耗時的計算(模擬了 100 毫秒的延遲)。
      3. 計算完成后,使用 ObserveOn(uiContext) 將結(jié)果切換回 UI 線程,以便安全地更新 UI 或其他需要在 UI 線程執(zhí)行的操作。

      輸出示例:

      UI thread is 9
      Calculated result 150 on thread 10
      Result 150 on thread 9
      Calculated result 200 on thread 10
      Result 200 on thread 9
      

      解釋:

      • 鼠標(biāo)移動事件最初在 UI 線程上觸發(fā),Observable.FromEventPattern 捕獲這些事件。
      • 計算任務(wù)被切換到后臺線程(線程 10),避免阻塞 UI 線程。
      • 計算完成后,結(jié)果被切換回 UI 線程(線程 9)進行處理。

      延遲與隊列問題

      在這個例子中,由于鼠標(biāo)移動頻率比計算速度快(每次計算需要 100 毫秒),計算和結(jié)果會出現(xiàn)延遲,因為事件會排隊等待處理。這意味著鼠標(biāo)移動事件在后臺線程中會被排隊處理,而不是實時計算最新的鼠標(biāo)位置。

      為了解決這種延遲問題,Rx.NET 提供了很多運算符,比如節(jié)流(Throttle),來減少事件的頻率。你可以在高頻繁的事件流中使用這些運算符來減輕負(fù)載。

      總結(jié)

      • Rx.NET 默認(rèn)不區(qū)分線程:事件通知可以來自任何線程,特別是像 Observable.Interval 等操作符使用的線程池線程。
      • ObserveOn 運算符:允許我們將事件流切換到指定的線程或調(diào)度器上。對于 UI 操作,通常需要從后臺線程切換回 UI 線程。
      • SubscribeOn 運算符:與 ObserveOn 不同,SubscribeOn 決定的是訂閱邏輯(即添加和移除事件處理程序)在哪個線程上執(zhí)行。
      • 處理復(fù)雜計算的場景:我們可以使用 ObserveOn(Scheduler.Default) 將計算任務(wù)移到后臺線程,避免阻塞 UI 線程,然后通過 ObserveOn 切換回 UI 線程處理結(jié)果。

      通過 ObserveOn,我們可以在不同的線程或上下文間靈活切換,確保所有操作都在合適的線程中完成。


      7.6 使用窗口和緩沖來分組事件數(shù)據(jù)

      問題背景

      在處理事件流時,經(jīng)常會遇到這樣一種需求:我們需要對事件進行分組處理。比如,你需要每兩個事件成對處理,或者在特定的時間窗口內(nèi)處理收到的所有事件。為了解決這些問題,Rx.NET 提供了兩個強大的運算符:BufferWindow

      • Buffer(緩沖):收集一組事件,并在該組完成后,將這些事件作為一個集合發(fā)出。
      • Window(窗口):按邏輯分組事件,但在事件到達(dá)時就直接傳遞出去。Window 會返回一個 IObservable<IObservable<T>>,即“事件流的事件流”。

      解決方案:BufferWindow 運算符

      BufferWindow 可以根據(jù)事件的數(shù)量或時間來對事件進行分組。下面,我們將使用一些具體的示例來說明它們的工作方式。

      示例 1:使用 Buffer 按數(shù)量分組事件

      Buffer 會累積一定數(shù)量的事件,當(dāng)達(dá)到指定的數(shù)量后,將這些事件作為一個集合發(fā)出。

      例如,使用 Observable.Interval 每秒發(fā)出一個 OnNext 通知,我們可以使用 Buffer(2) 每次將兩個事件組成一個集合。

      Observable.Interval(TimeSpan.FromSeconds(1))
          .Buffer(2)   // 每兩個事件分組
          .Subscribe(bufferedItems => 
          {
              Trace.WriteLine($"{DateTime.Now.Second}: Got {bufferedItems[0]} and {bufferedItems[1]}");
          });
      

      輸出示例:

      13: Got 0 and 1
      15: Got 2 and 3
      17: Got 4 and 5
      19: Got 6 and 7
      21: Got 8 and 9
      

      在這個例子中,Buffer(2) 將每次收到的兩個事件組成一個 IList<T> 集合,并一起發(fā)出。每秒一個事件,因此每兩秒我們可以看到一對事件被處理。

      示例 2:使用 Window 按數(shù)量分組事件

      Window 的工作方式與 Buffer 類似,但它不會等待所有事件都到達(dá)后再發(fā)出集合,而是立即發(fā)出一個 IObservable<T>(即一個新的“事件流”),并且在這個新的事件流中逐一發(fā)出事件。

      下面是一個類似的示例,使用 Window(2) 每次分組兩個事件:

      Observable.Interval(TimeSpan.FromSeconds(1))
          .Window(2)   // 每兩個事件分組
          .Subscribe(window =>
          {
              Trace.WriteLine($"{DateTime.Now.Second}: Starting new group");
              window.Subscribe(
                  x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"),
                  () => Trace.WriteLine($"{DateTime.Now.Second}: Ending group"));
          });
      

      輸出示例:

      17: Starting new group
      18: Saw 0
      19: Saw 1
      19: Ending group
      19: Starting new group
      20: Saw 2
      21: Saw 3
      21: Ending group
      21: Starting new group
      22: Saw 4
      23: Saw 5
      23: Ending group
      

      在這個例子中,Window(2) 每兩個事件分配一個新的窗口(即 IObservable<T>),并在該窗口內(nèi)逐個發(fā)出事件。當(dāng)窗口的事件接收完畢后,窗口會觸發(fā) OnCompleted,并結(jié)束當(dāng)前的分組。

      示例 3:使用 Buffer 按時間分組事件

      除了按事件數(shù)量分組,Buffer 也可以按照時間窗口來分組。比如,我們可以在每 1 秒內(nèi)收集所有事件,并將它們作為一個集合發(fā)出。這在處理高頻率的事件流時非常有用,比如鼠標(biāo)移動事件。

      private void Button_Click(object sender, RoutedEventArgs e)
      {
          Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                  handler => (s, a) => handler(s, a),
                  handler => this.MouseMove += handler,
                  handler => this.MouseMove -= handler)
              .Buffer(TimeSpan.FromSeconds(1))  // 每1秒緩沖一次
              .Subscribe(events =>
              {
                  Trace.WriteLine($"{DateTime.Now.Second}: Saw {events.Count} items.");
              });
      }
      

      輸出示例:

      10: Saw 5 items.
      11: Saw 3 items.
      12: Saw 7 items.
      

      在這個例子中,Buffer(TimeSpan.FromSeconds(1)) 會每隔一秒收集該秒內(nèi)的所有鼠標(biāo)移動事件,并將這些事件作為一個集合發(fā)出。輸出的事件數(shù)量取決于用戶在該秒內(nèi)移動鼠標(biāo)的頻率。

      示例 4:使用 Window 按時間分組事件

      類似地,Window 也可以按照時間窗口來分組,但與 Buffer 不同,它會立即發(fā)出窗口(即 IObservable<T>),并在該窗口內(nèi)逐個發(fā)出事件。

      private void Button_Click(object sender, RoutedEventArgs e)
      {
          Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                  handler => (s, a) => handler(s, a),
                  handler => this.MouseMove += handler,
                  handler => this.MouseMove -= handler)
              .Window(TimeSpan.FromSeconds(1))  // 每1秒創(chuàng)建一個新窗口
              .Subscribe(window =>
              {
                  Trace.WriteLine($"{DateTime.Now.Second}: New window started");
                  window.Subscribe(
                      evt => Trace.WriteLine($"{DateTime.Now.Second}: Mouse moved"),
                      () => Trace.WriteLine($"{DateTime.Now.Second}: Window closed"));
              });
      }
      

      輸出示例:

      10: New window started
      10: Mouse moved
      10: Mouse moved
      11: Window closed
      11: New window started
      11: Mouse moved
      12: Window closed
      

      在這個例子中,Window(TimeSpan.FromSeconds(1)) 每秒開啟一個新的窗口,每次鼠標(biāo)移動時,事件會被立即發(fā)出,同時每秒的窗口結(jié)束時會觸發(fā) OnCompleted,關(guān)閉當(dāng)前窗口。

      BufferWindow 的區(qū)別

      • Buffer:將事件收集到一個集合中,直到分組條件滿足(如達(dá)到指定數(shù)量或時間窗口結(jié)束),然后一次性發(fā)出整個集合。返回類型是 IObservable<IList<T>>,即事件集合的可觀察流。

      • Window:按分組條件(如數(shù)量或時間)創(chuàng)建一個新的窗口(即IObservable<T>),并在事件到達(dá)時立即發(fā)出。返回類型是 IObservable<IObservable<T>>,即“事件流的事件流”。

      總結(jié)

      • BufferWindow 是 Rx.NET 中常用的運算符,用于對事件流進行分組處理。
      • Buffer 會等待事件組完成后再發(fā)出整個集合,而 Window 會立即發(fā)出新的窗口并在其中逐個發(fā)出事件。
      • 這兩個運算符都支持按事件數(shù)量或時間分組,適用于不同的場景。

      通過使用 BufferWindow,我們可以更高效地處理批量事件或時間敏感的事件流,尤其是在需要對事件進行分組、批處理或窗口化時。


      7.7 超時

      問題背景

      在某些情況下,你可能希望事件在一定的時間內(nèi)到達(dá)。如果事件未能在規(guī)定的時間內(nèi)到達(dá),程序仍需要能夠及時響應(yīng)。這種需求在處理異步操作時非常常見,比如等待來自 Web 服務(wù)的響應(yīng)。如果響應(yīng)過慢,程序應(yīng)該超時并采取相應(yīng)的措施,而不是無限期地等待。

      解決方案:Timeout 運算符

      Timeout 運算符為事件流創(chuàng)建了一個滑動的超時窗口。每當(dāng)有新事件到來時,超時窗口會被重置。如果超時窗口內(nèi)沒有收到新事件,則超時窗口過期,并且 Timeout 運算符會通過 OnError 通知,發(fā)出一個包含 TimeoutException 的終止信號。

      示例 1:對 Web 請求應(yīng)用超時

      以下示例向一個示例域名發(fā)起 Web 請求,并為該請求設(shè)置了 1 秒的超時時間。如果超過 1 秒還沒有得到響應(yīng),則會拋出 TimeoutException

      void GetWithTimeout(HttpClient client)
      {
          client.GetStringAsync("http://exampleurl").ToObservable()
              .Timeout(TimeSpan.FromSeconds(1))  // 設(shè)置1秒超時
              .Subscribe(
                  x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.Length}"),
                  ex => Trace.WriteLine(ex));  // 當(dāng)超時發(fā)生時,輸出異常信息
      }
      

      在這個例子中,如果 Web 請求在 1 秒內(nèi)沒有完成,Timeout 運算符會自動終止流,并通過 OnError 發(fā)出 TimeoutException。這使得程序可以對超時情況做出響應(yīng),而不是無限期等待。

      Timeout 的常見應(yīng)用場景

      • Web 請求:在等待 Web 服務(wù)響應(yīng)時,防止請求長時間掛起。
      • 異步任務(wù):限制異步操作的執(zhí)行時間,確保程序能夠及時超時并采取相應(yīng)行動。

      示例 2:為鼠標(biāo)移動事件設(shè)置超時

      Timeout 可以應(yīng)用于任何事件流,除了異步操作,它也可以用于用戶輸入、傳感器數(shù)據(jù)等事件流。以下示例為鼠標(biāo)移動事件設(shè)置了 1 秒的超時時間。如果 1 秒內(nèi)沒有收到鼠標(biāo)移動事件,程序會拋出 TimeoutException

      private void Button_Click(object sender, RoutedEventArgs e)
      {
          Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                  handler => (s, a) => handler(s, a),
                  handler => MouseMove += handler,
                  handler => MouseMove -= handler)
              .Select(x => x.EventArgs.GetPosition(this))
              .Timeout(TimeSpan.FromSeconds(1))  // 設(shè)置1秒超時
              .Subscribe(
                  x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"),
                  ex => Trace.WriteLine(ex));  // 超時后輸出異常信息
      }
      

      輸出示例:

      16: Saw 180
      16: Saw 178
      16: Saw 177
      16: Saw 176
      System.TimeoutException: The operation has timed out.
      

      在這個例子中,鼠標(biāo)移動了幾次后停止,1 秒內(nèi)沒有新的鼠標(biāo)移動事件,因此 Timeout 運算符觸發(fā)了 TimeoutException,并終止了事件流。

      使用 Timeout 的重載方法

      有時,你可能不希望在超時發(fā)生時立即終止事件流。Timeout 運算符提供了一個重載版本,允許你在超時發(fā)生時切換到另一個事件流,而不是通過異常終止當(dāng)前流。

      示例 3:在超時后切換到鼠標(biāo)點擊事件流

      以下示例在超時之前監(jiān)聽鼠標(biāo)移動事件。如果超時發(fā)生后,還沒有新的鼠標(biāo)移動事件,則切換到監(jiān)聽鼠標(biāo)點擊事件:

      private void Button_Click(object sender, RoutedEventArgs e)
      {
          // 鼠標(biāo)點擊事件流
          IObservable<Point> clicks =
              Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
                  handler => (s, a) => handler(s, a),
                  handler => MouseDown += handler,
                  handler => MouseDown -= handler)
              .Select(x => x.EventArgs.GetPosition(this));
      
          // 鼠標(biāo)移動事件流,超時后切換到點擊事件流
          Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
                  handler => (s, a) => handler(s, a),
                  handler => MouseMove += handler,
                  handler => MouseMove -= handler)
              .Select(x => x.EventArgs.GetPosition(this))
              .Timeout(TimeSpan.FromSeconds(1), clicks)  // 超時后切換到 clicks 流
              .Subscribe(
                  x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X},{x.Y}"),
                  ex => Trace.WriteLine(ex));  // 輸出異常信息
      }
      

      輸出示例:

      49: Saw 95,39
      49: Saw 94,39
      49: Saw 94,38
      49: Saw 94,37
      53: Saw 130,141
      55: Saw 469,4
      

      在這個例子中,程序開始時監(jiān)聽鼠標(biāo)移動事件。當(dāng)鼠標(biāo)靜止超過 1 秒后,程序切換到監(jiān)聽鼠標(biāo)點擊事件。鼠標(biāo)移動事件流超時后,程序捕獲了兩次鼠標(biāo)點擊事件。

      總結(jié) Timeout 的使用方式

      • 默認(rèn)行為:當(dāng)事件流中沒有在指定的時間內(nèi)收到事件,Timeout 運算符會發(fā)出 TimeoutException,并通過 OnError 終止流。
      • 可選行為:通過 Timeout 的重載方法,可以在超時發(fā)生時切換到另一個事件流,而不是終止當(dāng)前流。

      討論

      對于一些關(guān)鍵應(yīng)用來說,Timeout 是一個必不可少的運算符,因為它確保了應(yīng)用程序在任何情況下都能及時響應(yīng)。對于異步操作,尤其是 Web 請求,Timeout 可以防止系統(tǒng)長時間等待,進而導(dǎo)致資源浪費。而在用戶輸入流中,Timeout 也可以用于處理用戶長時間不活動的情況。

      需要注意的是,使用 Timeout 并不會取消底層的操作(例如 HTTP 請求)。當(dāng) Timeout 觸發(fā)時,底層操作仍然會繼續(xù)執(zhí)行,直到成功或失敗。這意味著,程序可能會啟動一些不再關(guān)心的異步操作,開發(fā)者需要考慮如何處理這些操作的結(jié)果。

      總結(jié)

      • Timeout 運算符:用于確保事件流能夠在指定時間內(nèi)響應(yīng)。如果事件流在超時窗口內(nèi)沒有事件到達(dá),Timeout 會觸發(fā) TimeoutException,終止流。
      • 適用場景Timeout 常用于異步操作(如 Web 請求)和用戶輸入流,確保程序不會長時間等待。
      • 重載行為Timeout 可以在超時發(fā)生后,切換到另一個事件流,而不是直接拋出異常終止流。
      posted @ 2024-12-09 16:04  平元兄  閱讀(1587)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲欧美日韩综合一区在线| 国产精品久久无中文字幕| 正在播放国产对白孕妇作爱| 国产成人午夜精品福利| 日韩区中文字幕在线观看| 久久人人爽人人爽人人av| 午夜免费无码福利视频麻豆| 疯狂三人交性欧美| 国产精品人妻中文字幕| 人妻少妇精品无码专区二区| 人妻丝袜AV中文系列先锋影音| 亚洲精品无码久久久影院相关影片| 久久精品一本到东京热| 无码精品人妻一区二区三区中 | 久久久久久久久久久久中文字幕| 毛片大全真人在线| 成在线人免费视频| 天堂网在线观看| 免费人妻无码不卡中文18禁| 大香伊蕉在人线国产av| 伊人久久大香线蕉综合观| 天堂网www在线资源网| 在线播放亚洲成人av| 精品中文人妻在线不卡| 日本一区二区三区四区黄色| 中文字幕日韩精品人妻| 平阳县| 少妇被粗大的猛烈xx动态图| 亚洲国内精品一区二区| 综合色一色综合久久网| 国产在线精品一区二区夜色| 视频二区国产精品职场同事| 国产福利永久在线视频无毒不卡| 国产一区日韩二区三区| 99精品久久久久久久婷婷| 少妇熟女天堂网av| 欧美成人一区二区三区不卡| 少妇被粗大的猛烈进出69影院一| 人妻少妇精品无码专区二区 | 性久久久久久| 国产精品一区二区三区激情|