springboot~disruptor異步隊(duì)列
Disruptor
Disruptor是英國(guó)外匯交易公司LMAX開(kāi)發(fā)的一個(gè)高性能隊(duì)列,研發(fā)的初衷是解決內(nèi)存隊(duì)列的延遲問(wèn)題(在性能測(cè)試中發(fā)現(xiàn)竟然與I/O操作處于同樣的數(shù)量級(jí))。
Java內(nèi)置隊(duì)列的問(wèn)題
介紹Disruptor之前,我們先來(lái)看一看常用的線(xiàn)程安全的內(nèi)置隊(duì)列有什么問(wèn)題。Java的內(nèi)置隊(duì)列如下表所示。

隊(duì)列的底層一般分成三種:數(shù)組、鏈表和堆。其中,堆一般情況下是為了實(shí)現(xiàn)帶有優(yōu)先級(jí)特性的隊(duì)列,暫且不考慮。
從數(shù)組和鏈表兩種數(shù)據(jù)結(jié)構(gòu)來(lái)看,基于數(shù)組線(xiàn)程安全的隊(duì)列,比較典型的是ArrayBlockingQueue,它主要通過(guò)加鎖的方式來(lái)保證線(xiàn)程安全;基于鏈表的線(xiàn)程安全隊(duì)列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類(lèi),前者也通過(guò)鎖的方式來(lái)實(shí)現(xiàn)線(xiàn)程安全,而后者以及上面表格中的LinkedTransferQueue都是通過(guò)原子變量compare and swap(以下簡(jiǎn)稱(chēng)“CAS”)這種不加鎖的方式來(lái)實(shí)現(xiàn)的。
但是對(duì) volatile類(lèi)型的變量進(jìn)行 CAS 操作,存在偽共享問(wèn)題,下面介紹一下
偽共享
CPU的緩存系統(tǒng)是以緩存行(cache line)為單位存儲(chǔ)的,一般的大小為64bytes。在多線(xiàn)程程序的執(zhí)行過(guò)程中,存在著一種情況,多個(gè)需要頻繁修改的變量存在同一個(gè)緩存行當(dāng)中。
假設(shè):有兩個(gè)線(xiàn)程分別訪(fǎng)問(wèn)并修改X和Y這兩個(gè)變量,X和Y恰好在同一個(gè)緩存行上,這兩個(gè)線(xiàn)程分別在不同的CPU上執(zhí)行。那么每個(gè)CPU分別更新好X和Y時(shí)將緩存行刷入內(nèi)存時(shí),發(fā)現(xiàn)有別的修改了各自緩存行內(nèi)的數(shù)據(jù),這時(shí)緩存行會(huì)失效,從L3中重新獲取。這樣的話(huà),程序執(zhí)行效率明顯下降。為了減少這種情況的發(fā)生,其實(shí)就是避免X和Y在同一個(gè)緩存行中,可以主動(dòng)添加一些無(wú)關(guān)變量將緩存行填充滿(mǎn),比如在X對(duì)象中添加一些變量,讓它有64 Byte那么大,正好占滿(mǎn)一個(gè)緩存行。

偽共享問(wèn)題 的解決方案
簡(jiǎn)單的說(shuō),就是 以空間換時(shí)間: 使用占位字節(jié),將變量的所在的 緩沖行 塞滿(mǎn)。
disruptor 無(wú)鎖框架就是這么干的。
Disruptor框架是如何解決偽共享問(wèn)題的?
在Disruptor中有一個(gè)重要的類(lèi)Sequence,該類(lèi)包裝了一個(gè)volatile修飾的long類(lèi)型數(shù)據(jù)value,無(wú)論是Disruptor中的基于數(shù)組實(shí)現(xiàn)的緩沖區(qū)RingBuffer,還是生產(chǎn)者,消費(fèi)者,都有各自獨(dú)立的Sequence,RingBuffer緩沖區(qū)中,Sequence標(biāo)示著寫(xiě)入進(jìn)度,例如每次生產(chǎn)者要寫(xiě)入數(shù)據(jù)進(jìn)緩沖區(qū)時(shí),都要調(diào)用RingBuffer.next()來(lái)獲得下一個(gè)可使用的相對(duì)位置。對(duì)于生產(chǎn)者和消費(fèi)者來(lái)說(shuō),Sequence標(biāo)示著它們的事件序號(hào)。
例子
/**
* 停車(chē)場(chǎng)問(wèn)題.
* 1) 事件對(duì)象Event
* 2)三個(gè)消費(fèi)者Handler
* 3)一個(gè)生產(chǎn)者Processer
* 4)執(zhí)行Main方法
*/
public class DisruptorCar {
private static final Integer NUM = 1; // 1,10,100,1000
/**
* 測(cè)試入口 ,
* 一個(gè)生產(chǎn)者(汽車(chē)進(jìn)入停車(chē)場(chǎng));
* 三個(gè)消費(fèi)者(一個(gè)記錄汽車(chē)信息,一個(gè)發(fā)送消息給系統(tǒng),一個(gè)發(fā)送消息告知司機(jī))
* 前兩個(gè)消費(fèi)者同步執(zhí)行,都有結(jié)果了再執(zhí)行第三個(gè)消費(fèi)者
*/
@Test
public void main() throws InterruptedException {
long beginTime = System.currentTimeMillis();
int bufferSize = 2048; // 2的N次方
try {
// 創(chuàng)建線(xiàn)程池,負(fù)責(zé)處理Disruptor的四個(gè)消費(fèi)者
ExecutorService executor = Executors.newFixedThreadPool(4);
// 初始化一個(gè) Disruptor
Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
@Override
public MyInParkingDataEvent newInstance() {
return new MyInParkingDataEvent(); // Event 初始化工廠(chǎng)
}
}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 使用disruptor創(chuàng)建消費(fèi)者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
// 當(dāng)上面兩個(gè)消費(fèi)者處理結(jié)束后在消耗 smsHandler
MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
handlerGroup.then(myParkingDataSmsHandler);
// 啟動(dòng)Disruptor
disruptor.start();
CountDownLatch countDownLatch = new CountDownLatch(1); // 一個(gè)生產(chǎn)者線(xiàn)程準(zhǔn)備好了就可以通知主線(xiàn)程繼續(xù)工作了
// 生產(chǎn)者生成數(shù)據(jù)
executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
countDownLatch.await(); // 等待生產(chǎn)者結(jié)束
disruptor.shutdown();
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("總耗時(shí):" + (System.currentTimeMillis() - beginTime));
}
public class MyInParkingDataEvent {
private String carLicense; // 車(chē)牌號(hào)
public String getCarLicense() {
return carLicense;
}
public void setCarLicense(String carLicense) {
this.carLicense = carLicense;
}
}
/**
* Handler 第一個(gè)消費(fèi)者,負(fù)責(zé)保存進(jìn)場(chǎng)汽車(chē)的信息
*/
public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當(dāng)前線(xiàn)程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車(chē)牌號(hào)
System.out.println(String.format("Thread Id %s 保存 %s 到數(shù)據(jù)庫(kù)中 ....", threadId, carLicense));
}
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
this.onEvent(myInParkingDataEvent);
}
}
/**
* 第二個(gè)消費(fèi)者,負(fù)責(zé)發(fā)送通知告知工作人員(Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng))
*/
public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當(dāng)前線(xiàn)程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車(chē)牌號(hào)
System.out.println(String.format("Thread Id %s 發(fā)送 %s 進(jìn)入停車(chē)場(chǎng)信息給 kafka系統(tǒng)...", threadId, carLicense));
}
}
/**
* 第三個(gè)消費(fèi)者,sms短信服務(wù),告知司機(jī)你已經(jīng)進(jìn)入停車(chē)場(chǎng),計(jì)費(fèi)開(kāi)始。
*/
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當(dāng)前線(xiàn)程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車(chē)牌號(hào)
System.out.println(String.format("Thread Id %s 給 %s 的車(chē)主發(fā)送一條短信,并告知他計(jì)費(fèi)開(kāi)始了 ....", threadId, carLicense));
}
}
/**
* 生產(chǎn)者,進(jìn)入停車(chē)場(chǎng)的車(chē)輛
*/
public class MyInParkingDataEventPublisher implements Runnable {
private CountDownLatch countDownLatch; // 用于監(jiān)聽(tīng)初始化操作,等初始化執(zhí)行完畢后,通知主線(xiàn)程繼續(xù)工作
private Disruptor<MyInParkingDataEvent> disruptor;
public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
Disruptor<MyInParkingDataEvent> disruptor) {
this.countDownLatch = countDownLatch;
this.disruptor = disruptor;
}
@Override
public void run() {
MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
try {
for (int i = 0; i < NUM; i++) {
disruptor.publishEvent(eventTranslator);
Thread.sleep(1000); // 假設(shè)一秒鐘進(jìn)一輛車(chē)
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); // 執(zhí)行完畢后通知 await()方法
System.out.println(NUM + "輛車(chē)已經(jīng)全部進(jìn)入進(jìn)入停車(chē)場(chǎng)!");
}
}
}
class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
@Override
public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
this.generateData(myInParkingDataEvent);
}
private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
myInParkingDataEvent.setCarLicense("車(chē)牌號(hào): 鄂A-" + (int) (Math.random() * 100000)); // 隨機(jī)生成一個(gè)車(chē)牌號(hào)
System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫(xiě)完一個(gè)event");
return myInParkingDataEvent;
}
}
}
浙公網(wǎng)安備 33010602011771號(hào)