C#使用IObservable和IObserver实现观察者模式

被观察者 IObservable<string>

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Utils
{
/// <summary>
/// Message source (the observable side of the pattern). Keeps a list of
/// subscribed observers and pushes messages, errors and completion to them.
/// </summary>
public class MsgTracker : IObservable<string>
{
    // Currently subscribed observers; shared with every Unsubscriber handed out.
    private readonly List<IObserver<string>> observers;

    /// <summary>
    /// Creates a tracker with no subscribers.
    /// </summary>
    public MsgTracker()
    {
        observers = new List<IObserver<string>>();
    }

    /// <summary>
    /// Registers an observer. Subscribing the same observer twice does not
    /// add it a second time.
    /// </summary>
    /// <param name="observer">The observer to notify.</param>
    /// <returns>A handle whose Dispose removes the observer again.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<string> observer)
    {
        // Reject null up front; a stored null would only fail later, inside TrackMsg.
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (!observers.Contains(observer))
        {
            observers.Add(observer);
        }

        return new Unsubscriber(observers, observer);
    }

    // IDisposable handle used to cancel a subscription.
    private class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<string>> _observers;
        private readonly IObserver<string> _observer;

        public Unsubscriber(List<IObserver<string>> observers, IObserver<string> observer)
        {
            this._observers = observers;
            this._observer = observer;
        }

        // Removes the observer from the shared list; a no-op if it is already gone.
        public void Dispose()
        {
            if (_observer != null)
            {
                _observers.Remove(_observer);
            }
        }
    }

    /// <summary>
    /// Delivers a message to every subscriber. A null, empty or whitespace-only
    /// message is reported through OnError with a <see cref="MsgUnknownException"/>
    /// instead of OnNext.
    /// </summary>
    /// <param name="msg">The message text to publish.</param>
    public void TrackMsg(string msg)
    {
        bool isBlank = string.IsNullOrWhiteSpace(msg);

        // Iterate over a snapshot: an observer may unsubscribe from inside its
        // callback, which would otherwise invalidate the list enumerator.
        foreach (var observer in observers.ToArray())
        {
            // Skip observers removed by an earlier callback in this same pass.
            if (!observers.Contains(observer))
            {
                continue;
            }

            if (isBlank)
            {
                observer.OnError(new MsgUnknownException());
            }
            else
            {
                observer.OnNext(msg);
            }
        }
    }

    /// <summary>
    /// Signals completion to every remaining subscriber and then clears the
    /// subscriber list.
    /// </summary>
    public void EndMsg()
    {
        // Snapshot for the same reason as in TrackMsg: OnCompleted handlers
        // may dispose their subscription while we are iterating.
        foreach (var observer in observers.ToArray())
        {
            if (observers.Contains(observer))
            {
                observer.OnCompleted();
            }
        }

        observers.Clear();
    }
}
/// <summary>
/// Exception passed to observers' OnError when a message cannot be published.
/// </summary>
public class MsgUnknownException : Exception
{
    // Internal: only code in this assembly creates instances.
    internal MsgUnknownException()
        : base()
    {
    }
}
}

观察者 IObserver<string>

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Utils
{
/// <summary>
/// Message observer: forwards every message it receives to the callback
/// supplied at construction time.
/// </summary>
public class MsgReporter : IObserver<string>
{
    // Handle returned by the provider; null while not subscribed via Subscribe().
    private IDisposable unsubscriber;
    private readonly Action<string> _next;

    /// <summary>
    /// Creates a reporter.
    /// </summary>
    /// <param name="next">Callback invoked for each message; may be null, in
    /// which case messages are ignored.</param>
    public MsgReporter(Action<string> next)
    {
        this._next = next;
    }

    /// <summary>
    /// Subscribes this reporter to a provider and remembers the returned
    /// handle so that <see cref="Unsubscribe"/> can cancel it. A null
    /// provider is ignored.
    /// </summary>
    /// <param name="provider">The message source to observe.</param>
    public virtual void Subscribe(IObservable<string> provider)
    {
        if (provider != null)
        {
            unsubscriber = provider.Subscribe(this);
        }
    }

    /// <summary>
    /// Cancels the subscription made through <see cref="Subscribe"/>.
    /// Does nothing when there is no active subscription.
    /// </summary>
    public virtual void Unsubscribe()
    {
        // Clear the field before disposing so repeated calls are harmless.
        var subscription = unsubscriber;
        unsubscriber = null;
        subscription?.Dispose();
    }

    /// <summary>
    /// Called by the provider when no more messages will arrive; drops the subscription.
    /// </summary>
    public virtual void OnCompleted()
    {
        this.Unsubscribe();
    }

    /// <summary>
    /// Called by the provider on an error. Intentionally a no-op here;
    /// derived classes may override it.
    /// </summary>
    /// <param name="e">The error reported by the provider.</param>
    public virtual void OnError(Exception e)
    {
    }

    /// <summary>
    /// Called by the provider for each message; passes it to the callback.
    /// </summary>
    /// <param name="value">The message text.</param>
    public virtual void OnNext(string value)
    {
        _next?.Invoke(value);
    }
}
}

使用示例

MsgTracker msgTracker = new MsgTracker();

// Observer that appends each received message to the rich text box.
MsgReporter reporter1 = new MsgReporter(msg =>
{
    this.richTextBox1.AppendText(msg);
});

// Subscribe through the reporter rather than calling msgTracker.Subscribe directly:
// MsgReporter.Subscribe stores the returned IDisposable, which the reporter
// needs in order to unsubscribe itself later (e.g. on completion).
reporter1.Subscribe(msgTracker);
msgTracker.TrackMsg($"时间:{DateTime.Now:HH:mm:ss}-->{str}");

参考:

IObservable 接口