Laravel11 從0開發(fā) Swoole-Reverb 擴(kuò)展包(四) - 觸發(fā)一個(gè)廣播事件到reverb服務(wù)之后是如何轉(zhuǎn)發(fā)給前端訂閱的呢(上)?
前情提要
我們在上一節(jié)分析了觸發(fā)廣播事件發(fā)送到reverb服務(wù)的過程,這一節(jié)我們就來分析,reverb的服務(wù)啟動(dòng)過程。在看源碼之前,我們先說明一點(diǎn),reverb混響服務(wù)(ws響應(yīng)+http響應(yīng))是基于reactPHP實(shí)現(xiàn)的單線程+event loop(事件循環(huán))。同時(shí)保持嚴(yán)謹(jǐn)和自我學(xué)習(xí),我也會(huì)把一些重要的概念通過ai接收后寫到文章內(nèi),但也不會(huì)太多了,影響觀感,更多的知識(shí)點(diǎn)也需要我們自己去看其他的文章。
event loop
Event Loop(事件循環(huán))是一種編程模式,主要用于處理異步任務(wù),特別是在單線程環(huán)境下(如 JavaScript 和 Python 的 async/await 機(jī)制,swoole reactphp workerman的實(shí)現(xiàn)機(jī)制)。它的作用是管理和調(diào)度異步任務(wù),確保非阻塞執(zhí)行。
工作原理
同步任務(wù)先執(zhí)行:程序先執(zhí)行同步代碼,遇到異步任務(wù)(如 I/O 操作、網(wǎng)絡(luò)請求、定時(shí)器等)時(shí),將其交給事件循環(huán)調(diào)度。
異步任務(wù)放入隊(duì)列:異步任務(wù)(如 Promise、setTimeout、I/O 操作)被推入不同的任務(wù)隊(duì)列(如微任務(wù)隊(duì)列、宏任務(wù)隊(duì)列)。
事件循環(huán)調(diào)度:
先執(zhí)行微任務(wù)隊(duì)列(Microtasks):如 Promise.then()、process.nextTick()。
再執(zhí)行宏任務(wù)隊(duì)列(Macrotasks):如 setTimeout、setImmediate、I/O 操作等。
循環(huán)執(zhí)行:事件循環(huán)不斷檢查是否有新的任務(wù)可執(zhí)行,直到程序結(jié)束。
并發(fā)和并行
并發(fā)(Concurrency) 和 并行(Parallelism) 是計(jì)算機(jī)科學(xué)中的兩個(gè)概念,主要用于描述程序的執(zhí)行方式。
1. 并發(fā)(Concurrency)
-
定義:多個(gè)任務(wù)在同一時(shí)間段交替執(zhí)行,但不一定同時(shí)運(yùn)行。
-
特點(diǎn):
- 任務(wù)看起來是同時(shí)進(jìn)行的,但實(shí)際上 CPU 在多個(gè)任務(wù)之間快速切換。
- 適用于 I/O 密集型任務(wù)(如網(wǎng)絡(luò)請求、文件讀寫)。
- 主要依賴多線程(Threads)或協(xié)程(Coroutines),但仍運(yùn)行在單核 CPU 上。
-
示例:
- 你在聽音樂的同時(shí)瀏覽網(wǎng)頁(實(shí)際上 CPU 在不同任務(wù)之間切換)。
- Python 的
asyncio通過await實(shí)現(xiàn)并發(fā)任務(wù)。
2. 并行(Parallelism)
-
定義:多個(gè)任務(wù)真正同時(shí)運(yùn)行,通常依賴于多核 CPU。
-
特點(diǎn):
- 任務(wù)同時(shí)執(zhí)行,提高計(jì)算效率。
- 適用于 CPU 密集型任務(wù)(如視頻渲染、科學(xué)計(jì)算)。
- 依賴多進(jìn)程(Processes)或多線程(Threads),需要多個(gè) CPU 核心。
-
示例:
- 你和朋友同時(shí)各自用一臺(tái)電腦玩游戲(多個(gè)核心真正同時(shí)執(zhí)行)。
- 使用
multiprocessing在 Python 中進(jìn)行并行計(jì)算。
3. 關(guān)鍵區(qū)別
| 并發(fā)(Concurrency) | 并行(Parallelism) | |
|---|---|---|
| 執(zhí)行方式 | 任務(wù)交替進(jìn)行(看起來同時(shí)) | 任務(wù)真正同時(shí)執(zhí)行 |
| 依賴 | 線程、協(xié)程 | 進(jìn)程、多個(gè) CPU 核心 |
| 適用場景 | I/O 密集型任務(wù)(網(wǎng)絡(luò)請求、數(shù)據(jù)庫操作) | CPU 密集型任務(wù)(科學(xué)計(jì)算、加密運(yùn)算) |
| 示例 | Python asyncio,JavaScript Promise |
Python multiprocessing,多核計(jì)算 |
4. 總結(jié)
- 并發(fā) = 任務(wù)交替執(zhí)行,但不一定同時(shí)運(yùn)行(單核 CPU 可實(shí)現(xiàn))。
- 并行 = 任務(wù)真正同時(shí)運(yùn)行,需要多核 CPU 支持。
- 并發(fā)提升程序響應(yīng)速度,并行提升計(jì)算效率。
兩者可以結(jié)合使用,比如在一個(gè)并發(fā)系統(tǒng)中,每個(gè)并發(fā)任務(wù)內(nèi)部再使用并行計(jì)算來加速處理!
進(jìn)程(Process)和線程(Thread)在操作系統(tǒng)中會(huì)經(jīng)歷多個(gè)狀態(tài)切換,下面是它們的典型狀態(tài)圖:
1. 進(jìn)程狀態(tài)切換圖
進(jìn)程的生命周期通常包含以下 5 種狀態(tài):
- 新建(New):進(jìn)程被創(chuàng)建,但尚未執(zhí)行。
- 就緒(Ready):進(jìn)程已準(zhǔn)備好運(yùn)行,但等待 CPU 調(diào)度。
- 運(yùn)行(Running):進(jìn)程占用 CPU,并執(zhí)行任務(wù)。
- 等待(Waiting / Blocked):進(jìn)程因 I/O 操作或其他事件阻塞,無法繼續(xù)執(zhí)行。
- 終止(Terminated):進(jìn)程執(zhí)行完畢或被終止,進(jìn)入銷毀狀態(tài)。
狀態(tài)切換圖:
+-----------+
| New(新建) |
+-----------+
|
v
+-----------------+
| Ready(就緒) |
+-----------------+
| ↑
CPU調(diào)度 |
v
+-----------------+
| Running(運(yùn)行) |
+-----------------+
| | |
運(yùn)行完成 | I/O等待或資源不足
v v v
+-------------+ +------------------+
| Terminated | | Waiting(等待) |
|(終止) | |(阻塞/掛起) |
+-------------+ +------------------+
|
v
返回就緒隊(duì)列
2. 線程狀態(tài)切換圖
線程的狀態(tài)切換與進(jìn)程類似,但線程是輕量級的,通常共享進(jìn)程的資源。其狀態(tài)如下:
- 新建(New):線程被創(chuàng)建但未啟動(dòng)。
- 就緒(Ready):線程可以運(yùn)行,但等待 CPU 資源。
- 運(yùn)行(Running):線程正在執(zhí)行任務(wù)。
- 阻塞(Blocked / Waiting):線程等待 I/O 或其他線程釋放資源。
- 終止(Terminated):線程任務(wù)完成或異常終止。
線程狀態(tài)切換圖:
+-----------+
| New(新建) |
+-----------+
|
v
+----------------+
| Ready(就緒) |
+----------------+
| ↑
CPU調(diào)度 |
v
+----------------+
| Running(運(yùn)行) |
+----------------+
| | |
完成 | 等待I/O或鎖
v v v
+-------------+ +------------------+
| Terminated | | Blocked(阻塞) |
|(終止) | |(等待資源) |
+-------------+ +------------------+
|
v
返回就緒隊(duì)列
3. 進(jìn)程 vs 線程 狀態(tài)切換
| 狀態(tài) | 進(jìn)程(Process) | 線程(Thread) |
|---|---|---|
| 新建 | 創(chuàng)建新的進(jìn)程 | 創(chuàng)建新的線程 |
| 就緒 | 進(jìn)程等待 CPU 資源 | 線程等待 CPU 資源 |
| 運(yùn)行 | 進(jìn)程正在執(zhí)行任務(wù) | 線程正在執(zhí)行任務(wù) |
| 阻塞 | 進(jìn)程等待 I/O 或資源 | 線程等待 I/O 或鎖 |
| 終止 | 進(jìn)程完成或被終止 | 線程完成或被終止 |
區(qū)別:
- 進(jìn)程切換開銷較大,涉及 CPU 狀態(tài)、內(nèi)存等信息。
- 線程切換較輕量,多個(gè)線程共享進(jìn)程資源。
這兩者的切換過程可以通過任務(wù)管理器或top 命令查看 CPU 占用情況!
啟動(dòng)流程分析
我們直接走到vendor/laravel/reverb/src/Servers/Reverb/Console/Commands/StartServer.php,它就是啟動(dòng)服務(wù)的終端命令文件。在看我們啟動(dòng)服務(wù)前,我們先總結(jié)一個(gè)注解:
#[AsCommand(name: 'reverb:start')]
在 PHP 8 中,#[AsCommand(name: 'reverb:start')] 這個(gè)注解(Attribute)通常用于標(biāo)記類或方法,以提供元數(shù)據(jù)。它主要用于 Symfony Console 命令 或 自定義框架組件,用于定義 CLI 命令。
回到代碼,我們就看handle的方法。
public function handle(): void
{
if ($this->option('debug')) {
$this->laravel->instance(Logger::class, new CliLogger($this->output));
}
$config = $this->laravel['config']['reverb.servers.reverb'];
$loop = Loop::get();
$server = ServerFactory::make(
$host = $this->option('host') ?: $config['host'],
$port = $this->option('port') ?: $config['port'],
$hostname = $this->option('hostname') ?: $config['hostname'],
$config['max_request_size'] ?? 10_000,
$config['options'] ?? [],
loop: $loop
);
$this->ensureHorizontalScalability($loop);
$this->ensureStaleConnectionsAreCleaned($loop);
$this->ensureRestartCommandIsRespected($server, $loop, $host, $port);
$this->ensurePulseEventsAreCollected($loop, $config['pulse_ingest_interval']);
$this->ensureTelescopeEntriesAreCollected($loop, $config['telescope_ingest_interval'] ?? 15);
$this->components->info('Starting ' . ($server->isSecure() ? 'secure ' : '') . "server on {$host}:{$port}" . (($hostname && $hostname !== $host) ? " ({$hostname})" : ''));
$server->start();
}
我逐個(gè)說明下一些核心的地方:
ServerFactory::make: 服務(wù)初始化, 支持自定義主機(jī)、端口、主機(jī)名以及請求大小等配置。ensureHorizontalScalability:水平擴(kuò)展分布式支持,通過廣播(PubSub)啟用水平擴(kuò)展。連接 PubSubProvider 并訂閱事件,以便在多實(shí)例部署中同步消息。這個(gè)在reverb.php的配置文件對應(yīng)的配置為:
//...
'scaling' => [
'enabled' => env('REVERB_SCALING_ENABLED', false),
'channel' => env('REVERB_SCALING_CHANNEL', 'reverb'),
'server' => [
'url' => env('REDIS_URL'),
'host' => env('REDIS_HOST', '127.0.0.1'),
'port' => env('REDIS_PORT', '6379'),
'username' => env('REDIS_USERNAME'),
'password' => env('REDIS_PASSWORD'),
'database' => env('REDIS_DB', '0'),
],
],
- ensureStaleConnectionsAreCleaned: 使用事件循環(huán)定期清理連接
- ensureRestartCommandIsRespected: 定時(shí)檢查是否發(fā)送了重啟信號,在
reverb:restart的時(shí)候會(huì)設(shè)置重啟時(shí)間,定時(shí)器發(fā)現(xiàn)重啟時(shí)間對的上就重啟服務(wù) - ensurePulseEventsAreCollected:如果啟用了,安排 Pulse 收集事件。這個(gè)就是要用到pulse兼容組件
接著,講下ensureHorizontalScalability方法:
它通過PubSubProvider類來利用redis的發(fā)布訂閱,在開啟分布式scaling擴(kuò)展時(shí),多臺(tái)服務(wù)共享通信的數(shù)據(jù)。
我們繼續(xù)??,來到vendor/laravel/reverb/src/Servers/Reverb/Factory.php,這個(gè)類的用途:使用 Laravel 生態(tài)和 ReactPHP 創(chuàng)建了一個(gè)支持 Pusher 協(xié)議的 WebSocket 服務(wù)器。它支持 TLS(加密連接)以及自定義協(xié)議擴(kuò)展(目前只支持 Pusher 協(xié)議)。
我們關(guān)注make方法,考慮到篇幅,我這邊就直接說明整個(gè)作用:
- 事件循環(huán)初始化:若沒有提供 $loop,則通過 Loop::get() 獲取全局事件循環(huán)實(shí)例。
- 路由選擇:使用 match 表達(dá)式,根據(jù) $protocol 值調(diào)用對應(yīng)的路由構(gòu)造方法(此處只支持 'pusher' 協(xié)議,調(diào)用 static::makePusherRouter()),如果傳入不支持的協(xié)議,則拋出異常。
- TLS 配置: 調(diào)用 static::configureTls() 方法,將傳入的 $options['tls'] 與 $hostname 進(jìn)行配置,返回處理后的 TLS 選項(xiàng)。隨后,根據(jù)是否使用 TLS(由 static::usesTls() 判斷),構(gòu)建最終的 URI 字符串,決定是使用 tls:// 還是普通的 host:port 格式。
- 使用處理后的 URI、選項(xiàng)、路由以及事件循環(huán),創(chuàng)建一個(gè) HttpServer 實(shí)例,并將最大請求大小傳入。
接下來,我們重點(diǎn)定位到:makePusherRouter和pusherRoutes,我們一個(gè)個(gè)來分析:
makePusherRouter
public static function makePusherRouter(): Router
{
app()->singleton(
ChannelManager::class,
fn () => new ArrayChannelManager
);
app()->bind(
ChannelConnectionManager::class,
fn () => new ArrayChannelConnectionManager
);
app()->singleton(
PubSubIncomingMessageHandler::class,
fn () => new PusherPubSubIncomingMessageHandler,
);
return new Router(new UrlMatcher(static::pusherRoutes(), new RequestContext));
}
依賴綁定:
使用 app()->singleton() 與 app()->bind() 將接口與具體實(shí)現(xiàn)綁定到 Laravel 服務(wù)容器中:
- ChannelManager::class 被綁定為 ArrayChannelManager 的單例,意味著在整個(gè)應(yīng)用中只會(huì)有一個(gè)實(shí)例,用于管理頻道信息。
- ChannelConnectionManager::class 被綁定為 ArrayChannelConnectionManager,用于管理連接信息。
- PubSubIncomingMessageHandler::class 被綁定為 PusherPubSubIncomingMessageHandler,用于處理從 Pub/Sub 系統(tǒng)傳來的消息。
路由構(gòu)造:
- 調(diào)用 static::pusherRoutes() 獲取定義好的路由集合。
- 創(chuàng)建一個(gè) UrlMatcher 對象,并傳入路由集合和新的 RequestContext(用于匹配請求 URL)。
- 使用這個(gè) UrlMatcher 構(gòu)造一個(gè) Router 實(shí)例,并返回。
ChannelManager和ArrayChannelManager這個(gè)就是維護(hù)整個(gè)通信流程中 應(yīng)用-連接-通道(事件)關(guān)系的核心,在我后面用swoole實(shí)現(xiàn)多進(jìn)程的reverb服務(wù)里,我使用了redis+lock分別實(shí)現(xiàn)了他們。大家有個(gè)印象就行。
pusherRoutes
protected static function pusherRoutes(): RouteCollection
{
// 詳細(xì)解析:
// 路由集合構(gòu)建:
//
// 創(chuàng)建一個(gè)新的 RouteCollection 實(shí)例用于存放所有的路由定義。
// 路由添加:
//
// 每一條路由使用 Route::get 或 Route::post 定義,其中包含 URL 模板和對應(yīng)的控制器實(shí)例。
// 例如:
// sockets 路由:GET 請求 /app/{appKey},對應(yīng) PusherController,傳入 PusherServer 與 ApplicationProvider 實(shí)例(通過 Laravel 容器解析)。
// events 路由:POST 請求 /apps/{appId}/events,對應(yīng) EventsController,用于處理事件推送請求。
// 其他路由同理,包括批量事件、連接查詢、頻道信息、用戶終止連接等。
// 作用:
//
// 將所有 Pusher 協(xié)議相關(guān)的 API 接口集中管理,便于后續(xù)維護(hù)和擴(kuò)展。
// 通過 URL 模板和控制器綁定,框架能夠根據(jù)請求路徑準(zhǔn)確匹配到對應(yīng)的處理邏輯。
$routes = new RouteCollection;
$routes->add('sockets', Route::get('/app/{appKey}', new PusherController(app(PusherServer::class), app(ApplicationProvider::class))));
$routes->add('events', Route::post('/apps/{appId}/events', new EventsController));
$routes->add('events_batch', Route::post('/apps/{appId}/batch_events', new EventsBatchController));
$routes->add('connections', Route::get('/apps/{appId}/connections', new ConnectionsController));
$routes->add('channels', Route::get('/apps/{appId}/channels', new ChannelsController));
$routes->add('channel', Route::get('/apps/{appId}/channels/{channel}', new ChannelController));
$routes->add('channel_users', Route::get('/apps/{appId}/channels/{channel}/users', new ChannelUsersController));
$routes->add('users_terminate', Route::post('/apps/{appId}/users/{userId}/terminate_connections', new UsersTerminateController));
$routes->add('health_check', Route::get('/up', new HealthCheckController));
return $routes;
}
路由組織了我們整個(gè)http接口請求的過程,我們也直觀的知道pusher通信涉及到的處理,那么我就總結(jié)下每個(gè)路由的作用:
Pusher 是一個(gè) WebSocket 推送服務(wù),主要用于實(shí)現(xiàn)實(shí)時(shí)通信。以下是 pusherRoutes() 方法中定義的每個(gè)路由的作用解析:
-
sockets(WebSocket 連接路由)
$routes->add('sockets', Route::get('/app/{appKey}', new PusherController(app(PusherServer::class), app(ApplicationProvider::class))));- 作用: 用于建立 WebSocket 連接。
- 請求類型:
GET - URL:
/app/{appKey} - 控制器:
PusherController - 參數(shù):
appKey(應(yīng)用的 API Key) - 解析:
- 客戶端會(huì)通過 WebSocket 連接到該路由,以便與 Pusher 服務(wù)器建立實(shí)時(shí)連接。
- 服務(wù)器通過
PusherServer和ApplicationProvider處理連接邏輯。
-
events(事件推送)
$routes->add('events', Route::post('/apps/{appId}/events', new EventsController));- 作用: 負(fù)責(zé)處理客戶端推送的事件(消息)。
- 請求類型:
POST - URL:
/apps/{appId}/events - 控制器:
EventsController - 參數(shù):
appId(應(yīng)用 ID) - 解析:
- 該路由用于處理客戶端推送的事件,并將消息廣播到相應(yīng)的頻道或用戶。
-
events_batch(批量事件推送)
$routes->add('events_batch', Route::post('/apps/{appId}/batch_events', new EventsBatchController));- 作用: 允許客戶端一次性推送多個(gè)事件,提高批量處理效率。
- 請求類型:
POST - URL:
/apps/{appId}/batch_events - 控制器:
EventsBatchController - 解析:
- 該路由支持批量事件推送,可以減少客戶端與服務(wù)器之間的請求次數(shù),提高性能。
-
connections(查詢連接信息)
$routes->add('connections', Route::get('/apps/{appId}/connections', new ConnectionsController));- 作用: 查詢當(dāng)前應(yīng)用下的所有活躍連接。
- 請求類型:
GET - URL:
/apps/{appId}/connections - 控制器:
ConnectionsController - 解析:
- 該 API 允許管理員或服務(wù)端查詢當(dāng)前應(yīng)用的 WebSocket 連接信息,比如在線用戶數(shù)量。
-
channels(查詢所有頻道信息)
$routes->add('channels', Route::get('/apps/{appId}/channels', new ChannelsController));- 作用: 獲取當(dāng)前應(yīng)用的所有活躍頻道列表。
- 請求類型:
GET - URL:
/apps/{appId}/channels - 控制器:
ChannelsController - 解析:
- 允許查詢當(dāng)前應(yīng)用正在使用的所有 WebSocket 頻道,常用于管理面板或分析工具。
-
channel(查詢單個(gè)頻道信息)
$routes->add('channel', Route::get('/apps/{appId}/channels/{channel}', new ChannelController));- 作用: 獲取指定頻道的詳細(xì)信息。
- 請求類型:
GET - URL:
/apps/{appId}/channels/{channel} - 控制器:
ChannelController - 參數(shù):
channel(頻道名稱) - 解析:
- 用于查詢某個(gè)具體頻道的狀態(tài),比如訂閱用戶數(shù)等信息。
-
channel_users(查詢頻道在線用戶列表)
$routes->add('channel_users', Route::get('/apps/{appId}/channels/{channel}/users', new ChannelUsersController));- 作用: 獲取指定頻道的在線用戶列表(僅適用于 Presence 頻道)。
- 請求類型:
GET - URL:
/apps/{appId}/channels/{channel}/users - 控制器:
ChannelUsersController - 解析:
- 適用于 Presence 頻道,可以獲取該頻道所有在線用戶的信息。
-
users_terminate(終止用戶連接)
$routes->add('users_terminate', Route::post('/apps/{appId}/users/{userId}/terminate_connections', new UsersTerminateController));- 作用: 斷開某個(gè)用戶的所有連接。
- 請求類型:
POST - URL:
/apps/{appId}/users/{userId}/terminate_connections - 控制器:
UsersTerminateController - 參數(shù):
userId(用戶 ID) - 解析:
- 該 API 用于強(qiáng)制斷開某個(gè)用戶的所有 WebSocket 連接,通常用于管理或安全策略。
-
health_check(健康檢查)
$routes->add('health_check', Route::get('/up', new HealthCheckController));- 作用: 用于檢測 Pusher 服務(wù)器是否正常運(yùn)行。
- 請求類型:
GET - URL:
/up - 控制器:
HealthCheckController - 解析:
- 該 API 用于監(jiān)控 Pusher 服務(wù)的健康狀態(tài),一般用于運(yùn)維或監(jiān)控系統(tǒng)。
總結(jié):
| 路由名稱 | 請求類型 | 作用 |
|---|---|---|
| sockets | GET |
負(fù)責(zé) WebSocket 連接 |
| events | POST |
處理單個(gè)事件推送 |
| events_batch | POST |
處理批量事件推送 |
| connections | GET |
查詢所有 WebSocket 連接 |
| channels | GET |
獲取當(dāng)前應(yīng)用所有頻道 |
| channel | GET |
查詢單個(gè)頻道詳情 |
| channel_users | GET |
查詢 Presence 頻道在線用戶 |
| users_terminate | POST |
終止指定用戶的所有連接 |
| health_check | GET |
檢查服務(wù)器健康狀態(tài) |
總結(jié)
考慮到篇幅和閱讀感,后面的內(nèi)容下一節(jié)說

浙公網(wǎng)安備 33010602011771號