[譯]何時使用 Parallel.ForEach,何時使用 PLINQ
原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporation
原文pdf:http://download.csdn.NET/detail/sqlchen/7509513
====================================================================
簡介
當需要為多核機器進行優化的時候,最好先檢查下你的程序是否有處理能夠分割開來進行并行處理。(例如,有一個巨大的數據集合,其中的元素需要一個一個進行彼此獨立的耗時計算)。
.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 來幫助我們進行并行處理,本文探討這兩者的差別及適用的場景。
Parallel.ForEach
Parallel.ForEach 是 foreach 的多線程實現,他們都能對 IEnumerable<T> 類型對象進行遍歷,Parallel.ForEach 的特殊之處在于它使用多線程來執行循環體內的代碼段。
Parallel.ForEach 最常用的形式如下:
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
PLINQ
PLINQ 也是一種對數據進行并行處理的編程模型,它通過 LINQ 的語法來實現類似 Parallel.ForEach 的多線程并行處理。
場景一:簡單數據 之 獨立操作的并行處理(使用 Parallel.ForEach)
示例代碼:
public static void IndependentAction(IEnumerable<T> source, Action<T> action)
{
Parallel.ForEach(source, element => action(element));
}
理由:
1. 雖然 PLINQ 也提供了一個類似的 ForAll 接口,但它對于簡單的獨立操作太重量化了。
2. 使用 Parallel.ForEach 你還能夠設定 ParallelOptions.MaxDegreeOfParalelism
參數(指定最多需要多少個線程),這樣當 ThreadPool
資源匱乏(甚至當可用線程數<MaxDegreeOfParalelism)的時候, Parallel.ForEach
依然能夠順利運行,并且當后續有更多可用線程出現時,Parallel.ForEach 也能及時地利用這些線程。PLINQ
只能通過WithDegreeOfParallelism 方法來要求固定的線程數,即:要求了幾個就是幾個,不會多也不會少。
場景二:順序數據 之 并行處理(使用 PLINQ 來維持數據順序)
當輸出的數據序列需要保持原始的順序時采用 PLINQ 的 AsOrdered 方法非常簡單高效。
示例代碼:
public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
var ProcessedMovie =
Movie
.AsParallel()
.AsOrdered()
.Select(frame => ConvertToGrayscale(frame));
foreach (var grayscaleFrame in ProcessedMovie)
{
// Movie frames will be evaluated lazily
}
}
理由:
1. Parallel.ForEach 實現起來需要繞一些彎路,首先你需要使用以下的重載在方法:
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState, Int64> body)
這個重載的 Action 多包含了 index 參數,這樣你在輸出的時候就能利用這個值來維持原先的序列順序。請看下面的例子:
public static double [] PairwiseMultiply(double[] v1, double[] v2)
{
var length = Math.Min(v1.Length, v2.Lenth);
double[] result = new double[length];
Parallel.ForEach(v1, (element, loopstate, elementIndex) =>
result[elementIndex] = element * v2[elementIndex]);
return result;
}
你可能已經意識到這里有個明顯的問題:我們使用了固定長度的數組。如果傳入的是 IEnumerable 那么你有4個解決方案:
(1) 調用 IEnumerable.Count() 來獲取數據長度,然后用這個值實例化一個固定長度的數組,然后使用上例的代碼。
(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(沒看懂貼原文)
(3) 第三種方式是采用返回一個哈希集合的方式,這種方式下通常需要至少2倍于傳入數據的內存,所以處理大數據時請慎用。
(4) 自己實現排序算法(保證傳入數據與傳出數據經過排序后次序一致)
2. 相比之下 PLINQ 的 AsOrdered 方法如此簡單,而且該方法能處理流式的數據,從而允許傳入數據是延遲實現的(lazy materialized)
場景三:流數據 之 并行處理(使用 PLINQ)
PLINQ 能輸出流數據,這個特性在一下場合非常有用:
1. 結果集不需要是一個完整的處理完畢的數組,即:任何時間點下內存中僅保持數組中的部分信息
2. 你能夠在一個單線程上遍歷輸出結果(就好像他們已經存在/處理完了)
示例:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
var StockRiskPortfolio =
Stocks
.AsParallel()
.AsOrdered()
.Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
.Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));
foreach (var stockRisk in StockRiskPortfolio)
{
SomeStockComputation(stockRisk.Risk);
// StockRiskPortfolio will be a stream of results
}
}
這里使用一個單線程的 foreach 來對 PLINQ 的輸出進行后續處理,通常情況下 foreach 不需要等待 PLINQ 處理完所有數據就能開始運作。
PLINQ 也允許指定輸出緩存的方式,具體可參照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚舉
場景四:處理兩個集合(使用 PLINQ)
PLINQ 的 Zip 方法提供了同時遍歷兩個集合并進行結合元算的方法,并且它可以與其他查詢處理操作結合,實現非常復雜的機能。
示例:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
return
a
.AsParallel()
.AsOrdered()
.Select(element => ExpensiveComputation(element))
.Zip(
b
.AsParallel()
.AsOrdered()
.Select(element => DifferentExpensiveComputation(element)),
(a_element, b_element) => Combine(a_element,b_element));
}
示例中的兩個數據源能夠并行處理,當雙方都有一個可用元素時提供給 Zip 進行后續處理(Combine)。
Parallel.ForEach 也能實現類似的 Zip 處理:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
var numElements = Math.Min(a.Count(), b.Count());
var result = new T[numElements];
Parallel.ForEach(a,
(element, loopstate, index) =>
{
var a_element = ExpensiveComputation(element);
var b_element = DifferentExpensiveComputation(b.ElementAt(index));
result[index] = Combine(a_element, b_element);
});
return result;
}
當然使用 Parallel.ForEach 后你就得自己確認是否要維持原始序列,并且要注意數組越界訪問的問題。
場景五:線程局部變量
Parallel.ForEach 提供了一個線程局部變量的重載,定義如下:
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal,TLocal> body,
Action<TLocal> localFinally)
使用的示例:
public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
var results = new List<R>();
using (SemaphoreSlim sem = new SemaphoreSlim(1))
{
Parallel.ForEach(source,
() => new List<R>(),
(element, loopstate, localStorage) =>
{
bool filter = filterFunction(element);
if (filter)
localStorage.Add(element);
return localStorage;
},
(finalStorage) =>
{
lock(myLock)
{
results.AddRange(finalStorage)
};
});
}
return results;
}
線程局部變量有什么優勢呢?請看下面的例子(一個網頁抓取程序):
public static void UnsafeDownloadUrls ()
{
WebClient webclient = new WebClient();
Parallel.ForEach(urls,
(url,loopstate,index) =>
{
webclient.DownloadFile(url, filenames[index] + ".dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
通常第一版代碼是這么寫的,但是運行時會報錯“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。這是因為多個線程無法同時訪問同一個 WebClient 對象。所以我們會把 WebClient 對象定義到線程中來:
public static void BAD_DownloadUrls ()
{
Parallel.ForEach(urls,
(url,loopstate,index) =>
{
WebClient webclient = new WebClient();
webclient.DownloadFile(url, filenames[index] + ".dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
修改之后依然有問題,因為你的機器不是服務器,大量實例化的 WebClient 迅速達到你機器允許的虛擬連接上限數。線程局部變量可以解決這個問題:
public static void downloadUrlsSafe()
{
Parallel.ForEach(urls,
() => new WebClient(),
(url, loopstate, index, webclient) =>
{
webclient.DownloadFile(url, filenames[index]+".dat");
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
return webclient;
},
(webclient) => { });
}
這樣的寫法保證了我們能獲得足夠的 WebClient 實例,同時這些 WebClient 實例彼此隔離僅僅屬于各自關聯的線程。
雖然 PLINQ 提供了 ThreadLocal<T> 對象來實現類似的功能:
public static void downloadUrl()
{
var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
var res =
urls
.AsParallel()
.ForAll(
url =>
{
webclient.Value.DownloadFile(url, host[url] +".dat"));
Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
});
}
但是請注意:ThreadLocal<T> 相對而言開銷更大!
場景五:退出操作 (使用 Parallel.ForEach)
Parallel.ForEach 有個重載聲明如下,其中包含一個 ParallelLoopState 對象:
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState> body)
ParallelLoopState.Stop() 提供了退出循環的方法,這種方式要比其他兩種方法更快。這個方法通知循環不要再啟動執行新的迭代,并盡可能快的推出循環。
ParallelLoopState.IsStopped 屬性可用來判定其他迭代是否調用了 Stop 方法。
示例:
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
var matchFound = false;
Parallel.ForEach(TSpace,
(curValue, loopstate) =>
{
if (curValue.Equals(match) )
{
matchFound = true;
loopstate.Stop();
}
});
return matchFound;
}
ParallelLoopState.Break() 通知循環繼續執行本元素前的迭代,但不執行本元素之后的迭代。最前調用 Break 的起作用,并被記錄到 ParallelLoopState.LowestBreakIteration 屬性中。這種處理方式通常被應用在一個有序的查找處理中,比如你有一個排序過的數組,你想在其中查找匹配元素的最小 index,那么可以使用以下的代碼:
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
var loopResult = Parallel.ForEach(source,
(curValue, loopState, curIndex) =>
{
if (curValue.Equals(match))
{
loopState.Break();
}
});
var matchedIndex = loopResult.LowestBreakIteration;
return matchedIndex.HasValue ? matchedIndex : -1;
}
雖然 PLINQ 也提供了退出的機制(cancellation token),但相對來說退出的時機并沒有 Parallel.ForEach 那么及時。
作者:
RDIF
出處:
http://www.rzrgm.cn/huyong/
Email:
406590790@qq.com
QQ:
406590790
微信:
13005007127(同手機號)
框架官網:
http://www.guosisoft.com/
http://www.rdiframework.net/
框架其他博客:
http://blog.csdn.net/chinahuyong
http://www.rzrgm.cn/huyong
國思RDIF開發框架
,
給用戶和開發者最佳的.Net框架平臺方案,為企業快速構建跨平臺、企業級的應用提供強大支持。
關于作者:系統架構師、信息系統項目管理師、DBA。專注于微軟平臺項目架構、管理和企業解決方案,多年項目開發與管理經驗,曾多次組織并開發多個大型項目,在面向對象、面向服務以及數據庫領域有一定的造詣。現主要從事基于
RDIF
框架的技術開發、咨詢工作,主要服務于金融、醫療衛生、鐵路、電信、物流、物聯網、制造、零售等行業。
如有問題或建議,請多多賜教!
本文版權歸作者和CNBLOGS博客共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,如有問題,可以通過微信、郵箱、QQ等聯系我,非常感謝。

浙公網安備 33010602011771號