Implementing Event Notifications with IObserver and IObservable in C#

In .NET Framework and .NET Core, System.IObservable<T> and System.IObserver<T> are the standard interfaces provided for implementing the Observer Pattern.

By using these interfaces, you can build a push-based notification system while keeping the data provider and the data receiver loosely coupled.

In this article, I will explain the standard implementation method using a scenario where a temperature sensor (provider) measures data and notifies a monitoring screen (observer).

目次

Implementation Key Points

When using standard .NET interfaces, the roles are divided as follows:

  • IObservable<T>: The Data Provider (Subject). It has a Subscribe method to accept observers.
  • IObserver<T>: The Data Receiver (Observer). It implements three methods: OnNext (receive value), OnError (handle error), and OnCompleted (notification finished).
  • IDisposable: Used to provide a mechanism for unsubscribing.

Complete Implementation Code

Below is a complete code example that implements a temperature sensor (provider) and a monitor (observer).

using System;
using System.Collections.Generic;
using System.Threading;

namespace ObserverPatternExample
{
    // Data class to be notified
    public class TemperatureData
    {
        public DateTime Timestamp { get; }
        public double Temperature { get; }

        public TemperatureData(double temperature)
        {
            Timestamp = DateTime.Now;
            Temperature = temperature;
        }
    }

    // Data Provider: Temperature Sensor
    // Implements IObservable<T>
    public class TemperatureSensor : IObservable<TemperatureData>
    {
        // List to hold registered observers
        private readonly List<IObserver<TemperatureData>> _observers;

        public TemperatureSensor()
        {
            _observers = new List<IObserver<TemperatureData>>();
        }

        // Called when an observer starts subscribing
        // Must return an IDisposable object for unsubscribing
        public IDisposable Subscribe(IObserver<TemperatureData> observer)
        {
            if (!_observers.Contains(observer))
            {
                _observers.Add(observer);
            }
            // Return a class that provides the unsubscribe function
            return new Unsubscriber(_observers, observer);
        }

        // Method to measure data and notify subscribers (Simulation)
        public void RunSensorSimulation()
        {
            var random = new Random();
            
            // Simulate 5 measurements
            for (int i = 0; i < 5; i++)
            {
                // Generate a random temperature between -10 and 40
                double temp = Math.Round(-10 + (random.NextDouble() * 50), 1);
                var data = new TemperatureData(temp);

                // Notify all observers of the new value
                foreach (var observer in _observers)
                {
                    observer.OnNext(data);
                }

                Thread.Sleep(500); // Wait 0.5 seconds
            }

            // Notify that measurement is complete
            foreach (var observer in _observers.ToArray())
            {
                if (_observers.Contains(observer))
                {
                    observer.OnCompleted();
                }
            }
            
            _observers.Clear();
        }

        // Method to notify when an error occurs
        public void ReportError()
        {
            var ex = new Exception("Sensor malfunction detected.");
            foreach (var observer in _observers)
            {
                observer.OnError(ex);
            }
        }

        // Internal class to manage unsubscribing
        private class Unsubscriber : IDisposable
        {
            private readonly List<IObserver<TemperatureData>> _observers;
            private readonly IObserver<TemperatureData> _observer;

            public Unsubscriber(List<IObserver<TemperatureData>> observers, IObserver<TemperatureData> observer)
            {
                _observers = observers;
                _observer = observer;
            }

            public void Dispose()
            {
                if (_observer != null && _observers.Contains(_observer))
                {
                    _observers.Remove(_observer);
                    Console.WriteLine(">> Unsubscribed.");
                }
            }
        }
    }

    // Data Receiver (Observer): Monitoring Screen
    // Implements IObserver<T>
    public class TemperatureMonitor : IObserver<TemperatureData>
    {
        private readonly string _name;
        private IDisposable _cancellation;

        public TemperatureMonitor(string name)
        {
            _name = name;
        }

        // Process to start subscribing
        public void Subscribe(TemperatureSensor provider)
        {
            _cancellation = provider.Subscribe(this);
            Console.WriteLine($"> [{_name}] Started monitoring sensor.");
        }

        // Process to stop subscribing
        public void Unsubscribe()
        {
            _cancellation?.Dispose();
        }

        // Process when new data is received
        public void OnNext(TemperatureData value)
        {
            Console.WriteLine($"[{_name}] Received: {value.Timestamp:HH:mm:ss} - Temp {value.Temperature}C");
        }

        // Process when an error occurs
        public void OnError(Exception error)
        {
            Console.WriteLine($"[{_name}] Error occurred: {error.Message}");
        }

        // Process when data transmission is complete
        public void OnCompleted()
        {
            Console.WriteLine($"[{_name}] Measurement completion notification received.");
        }
    }

    // Execution Class
    class Program
    {
        static void Main(string[] args)
        {
            // Instantiate Provider (Sensor)
            var sensor = new TemperatureSensor();

            // Instantiate Observers (Monitors)
            var monitor1 = new TemperatureMonitor("Monitor A");
            var monitor2 = new TemperatureMonitor("Monitor B");

            // Start Subscription
            monitor1.Subscribe(sensor);
            monitor2.Subscribe(sensor);

            Console.WriteLine("--- Measurement Start ---");
            
            // Run Sensor (Notifications occur)
            sensor.RunSensorSimulation();

            Console.WriteLine("--- Process End ---");
        }
    }
}

Code Explanation

1. Unsubscribing Mechanism (Unsubscriber)

The IObservable<T>.Subscribe method is designed to return IDisposable. This allows the subscriber to stop receiving notifications (unsubscribe) by calling the Dispose method on the returned object.

In the code above, we defined a private inner class called Unsubscriber inside the TemperatureSensor class. This class is responsible for removing the specific observer from the provider’s list when Dispose is called. This ensures that unsubscribing is done safely and correctly.

2. Notification Flow

When data is generated on the provider side, it loops through the list of observers and calls the OnNext method for each one. This passes the data synchronously to all registered monitors.

Additionally, when the series of data transmissions is finished, OnCompleted is called to inform the observers that no more data will arrive. This design allows the observer side to release resources or perform cleanup tasks at the appropriate time.

Summary

In this article, I explained how to implement the Observer Pattern using IObservable<T> and IObserver<T>.

The biggest advantage of using these standard interfaces is the ability to build a flexible notification system while keeping the provider and receiver loosely coupled. Specifically, since the Subscribe method returns IDisposable, the lifecycle management for unsubscribing follows a unified rule.

This pattern is also the foundational concept for Reactive Extensions (Rx), a powerful library for handling asynchronous stream processing. It is recommended to first understand the basic implementation with standard interfaces, and then consider introducing Rx when more advanced event processing is required.

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次