Disruptor - WorkerPool
Disruptor 的 EventHandler,Consumer 間會相互合作,會依序消費收到所有的資料。但有的場景我們可能會需要多個 Consumer 分攤處理收到的資料,這時可以採用 WorkerHandler。
像是我們可以撰寫下面這樣的 WorkerHandler:
...
public class Data
{
public string Value { get; set; }
}
public class DataWorkHandler : IWorkHandler<Data >
{
public string Name { get; private set; }
public DataWorkHandler( string name)
{
this.Name = name;
}
public void OnEvent( Data @event)
{
Console.WriteLine( "Thread = {0}, Handler = {1}, Value = {2} ", Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, @event.Value);
}
}
...
這邊建造多個 WorkerHandler,帶入 Disruptor 的 HandleEventsWithWorkerPool 方法。
...
var disruptor = new Disruptor.Dsl. Disruptor< Data>(() => new Data(), (int)Math .Pow(2, 10), TaskScheduler.Default, ProducerType.SINGLE, new YieldingWaitStrategy());
var workers = new IWorkHandler<Data>[]{
new DataWorkHandler("Handler1"),
new DataWorkHandler("Handler2"),
new DataWorkHandler("Handler3")};
disruptor.HandleEventsWithWorkerPool(workers);
var ringBuffer = disruptor.Start();
while (true)
{
var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = sequenceNo.ToString();
ringBuffer.Publish(sequenceNo);
Thread.Sleep(250);
}
disruptor.Shutdown();
...
運作時可以看到收到的資料是被分攤處理的。
{% img /images/posts/DisruptorWorkerPool/1.png %}