Disruptor - Getting started

要使用 Disruptor 必須先將套件加入專案中,透過 NuGet 將之載入即可:


套件載入後我們就可以開始來使用 Disruptor 了。首先,必須要撰寫 EventHandler,用來消費生產者所生產的資料。撰寫上很簡單,只要實作 IEventHandler 泛型介面即可,泛型介面要指定預期的資料型態,並在 OnEvent 方法中進行資料的處理。


像是如果要在收到資料時顯示一些相關的訊息在主控台上,我們就可像下面這樣撰寫 EventHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
... 
public class Data {
public string Value { get; set; }
}
public class DataEventHandler : IEventHandler<Data>
{
public string Name { get; private set; }
public DataEventHandler(string name) {
this.Name = name;
}
public void OnEvent(Data data, long sequence, bool endOfBatch) {
Console.WriteLine("Thread = {0}, Handler = {1}, Sequence = {2}, Value = {3}", Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, sequence, data.Value);
}
}
...


EventHandler 好了,接著就是撰寫 Producer 生產資料的部分以及資料怎樣在 Consumer 間流動。


這邊我們必須要了解到 Disruptor 有兩種不同的撰寫方式,一種是 DSL 寫法,一種是 Non-DSL 寫法。


DSL 的寫法比較簡潔,首先要告訴 Disruptor 怎樣初始 Ringbuffer 的每個元素,以及 Ringbuffer 的大小,建立出 Disruptor 的物件實體。接著要決定資料怎樣在 Consumer 間流動。再來要 Start Disruptor,取得 Ringbuffer,然後產生資料往 Ringbuffer 上塞。


程式寫起來就像下面這樣:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
... 
var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default);

disruptor.HandleEventsWith(new DataEventHandler("Handler1"));

var ringBuffer = disruptor.Start();
var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];

data.Value = "Hello";
ringBuffer.Publish(sequenceNo);
sequenceNo = ringBuffer.Next();

data = ringBuffer[sequenceNo];
data.Value = "World";
ringBuffer.Publish(sequenceNo);

disruptor.Shutdown();
...



Non-DSL 寫起來相對複雜些,一樣要告訴 Disruptor 怎樣初始 Ringbuffer 的每個元素,以及 Ringbuffer 的大小,但是這邊改建立出 Ringbuffer,接著要用 EventProcessor 與 Barrier 去設定資料怎樣在 Consumer 間流動,然後要用非同步的方式調用 EventProcessor 的 Run 方法,再來就一樣是產生資料往 Ringbuffer 上塞。


程式寫起來就像下面這樣:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
... 
var ringBuffer = RingBuffer<Data>.CreateSingleProducer(() => new Data(), (int)Math.Pow(2, 4));
var barrier = ringBuffer.NewBarrier();
var eventProcessor = new BatchEventProcessor<Data>(ringBuffer, barrier, new DataEventHandler("Handler1"));

Task.Factory.StartNew(() => eventProcessor.Run());

var sequenceNo = ringBuffer.Next();
var data = ringBuffer[sequenceNo];
data.Value = "Hello";
ringBuffer.Publish(sequenceNo);

sequenceNo = ringBuffer.Next();
data = ringBuffer[sequenceNo];
data.Value = "World";
ringBuffer.Publish(sequenceNo);

eventProcessor.Halt();

Application.DoEvents();
...