Below you will find pages that utilize the taxonomy term “Disruptor”
Posts
Disruptor - WorkerPool
Disruptor 的 EventHandler,Consumer 間會相互合作,會依序消費收到所有的資料。但有的場景我們可能會需要多個 Consumer 分攤處理收到的資料,這時可以採用 WorkerHandler。
像是我們可以撰寫下面這樣的 WorkerHandler:
... public class Data { public string Value { get; set; } } public class DataWorkHandler : IWorkHandler<Data > { public string Name { get; private set; } public DataWorkHandler( string name) { this.Name = name; } public void OnEvent( Data @event) { Console.WriteLine( "Thread = {0}, Handler = {1}, Value = {2} ", Thread.CurrentThread.ManagedThreadId.ToString(), this.Name, @event.Value); } } ... 這邊建造多個 WorkerHandler,帶入 Disruptor 的 HandleEventsWithWorkerPool 方法。
read morePosts
Disruptor - Consumer's WaitStrategy
Disruptor 內建幾種等待策略,可用以設定消費者怎樣等待生產者的資料。在實務上,我們可能要針對不同的產品特性下去調整等待的策略。
預設的等待策略為 BlockingWaitStrategy,內部是用 Lock 與條件變數下去實作,用於對低延遲與高產能不是那麼重視的情境。雖然產能不高,但能確保在不同的環境下有一致的性能表現。
這邊筆者做了簡單的測試,在 Unicast 1p-1c 的狀況下瘋狂寫入,每秒約處理 145 筆資料。
{% img /images/posts/DisruptorWaitStrategy/1.png %}
如果讓 Producer 不要那麼頻繁的產生資料,那會看到消費者在等待的同時 CPU 會忙於等待。
{% img /images/posts/DisruptorWaitStrategy/2.png %}
SleepingWaitStrategy 策略則是用迴圈下去睡眠等待。可以看到這邊的實驗,每秒處理可達到約 18xxxx 筆資料。
{% img /images/posts/DisruptorWaitStrategy/3.png %}
且等待生產者資料時 CPU 耗費也不高。
{% img /images/posts/DisruptorWaitStrategy/4.png %}
BusySpinWaitStrategy 策略套到同樣的實驗,每秒處理約 144 筆資料。
{% img /images/posts/DisruptorWaitStrategy/5.png %}
等待資料時 CPU 也是飆高狀態。
{% img /images/posts/DisruptorWaitStrategy/6.png %}
YieldingWaitStrategy 策略套到同樣的實驗,每秒處理約 24xxxx 筆資料.
{% img /images/posts/DisruptorWaitStrategy/7.png %}
等待資料時 CPU 依舊飆高。
{% img /images/posts/DisruptorWaitStrategy/8.png %}
read morePosts
Disruptor - High Performance Inter-Thread Messaging Library
Disruptor 是 LMAX 提出的高效線程通信套件,能夠以很低的延遲產生很大量的吞吐量,實務上 LMAX 藉此得以在一個線程裡每秒處理6百萬訂單。
高效線程通信套件這個詞彙如果太抽象,我們也可以將之視為高效低延遲的生產者與消費者模式框架,使用這個框架可以很容易的套用生產者與消費者模式,且整個模式的工作流程可輕易的設定與調動。
Disruptor 之所以能有如此優異的效能,是因為裡面運用了很多技術在。像是使用 Compare and swap (CAS) 去避開鎖的開銷,使用 memory barrier 去控制執行緒間的運行順序,使用 Cache line padding 避免 false sharing 的問題,以及預先將記憶體配置並重用以避免不必要的 GC 回收。
Disruptor 最經典的使用架構就是 LMAX 所提出的架構,分為三個部分,一個是輸入的 Disruptor,一個是商業邏輯處理的部份,最後是輸出的 Disruptor。
{% img /images/posts/Disruptor/1.png %}
輸入的 Disruptor 這邊,會先將進來的事件存起來,如果發生什麼問題,可以讓他重新運轉。接著會將事件傳送給其他 Slave 節點(LMAX 這邊是用 IP Multicasting 去同步所有 Slave 節點),以實現 HA 架構。最後是拆解任務與資料,讓後續商業邏輯的處理比較容易。
{% img /images/posts/Disruptor/2.png %}
商業邏輯處理這邊要注意的就是要盡可能的快速,所以需要的資料都放置在記憶體中是最好的,且要避免 IO 的處理。
輸出 Disruptor 這邊,則負責將資料合併,導到該去的地方。
整個架構就像下面這樣:
{% img /images/posts/Disruptor/3.png %}
Link Disruptor by LMAX-Exchange
read morePosts
Disruptor - Diamond: 1P – 3C
使用 Disruptor 時我們必須決定資料要怎樣在 Consumer 間流動,這有些常用的 Pattern 可供參考,只要熟悉這些 Pattern 那 Consumer 間有多複雜的協作應該都不是問題。
這邊要介紹的是 Diamond: 1P – 3C,一個 Producer 負責生產資料,兩個 Consumer 同時消費資料後,再交由第三個 Consumer 處理。其依賴關係圖會像這樣:
{% img /images/posts/DisruptorDiamond1P3C/1.png %}
可以進一步簡化成下面這樣:
{% img /images/posts/DisruptorDiamond1P3C/2.png %}
透過 DSL 的方式撰寫,只要同時將兩個 EventHandler 帶入 HandleEventWith,再用 Then 方法串接第三個 EventHandler 即可,像是下面這樣:
... var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default); disruptor.HandleEventsWith(new DataEventHandler("Handler1"), new DataEventHandler("Handler2")).Then(new DataEventHandler("Handler3")); var ringBuffer = disruptor.Start(); ... disruptor.Shutdown(); ... 是改用 Non-DSL 撰寫的話,本來的依賴關係圖形就會變成下面這樣:
{% img /images/posts/DisruptorDiamond1P3C/3.png %}
read morePosts
Disruptor - Multicast: 1P – 3C
使用 Disruptor 時我們必須決定資料要怎樣在 Consumer 間流動,這有些常用的 Pattern 可供參考,只要熟悉這些 Pattern 那 Consumer 間有多複雜的協作應該都不是問題。
這邊要介紹的是 Multicast: 1P – 3C,一個 Producer 負責生產資料,三個 Consumer 同時消費資料。其依賴關係圖會像這樣:
{% img /images/posts/DisruptorMulticast1P3C/1.png %}
可進一步簡化成下面這樣:
{% img /images/posts/DisruptorMulticast1P3C/2.png %}
透過 DSL 的方式撰寫,只要同時將三個 EventHandler 帶入 HandleEventWith 即可,像是下面這樣:
... var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default); disruptor.HandleEventsWith(new DataEventHandler("Handler1"), new DataEventHandler("Handler2"), new DataEventHandler("Handler3")); var ringBuffer = disruptor.Start(); ... disruptor.Shutdown(); … 是改用 Non-DSL 撰寫的話,本來的依賴關係圖形就會變成下面這樣:
{% img /images/posts/DisruptorMulticast1P3C/3.png %}
用程式來寫,就是三個 EventProcessor 共用同一個 Barrier。
read morePosts
Disruptor - Three Step Pipeline: 1P – 3C
使用 Disruptor 時我們必須決定資料要怎樣在 Consumer 間流動,這有些常用的 Pattern 可供參考,只要熟悉這些 Pattern 那 Consumer 間有多複雜的協作應該都不是問題。
這邊要介紹的是 Three Step Pipeline: 1P – 3C,一個 Producer 負責生產資料,三個 Consumer 接續消費資料。簡單說,這就是多執行緒程式常見的流水線 Pattern。其依賴關係圖會像這樣:
{% img /images/posts/DisruptorStepPipeline1P3C/1.png %}
可進一步簡化成下面這樣:
{% img /images/posts/DisruptorStepPipeline1P3C/2.png %}
透過 DSL 的方式撰寫,就是用 Then 去串接後續的 EventHandler,像是下面這樣:
... var disruptor = new Disruptor.Dsl.Disruptor<Data>(() => new Data(), (int)Math.Pow(2,4), TaskScheduler.Default); disruptor.HandleEventsWith(new DataEventHandler("Handler1")) .Then(new DataEventHandler("Handler2")) .Then(new DataEventHandler("Handler3")); var ringBuffer = disruptor.Start(); ... disruptor.Shutdown(); … 若是改用 Non-DSL 撰寫的話,依賴關係圖形就會變成下面這樣:
{% img /images/posts/DisruptorStepPipeline1P3C/3.png %}
用程式來寫,就是要建立一個 Barrier 將之帶入並建立 EventProcessor,接著將第一個 EventProcessor 的 Sequence 帶入建立出第二個 Barrier,再用第二個 Barrier 建立第二個 EventProcessor,最後用第二個 EventProcessor 的 Sequence 建立出第三個 Barrier,用第三個 Barrier 去建立第三個 EventProcessor 即可。
read morePosts
Disruptor - Unicast: 1P - 1C
使用 Disruptor 時我們必須決定資料要怎樣在 Consumer 間流動,這有些常用的 Pattern 可供參考,只要熟悉這些 Pattern 那 Consumer 間有多複雜的協作應該都不是問題。
最簡單的 Pattern 就是 Unicast: 1P - 1C,一個 Producer 負責生產資料,一個 Consumer 負責消費資料。依賴關係圖會像這樣:
{% img /images/posts/DisruptorUnicast1P1C/1.png %}
可以將之簡化成下面這樣:
{% img /images/posts/DisruptorUnicast1P1C/2.png %}
透過 DSL 的方式撰寫的話會像下面這樣:
... using Disruptor; namespace ConsoleApplication29 { … class Program { static void Main( string[] args) { var disruptor = new Disruptor.Dsl. Disruptor<Data>(() => new Data(), (int)Math .Pow(2,4), TaskScheduler.Default); disruptor.HandleEventsWith(new DataEventHandler("Handler1”)); var ringBuffer = disruptor.Start(); var idx = 0; while ( true) { var sequenceNo = ringBuffer.
read morePosts
Disruptor - Getting started
要使用 Disruptor 必須先將套件加入專案中,透過 NuGet 將之載入即可:
{% img /images/posts/DisruptorGettingStarted/1.png %}
套件載入後我們就可以開始來使用 Disruptor 了。首先,必須要撰寫 EventHandler,用來消費生產者所生產的資料。撰寫上很簡單,只要實作 IEventHandler 泛型介面即可,泛型介面要指定預期的資料型態,並在 OnEvent 方法中進行資料的處理。
像是如果要在收到資料時顯示一些相關的訊息在主控台上,我們就可像下面這樣撰寫 EventHandler:
... public class Data { public string Value { get; set; } } public class DataEventHandler : IEventHandler<Data> { public string Name { get; private set; } public DataEventHandler(string name) { this.Name = name; } public void OnEvent(Data data, long sequence, bool endOfBatch) { Console.WriteLine("Thread = {0}, Handler = {1}, Sequence = {2}, Value = {3}", Thread.
read morePosts
Disruptor - Ringbuffer
Ringbuffer 是 Disruptor 的核心部分,使用 Disruptor 一定會圍繞著 Ringbuffer,Producer 會往 Ringbuffer 塞資料,Consumer 會從 RingBuffer 消費資料,且必須要觀察 Ringbuffer 的使用狀況並視情況調整架構或是其大小。所以使用 Disruptor 必須要對 Ringbuffer 有一定的認知。
{% img /images/posts/DisruptorRingbuffer/1.png %}
Ringbuffer 是一頭尾串接的環形陣列,資料一邊循序的存放,一邊循序的消耗。就像下面這樣:
{% img /images/posts/DisruptorRingbuffer/2.gif %}
存放在內部的資料都會有一個對應的編號,用 sequence % buffer size 就可以知道資料存放在陣列的哪個位置,但 Disruptor Ringbuffer 這邊是用 Sequence & (buffer size - 1) 的方式去做,以取得較佳的效能,但 buffer size 必須為 2 ^ n。
這邊簡單的做個測試…
usingSystem; usingSystem.Diagnostics; namespaceConsoleApplication25 { classProgram { staticvoidMain(string[] args) { varcounts = newint[] { 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000 }; foreach(varcountincounts) { Console.
read more