[C#] Fully Parallel Processing with PLINQ’s ForAll Method

When you process data in parallel with PLINQ (AsParallel) and then consume the results with a standard foreach loop, the results from the worker threads are merged back onto the single thread that runs the loop (typically the main thread). The loop body therefore runs sequentially, so the final stage of the pipeline gains nothing from parallelism.

To execute every step—from data transformation to final output—in parallel, you should use the ForAll extension method.

Implementing Fully Parallel Processing with ForAll

ForAll is a method that allows multiple threads to execute actions simultaneously on the results of a parallel query.

The following sample code simulates a scenario where data received from many IoT sensors is analyzed and the results are output (saved) in parallel. Note that unlike a normal foreach, multiple threads call Console.WriteLine at the same time.

Sample Code

using System;
using System.Linq;
using System.Threading;

public class Program
{
    public static void Main()
    {
        // 1. Data Source: Information from many sensor devices
        var sensorData = new[]
        {
            new { DeviceId = "SENS-001", Voltage = 12.5, Temperature = 45.2 },
            new { DeviceId = "SENS-002", Voltage = 11.8, Temperature = 42.1 },
            new { DeviceId = "SENS-003", Voltage = 12.0, Temperature = 48.5 },
            new { DeviceId = "SENS-004", Voltage = 12.2, Temperature = 44.0 },
            new { DeviceId = "SENS-005", Voltage = 11.5, Temperature = 39.8 },
            new { DeviceId = "SENS-006", Voltage = 12.8, Temperature = 51.2 },
        };

        Console.WriteLine("--- Parallel Analysis and Report Output ---");

        // 2. Analysis Pipeline with PLINQ
        sensorData
            .AsParallel()                 // Enable parallelization
            .WithDegreeOfParallelism(3)   // Use at most 3 concurrent tasks
            .Select(sensor =>
            {
                // Data Analysis Phase (Parallel Execution)
                double efficiency = (sensor.Temperature / sensor.Voltage) * 10.0;
                string status = efficiency > 40.0 ? "Warning" : "Normal";
                
                return new
                {
                    sensor.DeviceId,
                    Efficiency = efficiency,
                    Status = status,
                    // Record which thread performed the calculation
                    ThreadId = Thread.CurrentThread.ManagedThreadId
                };
            })
            // 3. Parallel Output Phase with ForAll
            // Results are not merged; each thread executes the subsequent action directly
            .ForAll(result => 
            {
                // This runs in parallel, so guard any shared state that requires exclusive access.
                // Console.WriteLine is safe to call from multiple threads, but the output order is nondeterministic.
                Console.WriteLine(
                    $"[{result.ThreadId}] ID:{result.DeviceId} " +
                    $"Eff:{result.Efficiency:F1} Status:{result.Status}"
                );
            });
    }
}

Explanation and Technical Points

1. Critical Difference between foreach and ForAll

  • foreach: The thread running the loop pulls results one at a time from the PLINQ output buffer. The merge adds overhead, and the loop body runs on a single thread.
  • ForAll: Each worker thread that processes the query invokes the delegate directly on its own results, with no merge step. The work stays parallel through the final stage.
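
As a rough illustration of this difference, the sketch below records which managed threads execute the consuming code in each style. The names MergeDemo, ThreadsSeenByForeach, and ThreadsSeenByForAll are made up for this example. How many threads ForAll actually uses depends on the machine and the scheduler, so no specific count is assumed for it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

public static class MergeDemo
{
    // Hypothetical helper: IDs of the threads that ran the foreach body.
    public static int[] ThreadsSeenByForeach(int count)
    {
        var ids = new ConcurrentDictionary<int, byte>();
        var query = Enumerable.Range(0, count).AsParallel().Select(n => n * 2);

        // The loop body runs on the thread that enumerates the query.
        foreach (var value in query)
        {
            ids.TryAdd(Thread.CurrentThread.ManagedThreadId, 0);
        }
        return ids.Keys.ToArray();
    }

    // Hypothetical helper: IDs of the threads that ran the ForAll delegate.
    public static int[] ThreadsSeenByForAll(int count)
    {
        var ids = new ConcurrentDictionary<int, byte>();
        Enumerable.Range(0, count)
            .AsParallel()
            .Select(n => n * 2)
            // The delegate runs on whichever worker thread produced the item.
            .ForAll(_ => ids.TryAdd(Thread.CurrentThread.ManagedThreadId, 0));
        return ids.Keys.ToArray();
    }

    public static void Main()
    {
        Console.WriteLine($"foreach body ran on {ThreadsSeenByForeach(100000).Length} thread(s)");
        Console.WriteLine($"ForAll delegate ran on {ThreadsSeenByForAll(100000).Length} thread(s)");
    }
}
```

The foreach variant reports a single thread because only the enumerating thread runs the loop body. On a multi-core machine the ForAll variant will usually report more than one.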

2. Improved Processing Speed

If you do not need to “aggregate” the results (for example, when writing to separate files, inserting rows into a database individually, or sending requests to an API), ForAll skips the merge step, which can reduce overhead and improve throughput.
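
A minimal sketch of the "separate files" case might look like the following. The class name PerItemOutputDemo, the WriteReports helper, the temp-directory layout, and the report text are all assumptions made for illustration, and the device IDs are assumed to be unique so that no two threads write to the same file.

```csharp
using System;
using System.IO;
using System.Linq;

public static class PerItemOutputDemo
{
    // Hypothetical helper: writes one report file per device ID
    // and returns the directory that holds the files.
    public static string WriteReports(string[] deviceIds)
    {
        string outputDir = Path.Combine(
            Path.GetTempPath(), "plinq-forall-" + Guid.NewGuid().ToString("N"));
        Directory.CreateDirectory(outputDir);

        deviceIds
            .AsParallel()
            .Select(id => new { Id = id, Report = $"Report for {id}" })
            .ForAll(item =>
            {
                // Each action touches only its own file, so no locking is needed.
                File.WriteAllText(Path.Combine(outputDir, item.Id + ".txt"), item.Report);
            });

        return outputDir;
    }

    public static void Main()
    {
        string dir = WriteReports(new[] { "SENS-001", "SENS-002", "SENS-003" });
        Console.WriteLine($"Wrote {Directory.GetFiles(dir).Length} files to {dir}");
        Directory.Delete(dir, recursive: true); // clean up the demo output
    }
}
```

Nothing is collected after the ForAll call: each result is consumed where it was produced, which is why no merge is required.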

3. Consideration for Thread Safety

Since the processing inside ForAll runs completely in parallel, you must be careful about race conditions.

  • Bad Example: Calling List<T>.Add() inside ForAll. List<T> is not thread-safe, so concurrent Add calls can lose items, corrupt the list, or throw an exception.
  • Good Example: Using ConcurrentBag<T>.Add() or using lock for exclusive control (though locking reduces the benefits of parallelism).
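
The two safe options above can be sketched as follows. The class and method names (ThreadSafetyDemo, CollectWithConcurrentBag, CollectWithLock) are hypothetical, and squaring each number is only a stand-in for real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class ThreadSafetyDemo
{
    // Option 1: a thread-safe collection, so no explicit locking is needed.
    public static int CollectWithConcurrentBag(int count)
    {
        var bag = new ConcurrentBag<int>();
        Enumerable.Range(0, count)
            .AsParallel()
            .ForAll(n => bag.Add(n * n));
        return bag.Count;
    }

    // Option 2: an ordinary List<T> guarded by a lock.
    // Only one thread at a time can be inside the lock block.
    public static int CollectWithLock(int count)
    {
        var list = new List<int>();
        var gate = new object();
        Enumerable.Range(0, count)
            .AsParallel()
            .ForAll(n =>
            {
                lock (gate)
                {
                    list.Add(n * n);
                }
            });
        return list.Count;
    }

    public static void Main()
    {
        Console.WriteLine(CollectWithConcurrentBag(10000)); // every item is kept
        Console.WriteLine(CollectWithLock(10000));          // every item is kept
    }
}
```

Both helpers end up with one entry per input item. Removing the lock from the second helper would make the result unreliable, which is the failure mode described in the bad example.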

Console output is synchronized by the .NET Console class rather than by the OS, so the text of one Console.WriteLine call is not interleaved with another, but the order in which lines appear is nondeterministic. ForAll is best suited for scenarios where order does not matter and the individual tasks are independent.
