概要
Rxでは主に3つのインターフェースが登場する。
- IObserver
- IObservable
- IDisposable
IObserver
public interface IObserver<in T>
{
void OnNext(T value); //更新通知
void OnError(Exception error); //エラー通知
void OnCompleted(); //完了通知
}
監視するインターフェース。
各通知に対する処理を実装する。
IObservable
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer); //購読
}
監視されるインターフェース。
通知先であるObserverを登録するメソッドSubscribe()を実装する。
IDisposable
public interface IDisposable
{
void Dispose(); //購読解除
}
後片付けインターフェース。
RxではDispose()で購読解除を行う。
これ自体は汎用インターフェースであり、アンマネージリソース等が絡むパターンはテンプレ化しておいた方が良いですね。
実装例
エラーチェックや二重購読などの細かいところは省略。
//監視するクラス
public class MyObserver<T> : IObserver<T>
{
public void OnNext(T value)
{
Console.WriteLine($"OnNext:{value.ToString()}");
}
public void OnError(Exception error)
{
Console.WriteLine("OnError");
}
public void OnCompleted()
{
Console.WriteLine("OnCompleted");
}
}
//監視されるクラス
public class MyObservable : IObservable<int>
{
private int value;
public int Value
{
get => value;
set
{
if (value == this.value)
return;
this.value = value;
foreach (var o in Observers.ToList())
o.OnNext(value);
}
}
private IList<IObserver<int>> Observers { get; }
= new List<IObserver<int>>();
public IDisposable Subscribe(IObserver<int> observer)
{
Observers.Add(observer);
return new UnSubscriber(Observers, observer);
}
public void End()
{
foreach (var observer in Observers)
observer.OnCompleted();
Observers.Clear();
}
//購読解除クラス
private class UnSubscriber : IDisposable
{
private ICollection<IObserver<int>> _observers;
private IObserver<int> _observer;
public UnSubscriber(
ICollection<IObserver<int>> observers,
IObserver<int> observer)
{
_observers = observers;
_observer = observer;
}
public void Dispose()
{
_observers.Remove(_observer);
Console.WriteLine("Dispose");
}
}
}
お試し実行1
実行コード
var myObs = new MyObservable();
var dis = myObs.Subscribe(new MyObserver<int>());
myObs.Value = 10;
myObs.Value = 20;
dis.Dispose();
myObs.Value = 30;
結果
OnNext:10 OnNext:20 Dispose
お試し実行2
実行コード
var myObs = new MyObservable();
myObs.FirstAsync().Subscribe(new MyObserver<int>());
myObs.Take(3).Subscribe(new MyObserver<int>());
myObs.Value = 111;
myObs.Value = 2222;
myObs.Value = 33333;
myObs.Value = 444444;
結果
OnNext:111 OnCompleted Dispose OnNext:111 OnNext:2222 OnNext:33333 OnCompleted Dispose