When you process data in parallel with PLINQ (AsParallel) and then consume the results with a standard foreach loop, the parallel results are merged back onto a single thread (the thread running the loop). From that point on the pipeline runs sequentially, so the final stage gains nothing from parallelism.
To run every step in parallel, from data transformation to final output, use the ForAll extension method.
Implementing Fully Parallel Processing with ForAll
ForAll is a method that allows multiple threads to execute actions simultaneously on the results of a parallel query.
The following sample code simulates a scenario where data received from many IoT sensors is analyzed and the results are output (saved) in parallel. Note that unlike a normal foreach, multiple threads call Console.WriteLine at the same time.
Sample Code
using System;
using System.Linq;
using System.Threading;

public class Program
{
    public static void Main()
    {
        // 1. Data Source: Information from many sensor devices
        var sensorData = new[]
        {
            new { DeviceId = "SENS-001", Voltage = 12.5, Temperature = 45.2 },
            new { DeviceId = "SENS-002", Voltage = 11.8, Temperature = 42.1 },
            new { DeviceId = "SENS-003", Voltage = 12.0, Temperature = 48.5 },
            new { DeviceId = "SENS-004", Voltage = 12.2, Temperature = 44.0 },
            new { DeviceId = "SENS-005", Voltage = 11.5, Temperature = 39.8 },
            new { DeviceId = "SENS-006", Voltage = 12.8, Temperature = 51.2 },
        };

        Console.WriteLine("--- Parallel Analysis and Report Output ---");

        // 2. Analysis Pipeline with PLINQ
        sensorData
            .AsParallel()               // Enable parallelization
            .WithDegreeOfParallelism(3) // Use at most 3 concurrently executing tasks
            .Select(sensor =>
            {
                // Data Analysis Phase (Parallel Execution)
                double efficiency = (sensor.Temperature / sensor.Voltage) * 10.0;
                string status = efficiency > 40.0 ? "Warning" : "Normal";
                return new
                {
                    sensor.DeviceId,
                    Efficiency = efficiency,
                    Status = status,
                    // Record which thread performed the calculation
                    ThreadId = Thread.CurrentThread.ManagedThreadId
                };
            })
            // 3. Parallel Output Phase with ForAll
            // Results are not merged; each worker thread runs the action on the items it produced
            .ForAll(result =>
            {
                // This runs in parallel, so be careful when accessing resources requiring exclusive control.
                // Console.WriteLine is thread-safe, but the output order is not deterministic.
                Console.WriteLine(
                    $"[{result.ThreadId}] ID:{result.DeviceId} " +
                    $"Eff:{result.Efficiency:F1} Status:{result.Status}"
                );
            });
    }
}
Explanation and Technical Points
1. Critical Difference between foreach and ForAll
- foreach: The thread running the loop (often, but not necessarily, the main thread) retrieves items one at a time from the PLINQ output buffer. This incurs a merge cost, and the loop body runs on a single thread.
- ForAll: Each thread processing the query executes the code inside the delegate directly without merging. This maintains parallelism until the very end.
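The difference can be observed by recording which threads run the loop body in each case. The following is a minimal sketch, not a benchmark; the class and method names (MergeVersusForAll, CountForeachThreads, CountForAllThreads) are made up for illustration, and the number of threads ForAll uses depends on the machine and the scheduler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class MergeVersusForAll
{
    // Counts the distinct threads that execute the body of a foreach over a PLINQ query.
    public static int CountForeachThreads()
    {
        var threads = new ConcurrentDictionary<int, bool>();
        var query = Enumerable.Range(1, 1000).AsParallel().Select(n => n * 2);

        // The Select runs on worker threads, but the results are merged and
        // the loop body itself runs only on the thread that enumerates the query.
        foreach (var value in query)
        {
            threads.TryAdd(Environment.CurrentManagedThreadId, true);
        }
        return threads.Count;
    }

    // Counts the distinct threads that execute the delegate passed to ForAll.
    public static int CountForAllThreads()
    {
        var threads = new ConcurrentDictionary<int, bool>();
        var query = Enumerable.Range(1, 1000).AsParallel().Select(n => n * 2);

        // No merge step: each worker runs the delegate on the items it produced.
        query.ForAll(value => threads.TryAdd(Environment.CurrentManagedThreadId, true));
        return threads.Count;
    }

    public static void Main()
    {
        Console.WriteLine($"foreach body ran on {CountForeachThreads()} thread(s)");
        Console.WriteLine($"ForAll body ran on {CountForAllThreads()} thread(s)");
    }
}
```

The foreach count is always 1 because the loop body runs on the enumerating thread. The ForAll count is typically higher on a multi-core machine, but it is not guaranteed to be.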
2. Improved Processing Speed
If you do not need to “aggregate” data (e.g., writing to separate files, inserting into a database individually, or sending requests to an API), using ForAll avoids the overhead of merging and can improve throughput. The gain depends on how much work each item involves, so measure it for your own workload.
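As an illustration of the "separate files" case, the sketch below writes one report file per device. It assumes a hypothetical output folder under the system temp directory and a made-up report format; the class and method names are also illustrative only.

```csharp
using System;
using System.IO;
using System.Linq;

public static class PerItemOutput
{
    // Writes one report file per device ID and returns the directory that was used.
    public static string WriteReports(string[] deviceIds)
    {
        // Hypothetical output location: a fresh folder under the system temp directory.
        string outputDir = Path.Combine(
            Path.GetTempPath(), "plinq-forall-" + Guid.NewGuid().ToString("N"));
        Directory.CreateDirectory(outputDir);

        deviceIds
            .AsParallel()
            .Select(id => new { Id = id, Report = $"Report for {id}" })
            // Each item goes to its own file, so the workers share no mutable state.
            .ForAll(item =>
                File.WriteAllText(Path.Combine(outputDir, item.Id + ".txt"), item.Report));

        return outputDir;
    }

    public static void Main()
    {
        var ids = new[] { "SENS-001", "SENS-002", "SENS-003" };
        string dir = WriteReports(ids);
        Console.WriteLine($"Wrote {Directory.GetFiles(dir).Length} files to {dir}");
    }
}
```

Because every item targets a different file, no locking is needed inside the delegate. That independence is what makes this kind of work a good fit for ForAll.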
3. Consideration for Thread Safety
Since the processing inside ForAll runs completely in parallel, you must be careful about race conditions.
- Bad Example: Calling List<T>.Add() inside ForAll (since List<T> is not thread-safe, it will break).
- Good Example: Using ConcurrentBag<T>.Add() or using lock for exclusive control (though locking reduces the benefits of parallelism).
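Both safe options can be sketched as follows. The class and method names are hypothetical, and squaring integers stands in for whatever per-item work the real query does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class SafeCollection
{
    // Option 1: a thread-safe collection. No explicit locking is needed.
    public static int[] CollectWithBag(int count)
    {
        var bag = new ConcurrentBag<int>();
        Enumerable.Range(1, count)
            .AsParallel()
            .ForAll(n => bag.Add(n * n));

        // ConcurrentBag<T> is unordered, so sort before returning.
        return bag.OrderBy(x => x).ToArray();
    }

    // Option 2: an ordinary List<T> guarded by a lock. Correct, but the
    // workers queue up on the lock, which limits the parallel speedup.
    public static int[] CollectWithLock(int count)
    {
        var list = new List<int>();
        var gate = new object();
        Enumerable.Range(1, count)
            .AsParallel()
            .ForAll(n =>
            {
                lock (gate)
                {
                    list.Add(n * n);
                }
            });

        return list.OrderBy(x => x).ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(CollectWithBag(1000).Length);  // 1000
        Console.WriteLine(CollectWithLock(1000).Length); // 1000
    }
}
```

If collecting results is the only goal, calling ToList() or ToArray() on the query is usually simpler. The pattern above is mainly useful when the delegate also performs other side effects.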
While console output is synchronized by the .NET Console class rather than by your code (the text of individual WriteLine calls won’t be interleaved), the display order is nondeterministic. This method is best suited for scenarios where order does not matter and individual tasks are independent.
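For contrast, when the output order does matter, one option is to keep the computation parallel but consume the results with AsOrdered and a regular foreach, accepting the merge cost. The sketch below uses hypothetical names and a trivial formatting step in place of real analysis.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderedOutput
{
    // Formats each ID in parallel but returns the lines in source order.
    public static List<string> FormatInSourceOrder(string[] deviceIds)
    {
        var query = deviceIds
            .AsParallel()
            .AsOrdered()               // Ask PLINQ to preserve the source order
            .Select(id => $"ID:{id}"); // Still evaluated on worker threads

        // foreach merges the results onto this thread, so a plain List<T> is safe here.
        var lines = new List<string>();
        foreach (var line in query)
        {
            lines.Add(line);
        }
        return lines;
    }

    public static void Main()
    {
        var ids = new[] { "SENS-001", "SENS-002", "SENS-003" };
        foreach (var line in FormatInSourceOrder(ids))
        {
            Console.WriteLine(line);
        }
    }
}
```

ForAll does not preserve order even when AsOrdered is applied, which is why this sketch falls back to foreach for the final stage.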
