Disruptor - WorkerPool

Disruptor 的 EventHandler,Consumer 間會相互合作,會依序消費收到所有的資料。但有的場景我們可能會需要多個 Consumer 分攤處理收到的資料,這時可以採用 WorkerHandler。


像是我們可以撰寫下面這樣的 WorkerHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
public class Data
{
public string Value { get; set; }
}

public class DataWorkHandler : IWorkHandler<Data >
{
public string Name { get; private set; }
public DataWorkHandler( string name)
{
this.Name = name;
}
public void OnEvent( Data @event)
{
Console.WriteLine( "Thread = {0}, Handler = {1}, Value = {2} ", Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, @event.Value);
}
}
...


這邊建造多個 WorkerHandler,帶入 Disruptor 的 HandleEventsWithWorkerPool 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
...
var disruptor = new Disruptor.Dsl. Disruptor< Data>(() => new Data(), (int)Math .Pow(2, 10), TaskScheduler.Default, ProducerType.SINGLE, new YieldingWaitStrategy());

var workers = new IWorkHandler<Data>[]{
new DataWorkHandler("Handler1"),
new DataWorkHandler("Handler2"),
new DataWorkHandler("Handler3")};

disruptor.HandleEventsWithWorkerPool(workers);

var ringBuffer = disruptor.Start();

while (true)
{
var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = sequenceNo.ToString();
ringBuffer.Publish(sequenceNo);
Thread.Sleep(250);
}

disruptor.Shutdown();
...


運作時可以看到收到的資料是被分攤處理的。