IObservable
IEnumerable & IEnumerator 這種迭代器模式屬於拉出型通知的通用機制,也就是說資料需由消費者自行從提供者拉出,就像是我們在食堂點餐必須排隊取餐,取餐的速度取決於食堂生產的速度,只要還沒排到或還在準備,我們都只能站在隊伍中繼續等候,待拿到餐點才能離開隊伍找位置坐下開動,而 IObservable<T> 與 IObserver<T> 則是推入型通知的通用機制,資料由提供者主動推送給訂閱的消費者。
IObservable
/// <summary>
/// A provider that observers can subscribe to in order to receive notifications.
/// </summary>
/// <typeparam name="T">Type of the notification data; declared covariant (<c>out</c>).</typeparam>
public interface IObservable<out T>
{
/// <summary>
/// Registers an observer with this provider.
/// </summary>
/// <param name="observer">The observer that should receive notifications.</param>
/// <returns>
/// An <see cref="IDisposable"/> handed back to the subscriber; in the implementations
/// shown in this article, disposing it removes the observer from the provider.
/// </returns>
IDisposable Subscribe(IObserver<T> observer);
}
IObserver
/// <summary>
/// Receiver of notifications sent by a provider.
/// </summary>
/// <typeparam name="T">Type of the notification data; declared contravariant (<c>in</c>).</typeparam>
public interface IObserver<in T>
{
/// <summary>Called when the provider reports that it has finished sending notifications.</summary>
void OnCompleted();
/// <summary>Called when the provider reports an error.</summary>
/// <param name="error">The exception describing the error.</param>
void OnError(Exception error);
/// <summary>Called when the provider delivers a new value.</summary>
/// <param name="value">The delivered value.</param>
void OnNext(T value);
}
看到這邊不知道大家有沒有發現,其實IObservable
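這邊要注意一點,若是細看上面的 IObservable<T> 介面,會發現它並沒有提供取消訂閱的方法;取消訂閱是透過 Subscribe 所回傳的 IDisposable 物件來完成,因此實作時需要準備一個像下面這樣、在 Dispose 時將訂閱者移除的類別: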
/// <summary>
/// Subscription token returned to a subscriber; disposing it removes the
/// subscriber's observer from the provider's observer list.
/// </summary>
private class Unsubscriber : IDisposable
{
    // The provider's observer list (shared reference, not a copy).
    private readonly List<IObserver<T>> _subscribers;
    // The observer this token will remove on Dispose.
    private readonly IObserver<T> _subscriber;

    /// <param name="observers">The provider's list of subscribed observers.</param>
    /// <param name="observer">The observer to remove when this token is disposed.</param>
    public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
    {
        _subscribers = observers;
        _subscriber = observer;
    }

    /// <summary>
    /// Removes the observer from the list. Calling it again is harmless because
    /// List.Remove simply returns false when the item is no longer present.
    /// </summary>
    public void Dispose()
    {
        if (_subscriber == null)
        {
            return;
        }

        _subscribers.Remove(_subscriber);
    }
}
在訂閱時將取消訂閱用的物件實體回傳,供訂閱者取消訂閱用:
/// <summary>
/// Registers <paramref name="observer"/> so that it receives subsequent notifications.
/// </summary>
/// <param name="observer">The observer to register; must not be null.</param>
/// <returns>A token whose Dispose removes the observer from the list again.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public IDisposable Subscribe(IObserver<T> observer)
{
    // Reject null up front: a null entry in the list would only surface later
    // as a NullReferenceException while notifying.
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }

    // Subscribing the same observer twice must not produce duplicate notifications.
    if (!m_Observers.Contains(observer))
    {
        m_Observers.Add(observer);
    }

    return new Unsubscriber(m_Observers, observer);
}
這邊實作了一個較完整點的範例:
Observable.CS
using System;
using System.Collections.Generic;
/// <summary>
/// Simple base class for a provider: keeps the list of subscribed observers and
/// gives derived classes helpers to broadcast notifications to them.
/// </summary>
/// <typeparam name="T">Type of the values delivered to observers.</typeparam>
public class Observable<T> : IObservable<T>
{
    #region Class
    /// <summary>
    /// Subscription token returned by <see cref="Subscribe"/>; disposing it removes
    /// the observer from the provider's list.
    /// </summary>
    private class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _subscribers;
        private readonly IObserver<T> _subscriber;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _subscribers = observers;
            _subscriber = observer;
        }

        /// <summary>
        /// Removes the observer. Safe to call more than once: List.Remove returns
        /// false when the item is already gone.
        /// </summary>
        public void Dispose()
        {
            if (_subscriber != null)
            {
                _subscribers.Remove(_subscriber);
            }
        }
    }
    #endregion

    #region Var
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    #endregion

    #region Protected Property
    /// <summary>The observers currently subscribed to this provider.</summary>
    protected List<IObserver<T>> m_Observers
    {
        get { return _observers; }
    }
    #endregion

    #region Private Method
    /// <summary>Sends <paramref name="error"/> to every subscribed observer.</summary>
    private void NotifyError(Exception error)
    {
        // Iterate over a snapshot so an observer may dispose its subscription
        // from inside the callback without invalidating the enumeration.
        foreach (IObserver<T> observer in m_Observers.ToArray())
        {
            observer.OnError(error);
        }
    }
    #endregion

    #region Protected Method
    /// <summary>
    /// Delivers <paramref name="obj"/> to every subscribed observer. If an observer's
    /// OnNext throws, delivery stops and the exception is forwarded to all observers
    /// through OnError.
    /// </summary>
    /// <param name="obj">The value to deliver.</param>
    protected void NotifyNext(T obj)
    {
        try
        {
            // Snapshot: observers that unsubscribe during the broadcast no longer
            // cause a "collection was modified" exception (and a spurious OnError).
            foreach (IObserver<T> observer in m_Observers.ToArray())
            {
                observer.OnNext(obj);
            }
        }
        catch (Exception e)
        {
            NotifyError(e);
        }
    }

    /// <summary>
    /// Tells every subscribed observer that the provider has finished. If an
    /// observer's OnCompleted throws, the exception is forwarded to all observers
    /// through OnError.
    /// </summary>
    protected void NotifyCompleted()
    {
        try
        {
            // Snapshot for the same reason as in NotifyNext.
            foreach (IObserver<T> observer in m_Observers.ToArray())
            {
                observer.OnCompleted();
            }
        }
        catch (Exception e)
        {
            NotifyError(e);
        }
    }
    #endregion

    #region Public Method
    /// <summary>
    /// Registers <paramref name="observer"/> so that it receives subsequent notifications.
    /// Subscribing the same observer again does not add a duplicate entry.
    /// </summary>
    /// <param name="observer">The observer to register; must not be null.</param>
    /// <returns>A token whose Dispose removes the observer again.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // A null entry would only fail later, as a NullReferenceException during notification.
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        if (!m_Observers.Contains(observer))
        {
            m_Observers.Add(observer);
        }

        return new Unsubscriber(m_Observers, observer);
    }
    #endregion
}
Program.CS
using System;
using System.Collections.Generic;
namespace ConsoleApplication5
{
/// <summary>
/// Console demo: one viewer subscribes to a blog and prints each article as it is added.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        BlogViwer guest = new BlogViwer();
        Blog levelUp = new Blog("Level Up", "Larry Nung");

        // Subscribe hands back the unsubscribe token; the using block disposes it
        // (i.e. unsubscribes the viewer) once the demo is done instead of discarding it.
        using (levelUp.Subscribe(guest))
        {
            levelUp.AddArticle(".NET 4.0 New Feature - Environment.Is64BitProcess & Environment.Is64BitOperatingSystem", "...");
            levelUp.AddArticle("LINQ to CSV library", "...");
            levelUp.AddArticle("[C#][VB.NET]最大公因數 & 最小公倍數", "...");
        }
    }
}
/// <summary>
/// A blog that notifies its subscribers with the text of every newly added article.
/// </summary>
class Blog : Observable<string>
{
    // Every article added so far, in insertion order.
    private readonly List<string> _articles = new List<string>();

    public String Name { get; set; }
    public String Owner { get; set; }

    /// <param name="name">Blog name.</param>
    /// <param name="owner">Blog owner.</param>
    public Blog(String name, string owner)
    {
        Name = name;
        Owner = owner;
    }

    /// <summary>
    /// Stores a new article (title, a line break, then the content) and pushes the
    /// same text to all subscribers.
    /// </summary>
    public void AddArticle(string title, string content)
    {
        string article = string.Concat(title, Environment.NewLine, content);
        _articles.Add(article);
        NotifyNext(article);
    }
}
/// <summary>
/// Observer that writes every notification it receives to the console.
/// </summary>
class BlogViwer : IObserver<string>
{
    // Line of 50 double-quote characters printed after each article.
    private static readonly string Separator = new string('"', 50);

    /// <summary>Prints the received value followed by the separator line.</summary>
    public void OnNext(string value)
    {
        Console.WriteLine(value);
        Console.WriteLine(Separator);
    }

    /// <summary>Prints a fixed error marker; the exception itself is not shown.</summary>
    public void OnError(Exception error)
    {
        Console.WriteLine("Error...");
    }

    /// <summary>Prints a fixed completion marker.</summary>
    public void OnCompleted()
    {
        Console.WriteLine("Completed...");
    }
}
}
執行結果如下:
![]()
Download
ObserverDemo.zip
Link
- IObserver(Of T) 介面
- IObservable(Of T) 介面
- IObserver and IObservable - A New addition to BCL