Disruptor 的 EventHandler,Consumer 間會相互合作,會依序消費收到所有的資料。但有的場景我們可能會需要多個 Consumer 分攤處理收到的資料,這時可以採用 WorkerHandler。

像是我們可以撰寫下面這樣的 WorkerHandler:

...
public class Data
{
public string Value { get; set; }
}

public class DataWorkHandler : IWorkHandler
{
public string Name { get; private set; }
public DataWorkHandler( string name)
{
this.Name = name;
}
public void OnEvent( Data @event)
{
Console.WriteLine( "Thread = {0}, Handler = {1}, Value = {2} ", Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, @event.Value);
}
}
...

這邊建造多個 WorkerHandler,帶入 Disruptor 的 HandleEventsWithWorkerPool 方法。

...
var disruptor = new Disruptor.Dsl. Disruptor(() => new Data(), (int)Math .Pow(2, 10), TaskScheduler.Default, ProducerType.SINGLE, new YieldingWaitStrategy());

var workers = new IWorkHandler[]{
new DataWorkHandler("Handler1"),
new DataWorkHandler("Handler2"),
new DataWorkHandler("Handler3")};

disruptor.HandleEventsWithWorkerPool(workers);

var ringBuffer = disruptor.Start();

while (true)
{
var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = sequenceNo.ToString();
ringBuffer.Publish(sequenceNo);
Thread.Sleep(250);
}

disruptor.Shutdown();
...

運作時可以看到收到的資料是被分攤處理的。

{% img /images/posts/DisruptorWorkerPool/1.png %}