Rx/IObserverとIObservableとIDisposable

Last-modified: 2020-02-07 (金) 02:54:04

概要

Rxでは主に3つのインターフェースが登場する。

  • IObserver
  • IObservable
  • IDisposable

IObserver

public interface IObserver<in T>
{
    void OnNext(T value);          //更新通知
    void OnError(Exception error); //エラー通知
    void OnCompleted();            //完了通知
}

監視するインターフェース。
各通知に対する処理を実装する。

IObservable

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer); //購読
}

監視されるインターフェース。
通知先であるObserverを登録するメソッドSubscribe()を実装する。

IDisposable

public interface IDisposable
{
    void Dispose(); //購読解除
}

後片付けインターフェース。
RxではDispose()で購読解除を行う。

これ自体は汎用インターフェースであり、アンマネージリソース等が絡むパターンはテンプレ化しておいた方が良いですね。

実装例

エラーチェックや二重購読などの細かいところは省略。

//監視するクラス
public class MyObserver<T> : IObserver<T>
{
    public void OnNext(T value)
    {
        Console.WriteLine($"OnNext:{value.ToString()}");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("OnError");
    }

    public void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }
}
//監視されるクラス
public class MyObservable : IObservable<int>
{
    private int value;

    public int Value
    {
        get => value;
        set
        {
            if (value == this.value)
                return;
            this.value = value;
            foreach (var o in Observers.ToList())
                o.OnNext(value);
        }
    }

    private IList<IObserver<int>> Observers { get; }
        = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Observers.Add(observer);
        return new UnSubscriber(Observers, observer);
    }

    public void End()
    {
        foreach (var observer in Observers)
            observer.OnCompleted();

        Observers.Clear();
    }

    //購読解除クラス
    private class UnSubscriber : IDisposable
    {
        private ICollection<IObserver<int>> _observers;
        private IObserver<int> _observer;

        public UnSubscriber(
            ICollection<IObserver<int>> observers,
            IObserver<int> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose()
        {
            _observers.Remove(_observer);
            Console.WriteLine("Dispose");
        }
    }
}

お試し実行1

実行コード

var myObs = new MyObservable();
var dis = myObs.Subscribe(new MyObserver<int>());
myObs.Value = 10;
myObs.Value = 20;
dis.Dispose();
myObs.Value = 30;

結果

OnNext:10
OnNext:20
Dispose

お試し実行2

実行コード

var myObs = new MyObservable();
myObs.FirstAsync().Subscribe(new MyObserver<int>());
myObs.Take(3).Subscribe(new MyObserver<int>());

myObs.Value = 111;
myObs.Value = 2222;
myObs.Value = 33333;
myObs.Value = 444444;

結果

OnNext:111
OnCompleted
Dispose
OnNext:111
OnNext:2222
OnNext:33333
OnCompleted
Dispose