PLINQ (AsParallel) を使用してデータを並列処理しても、最終的な結果を foreach ループで受け取って処理してしまうと、その段階で並列処理の結果が1つのスレッド(メインスレッド)にマージされ、直列処理に戻ってしまいます。これでは、パイプライン全体のパフォーマンスを最大化できません。
データの変換から最終的な出力・書き込みまで、すべての工程を並列のまま実行し切るには、ForAll 拡張メソッドを使用します。
ForAllによる完全並列処理の実装
ForAll は、並列クエリの結果に対して、複数のスレッドで同時にアクションを実行するためのメソッドです。
以下のサンプルコードでは、多数のIoTセンサーから受信したデータを解析し、その結果を並列に出力(保存)するシナリオをシミュレーションします。通常の foreach とは異なり、複数のスレッドが同時に Console.WriteLine を呼び出す挙動に注目してください。
サンプルコード
using System;
using System.Collections.Concurrent; // ThreadSafeなコレクション用
using System.Linq;
using System.Threading;
public class Program
{
public static void Main()
{
// 1. データソース: 多数のセンサーデバイス情報
var sensorData = new[]
{
new { DeviceId = "SENS-001", Voltage = 12.5, Temperature = 45.2 },
new { DeviceId = "SENS-002", Voltage = 11.8, Temperature = 42.1 },
new { DeviceId = "SENS-003", Voltage = 12.0, Temperature = 48.5 },
new { DeviceId = "SENS-004", Voltage = 12.2, Temperature = 44.0 },
new { DeviceId = "SENS-005", Voltage = 11.5, Temperature = 39.8 },
new { DeviceId = "SENS-006", Voltage = 12.8, Temperature = 51.2 },
};
Console.WriteLine("--- 解析とレポート出力を並列実行 ---");
// 2. PLINQによる解析パイプライン
sensorData
.AsParallel() // 並列化
.WithDegreeOfParallelism(3) // 3スレッドで実行
.Select(sensor =>
{
// データ解析フェーズ(並列実行)
double efficiency = (sensor.Temperature / sensor.Voltage) * 10.0;
string status = efficiency > 40.0 ? "Warning" : "Normal";
return new
{
sensor.DeviceId,
Efficiency = efficiency,
Status = status,
// どのスレッドで計算されたか記録
ThreadId = Thread.CurrentThread.ManagedThreadId
};
})
// 3. ForAllによる並列出力フェーズ
// 結果をマージせず、各スレッドがそのまま後続の処理を実行する
.ForAll(result =>
{
// ここも並列に実行されるため、排他制御が必要なリソースへのアクセスには注意が必要
// 今回はコンソール出力(スレッドセーフだが表示順は不定)
Console.WriteLine(
$"[{result.ThreadId}] ID:{result.DeviceId} " +
$"効率:{result.Efficiency:F1} 状態:{result.Status}"
);
});
}
}
解説と技術的なポイント
1. foreach と ForAll の決定的な違い
- foreach: PLINQの結果バッファから、メインスレッドが1つずつデータを取り出して処理します(マージコストが発生し、処理はシングルスレッドになる)。
- ForAll: クエリを処理していた各スレッドが、マージすることなくそのままデリゲート内の処理を実行します。並列度を維持したまま終了できます。
2. 処理速度の向上
データを「集約」する必要がない場合(例:個別のファイルに書き出す、データベースに個別にインサートする、APIに投げつける等)は、ForAll を使うことでマージのオーバーヘッドを削減でき、スループットが向上します。
3. スレッドセーフ性の考慮
ForAll 内の処理は完全に並列で走るため、競合状態(レースコンディション)に注意が必要です。
- 悪い例:
List<T>.Add()をForAll内で呼ぶ(Listはスレッドセーフではないため壊れる)。 - 良い例:
ConcurrentBag<T>.Add()を使う、またはlockで排他制御を行う(ただしlockすると並列性のメリットは薄れる)。
コンソール出力などはOS側で同期が取られますが、行単位で出力が混ざらないだけであり、表示順序はバラバラになります。順序が不要で、かつ個別の処理が独立している場合に最適なメソッドです。
