【C#】IObserverとIObservableを用いたイベント通知の実装

.NET Frameworkおよび.NET Core以降では、オブザーバーパターン(Observer Pattern)を実装するための標準インターフェースとして、System.IObservable<T>System.IObserver<T> が用意されています。これらを利用することで、データ発行元(プロバイダー)とデータ受信側(オブザーバー)を疎結合に保ちながら、プッシュ型の通知システムを構築することが可能です。

本記事では、気温センサーが温度データを計測し、それを監視モニターへ通知するというシナリオを用いて、標準的な実装方法を解説します。

目次

実装のポイント

.NETの標準インターフェースを利用する場合、以下の役割分担となります。

  • IObservable<T>: データの提供元(Subject)。Subscribe メソッドを持ち、オブザーバーを受け入れます。
  • IObserver<T>: データの受信者(Observer)。OnNext(値の受信)、OnError(エラー発生)、OnCompleted(完了)の3つのメソッドを実装します。
  • IDisposable: 購読解除(Unsubscribe)の仕組みを提供するために使用されます。

完全な実装コード

以下は、温度センサー(プロバイダー)とモニター(オブザーバー)を実装し、実際に動作させる完全なコード例です。

using System;
using System.Collections.Generic;
using System.Threading;

namespace ObserverPatternExample
{
    // 通知するデータクラス
    public class TemperatureData
    {
        public DateTime Timestamp { get; }
        public double Temperature { get; }

        public TemperatureData(double temperature)
        {
            Timestamp = DateTime.Now;
            Temperature = temperature;
        }
    }

    // データ提供元(プロバイダー):気温センサー
    // IObservable<T>を実装する
    public class TemperatureSensor : IObservable<TemperatureData>
    {
        // 登録されたオブザーバーを保持するリスト
        private readonly List<IObserver<TemperatureData>> _observers;

        public TemperatureSensor()
        {
            _observers = new List<IObserver<TemperatureData>>();
        }

        // オブザーバーが購読を開始する際に呼ばれる
        // 購読解除用のIDisposableオブジェクトを返す必要がある
        public IDisposable Subscribe(IObserver<TemperatureData> observer)
        {
            if (!_observers.Contains(observer))
            {
                _observers.Add(observer);
            }
            // 購読解除機能を提供するクラスを返す
            return new Unsubscriber(_observers, observer);
        }

        // データを計測し、登録者に通知するメソッド(シミュレーション用)
        public void RunSensorSimulation()
        {
            var random = new Random();
            
            // 5回計測を行うシミュレーション
            for (int i = 0; i < 5; i++)
            {
                // -10度から40度の範囲でランダムな温度を生成
                double temp = Math.Round(-10 + (random.NextDouble() * 50), 1);
                var data = new TemperatureData(temp);

                // 全てのオブザーバーに新しい値を通知
                foreach (var observer in _observers)
                {
                    observer.OnNext(data);
                }

                Thread.Sleep(500); // 0.5秒待機
            }

            // 計測終了を通知
            foreach (var observer in _observers.ToArray())
            {
                if (_observers.Contains(observer))
                {
                    observer.OnCompleted();
                }
            }
            
            _observers.Clear();
        }

        // 異常発生時の通知用
        public void ReportError()
        {
            var ex = new Exception("センサーの故障を検知しました。");
            foreach (var observer in _observers)
            {
                observer.OnError(ex);
            }
        }

        // 購読解除を管理する内部クラス
        private class Unsubscriber : IDisposable
        {
            private readonly List<IObserver<TemperatureData>> _observers;
            private readonly IObserver<TemperatureData> _observer;

            public Unsubscriber(List<IObserver<TemperatureData>> observers, IObserver<TemperatureData> observer)
            {
                _observers = observers;
                _observer = observer;
            }

            public void Dispose()
            {
                if (_observer != null && _observers.Contains(_observer))
                {
                    _observers.Remove(_observer);
                    Console.WriteLine(">> 購読が解除されました。");
                }
            }
        }
    }

    // データ受信側(オブザーバー):監視モニター
    // IObserver<T>を実装する
    public class TemperatureMonitor : IObserver<TemperatureData>
    {
        private readonly string _name;
        private IDisposable _cancellation;

        public TemperatureMonitor(string name)
        {
            _name = name;
        }

        // 購読開始処理
        public void Subscribe(TemperatureSensor provider)
        {
            _cancellation = provider.Subscribe(this);
            Console.WriteLine($"> [{_name}] センサーの監視を開始しました。");
        }

        // 購読停止処理
        public void Unsubscribe()
        {
            _cancellation?.Dispose();
        }

        // 新しいデータを受信した時の処理
        public void OnNext(TemperatureData value)
        {
            Console.WriteLine($"[{_name}] 受信: {value.Timestamp:HH:mm:ss} - 気温 {value.Temperature}℃");
        }

        // エラーが発生した時の処理
        public void OnError(Exception error)
        {
            Console.WriteLine($"[{_name}] エラー発生: {error.Message}");
        }

        // データ送信が完了した時の処理
        public void OnCompleted()
        {
            Console.WriteLine($"[{_name}] 計測終了通知を受信しました。");
        }
    }

    // 実行用クラス
    class Program
    {
        static void Main(string[] args)
        {
            // プロバイダー(センサー)のインスタンス化
            var sensor = new TemperatureSensor();

            // オブザーバー(モニター)のインスタンス化
            var monitor1 = new TemperatureMonitor("モニターA");
            var monitor2 = new TemperatureMonitor("モニターB");

            // 購読開始
            monitor1.Subscribe(sensor);
            monitor2.Subscribe(sensor);

            Console.WriteLine("--- 計測開始 ---");
            
            // センサー稼働(通知が発生)
            sensor.RunSensorSimulation();

            Console.WriteLine("--- 処理終了 ---");
        }
    }
}

コードの解説

1. 購読解除の仕組み (Unsubscriber)

IObservable<T>.Subscribe メソッドは IDisposable を返す仕様となっています。これは、購読者が通知を受け取るのを止めたい場合(購読解除)に、返されたオブジェクトの Dispose メソッドを呼び出すためです。

上記のコードでは、TemperatureSensor クラス内にプライベートな内部クラス Unsubscriber を定義しています。このクラスは、Dispose が呼ばれた際に、プロバイダーのリストから対象のオブザーバーを削除する責務を持ちます。これにより、安全かつ確実に購読解除を行うことができます。

2. 通知のフロー

プロバイダー側でデータが生成されると、保持しているオブザーバーリストをループし、各オブザーバーの OnNext メソッドを呼び出します。これにより、登録されている全てのモニターに対して同期的にデータが渡されます。

また、一連のデータ送信が終了した際には OnCompleted を呼び出し、これ以上データが来ないことをオブザーバーに伝えます。この設計により、オブザーバー側はリソースの解放や終了処理を適切なタイミングで行うことが可能になります。

まとめ

本記事では、IObservable<T>IObserver<T> を活用したオブザーバーパターンの実装方法について解説しました。

この標準インターフェースを利用する最大のメリットは、データ提供側と受信側を疎結合に保ちながら、柔軟な通知システムを構築できる点にあります。特に、Subscribe メソッドが IDisposable を返す設計になっているため、購読解除のライフサイクル管理が統一的なルールで行えることが大きな特徴です。

このパターンは、非同期ストリーム処理を扱う強力なライブラリである Reactive Extensions (Rx) の基礎となる概念でもあります。まずは標準インターフェースでの基本実装を理解し、より高度なイベント処理が必要になった段階で Rx の導入を検討すると良いでしょう。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

私が勉強したこと、実践したこと、してることを書いているブログです。
主に資産運用について書いていたのですが、
最近はプログラミングに興味があるので、今はそればっかりです。

目次