In .NET Framework and .NET Core, System.IObservable<T> and System.IObserver<T> are the standard interfaces provided for implementing the Observer Pattern.
By using these interfaces, you can build a push-based notification system while keeping the data provider and the data receiver loosely coupled.
In this article, I will explain the standard implementation method using a scenario where a temperature sensor (provider) measures data and notifies a monitoring screen (observer).
Implementation Key Points
When using standard .NET interfaces, the roles are divided as follows:
- IObservable<T>: The Data Provider (Subject). It has a
Subscribemethod to accept observers. - IObserver<T>: The Data Receiver (Observer). It implements three methods:
OnNext(receive value),OnError(handle error), andOnCompleted(notification finished). - IDisposable: Used to provide a mechanism for unsubscribing.
Complete Implementation Code
Below is a complete code example that implements a temperature sensor (provider) and a monitor (observer).
using System;
using System.Collections.Generic;
using System.Threading;
namespace ObserverPatternExample
{
// Data class to be notified
public class TemperatureData
{
public DateTime Timestamp { get; }
public double Temperature { get; }
public TemperatureData(double temperature)
{
Timestamp = DateTime.Now;
Temperature = temperature;
}
}
// Data Provider: Temperature Sensor
// Implements IObservable<T>
public class TemperatureSensor : IObservable<TemperatureData>
{
// List to hold registered observers
private readonly List<IObserver<TemperatureData>> _observers;
public TemperatureSensor()
{
_observers = new List<IObserver<TemperatureData>>();
}
// Called when an observer starts subscribing
// Must return an IDisposable object for unsubscribing
public IDisposable Subscribe(IObserver<TemperatureData> observer)
{
if (!_observers.Contains(observer))
{
_observers.Add(observer);
}
// Return a class that provides the unsubscribe function
return new Unsubscriber(_observers, observer);
}
// Method to measure data and notify subscribers (Simulation)
public void RunSensorSimulation()
{
var random = new Random();
// Simulate 5 measurements
for (int i = 0; i < 5; i++)
{
// Generate a random temperature between -10 and 40
double temp = Math.Round(-10 + (random.NextDouble() * 50), 1);
var data = new TemperatureData(temp);
// Notify all observers of the new value
foreach (var observer in _observers)
{
observer.OnNext(data);
}
Thread.Sleep(500); // Wait 0.5 seconds
}
// Notify that measurement is complete
foreach (var observer in _observers.ToArray())
{
if (_observers.Contains(observer))
{
observer.OnCompleted();
}
}
_observers.Clear();
}
// Method to notify when an error occurs
public void ReportError()
{
var ex = new Exception("Sensor malfunction detected.");
foreach (var observer in _observers)
{
observer.OnError(ex);
}
}
// Internal class to manage unsubscribing
private class Unsubscriber : IDisposable
{
private readonly List<IObserver<TemperatureData>> _observers;
private readonly IObserver<TemperatureData> _observer;
public Unsubscriber(List<IObserver<TemperatureData>> observers, IObserver<TemperatureData> observer)
{
_observers = observers;
_observer = observer;
}
public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
{
_observers.Remove(_observer);
Console.WriteLine(">> Unsubscribed.");
}
}
}
}
// Data Receiver (Observer): Monitoring Screen
// Implements IObserver<T>
public class TemperatureMonitor : IObserver<TemperatureData>
{
private readonly string _name;
private IDisposable _cancellation;
public TemperatureMonitor(string name)
{
_name = name;
}
// Process to start subscribing
public void Subscribe(TemperatureSensor provider)
{
_cancellation = provider.Subscribe(this);
Console.WriteLine($"> [{_name}] Started monitoring sensor.");
}
// Process to stop subscribing
public void Unsubscribe()
{
_cancellation?.Dispose();
}
// Process when new data is received
public void OnNext(TemperatureData value)
{
Console.WriteLine($"[{_name}] Received: {value.Timestamp:HH:mm:ss} - Temp {value.Temperature}C");
}
// Process when an error occurs
public void OnError(Exception error)
{
Console.WriteLine($"[{_name}] Error occurred: {error.Message}");
}
// Process when data transmission is complete
public void OnCompleted()
{
Console.WriteLine($"[{_name}] Measurement completion notification received.");
}
}
// Execution Class
class Program
{
static void Main(string[] args)
{
// Instantiate Provider (Sensor)
var sensor = new TemperatureSensor();
// Instantiate Observers (Monitors)
var monitor1 = new TemperatureMonitor("Monitor A");
var monitor2 = new TemperatureMonitor("Monitor B");
// Start Subscription
monitor1.Subscribe(sensor);
monitor2.Subscribe(sensor);
Console.WriteLine("--- Measurement Start ---");
// Run Sensor (Notifications occur)
sensor.RunSensorSimulation();
Console.WriteLine("--- Process End ---");
}
}
}
Code Explanation
1. Unsubscribing Mechanism (Unsubscriber)
The IObservable<T>.Subscribe method is designed to return IDisposable. This allows the subscriber to stop receiving notifications (unsubscribe) by calling the Dispose method on the returned object.
In the code above, we defined a private inner class called Unsubscriber inside the TemperatureSensor class. This class is responsible for removing the specific observer from the provider’s list when Dispose is called. This ensures that unsubscribing is done safely and correctly.
2. Notification Flow
When data is generated on the provider side, it loops through the list of observers and calls the OnNext method for each one. This passes the data synchronously to all registered monitors.
Additionally, when the series of data transmissions is finished, OnCompleted is called to inform the observers that no more data will arrive. This design allows the observer side to release resources or perform cleanup tasks at the appropriate time.
Summary
In this article, I explained how to implement the Observer Pattern using IObservable<T> and IObserver<T>.
The biggest advantage of using these standard interfaces is the ability to build a flexible notification system while keeping the provider and receiver loosely coupled. Specifically, since the Subscribe method returns IDisposable, the lifecycle management for unsubscribing follows a unified rule.
This pattern is also the foundational concept for Reactive Extensions (Rx), a powerful library for handling asynchronous stream processing. It is recommended to first understand the basic implementation with standard interfaces, and then consider introducing Rx when more advanced event processing is required.
