Laravel11 從0開發 Swoole-Reverb 擴展包(四) - 觸發一個廣播事件到reverb服務之后是如何轉發給前端訂閱的呢(下)?
前情提要
上一篇我們講到了reverb服務的通信上下文和路由處理,路由實現了pusher關聯的幾種請求。那么這一篇我們主要來講混響服務Server
混響 Server
負責基于 ReactPHP 的 SocketServer 和事件循環構建一個 HTTP 服務器(實現了一個輕量級、異步的 HTTP 服務器。通過注冊事件、請求解析、異常捕獲等機制,保證了請求的正確處理和異常情況下的優雅響應)。它主要實現了以下功能: 連接管理:通過監聽新連接(__invoke 方法)來處理數據事件,并將其傳遞給請求處理邏輯。 請求解析與分發:將原始數據轉換為 PSR-7 Request 對象,并交由路由器分發。 異常處理與錯誤響應:捕獲請求調度過程中的異常,返回對應 HTTP 狀態碼和消息。 垃圾回收優化:通過定時器調用垃圾回收,降低內存碎片。 TLS 檢測:判斷服務器是否支持加密連接。
server啟動
我們走到 vendor/laravel/reverb/src/Servers/Reverb/Http/Server.php文件,代碼不多,我先貼出來
<?php
namespace Laravel\Reverb\Servers\Reverb\Http;
use Illuminate\Support\Str;
use Laravel\Reverb\Loggers\Log;
use Laravel\Reverb\Servers\Reverb\Concerns\ClosesConnections;
use OverflowException;
use Psr\Http\Message\RequestInterface;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ServerInterface;
use Symfony\Component\HttpKernel\Exception\HttpException;
use Throwable;
class Server
{
use ClosesConnections;
/**
* Create a new Http server instance.
*/
public function __construct(protected ServerInterface $socket, protected Router $router, protected int $maxRequestSize, protected ?LoopInterface $loop = null)
{
gc_disable();
$this->loop = $loop ?: Loop::get();
$this->loop->addPeriodicTimer(30, fn () => gc_collect_cycles());
$socket->on('connection', $this);
}
/**
* Start the Http server
*/
public function start(): void
{
$this->loop->run();
}
/**
* Handle an incoming request.
*/
protected function handleRequest(string $message, Connection $connection): void
{
if ($connection->isConnected()) {
return;
}
if (($request = $this->createRequest($message, $connection)) === null) {
return;
}
$connection->connect();
try {
$this->router->dispatch($request, $connection);
} catch (HttpException $e) {
$this->close($connection, $e->getStatusCode(), $e->getMessage());
} catch (Throwable $e) {
Log::error($e->getMessage());
$this->close($connection, 500, 'Internal server error.');
}
}
/**
* Create a Psr7 request from the incoming message.
*/
protected function createRequest(string $message, Connection $connection): ?RequestInterface
{
try {
$request = Request::from($message, $connection, $this->maxRequestSize);
} catch (OverflowException $e) {
$this->close($connection, 413, 'Payload too large.');
} catch (Throwable $e) {
$this->close($connection, 400, 'Bad request.');
}
return $request ?? null;
}
/**
* Stop the Http server
*/
public function stop(): void
{
$this->loop->stop();
$this->socket->close();
}
/**
* Invoke the server with a new connection instance.
*/
public function __invoke(ConnectionInterface $connection): void
{
$connection = new Connection($connection);
$connection->on('data', function ($data) use ($connection) {
$this->handleRequest($data, $connection);
});
}
/**
* Determine whether the server has TLS support.
*/
public function isSecure(): bool
{
return Str::startsWith($this->socket->getAddress(), 'tls://');
}
}
首先我們關注start方法,這個方法就是外層工廠調用服務啟動的方法:啟動服務
public function start(): void
{
$this->loop->run();
}
start() 方法簡單地啟動事件循環。所有注冊到事件循環的定時器、IO 事件和連接處理都將在 run() 方法調用后開始執行。 這是整個 HTTP 服務器的入口,調用后服務器進入阻塞狀態等待事件發生。
在走到我們的構造函數:
public function __construct(protected ServerInterface $socket, protected Router $router, protected int $maxRequestSize, protected ?LoopInterface $loop = null)
{
// 調用 gc_disable() 禁用 PHP 的自動垃圾回收
gc_disable();
$this->loop = $loop ?: Loop::get();
//然后使用事件循環的定時器每 30 秒手動觸發一次 gc_collect_cycles(),以便更好地控制內存管理,防止自動垃圾回收帶來的性能波動
$this->loop->addPeriodicTimer(30, fn () => gc_collect_cycles());
$socket->on('connection', $this);
}
大家在這里可以停下來???下,我們會發現這里有兩點是值得我們學習的,接下來我就和大家一起來學習以下兩點:
垃圾回收機制
由于PHP 的垃圾回收不會立即執行,而是滿足條件后觸發,因此,php提供了手動操作gc的方法,那么通過reverb的案例,我們也有機會用到自己的項目代碼中。
下面是AI的總結:
1. PHP 7 及以后的垃圾回收機制
PHP 7 繼續使用 引用計數(Reference Counting) 作為主要的內存管理機制,并配合 循環引用檢測(Cycle Detection) 進行垃圾回收。
1.1 主要組成部分
-
引用計數(Reference Counting, RC)
- PHP 的變量是基于引用計數進行管理的,每個變量都有一個 引用計數器(refcount)。
- 當變量被賦值或傳遞時,引用計數增加。
- 當變量的作用域結束或 unset() 釋放變量時,引用計數減少。
- 當引用計數歸零時,變量占用的內存被立即釋放。
-
循環引用檢測(Cycle Collection)
- PHP 7 及以后版本采用 三代垃圾回收機制(Generational GC) 解決循環引用問題(即兩個或多個對象互相引用,導致引用計數永遠不會歸零)。
- 采用 分代收集(Generational Collection),將變量分為 年輕代(young)、中生代(middle-aged)、老年代(old),減少不必要的垃圾回收操作。
-
分代垃圾回收(Generational Garbage Collection)
- 年輕代(young):剛創建的變量,GC 觸發頻率高。
- 中生代(middle-aged):已存活一段時間的變量,GC 觸發頻率較低。
- 老年代(old):存活很久的變量,GC 觸發最少。
- 這種策略減少了垃圾回收對性能的影響,提高了執行效率。
2. PHP 7 及以后的垃圾回收策略
2.1 觸發條件
PHP 的垃圾回收不會立即執行,而是滿足以下條件時觸發:
- 變量的引用計數降為 0,立即釋放內存(適用于無循環引用的變量)。
- 垃圾回收閾值觸發,即:
- 當創建的新變量數量超過 GC 閾值(默認 10,000),GC 可能會運行。
- 當 PHP 發現有循環引用,會進入 GC 過程,釋放循環引用的內存。
2.2 關鍵優化點
- 減少不必要的 GC 觸發
- 由于采用了分代垃圾回收,PHP 不會每次都掃描所有變量,而是優先回收年輕代變量,減少影響。
- 提升 GC 執行效率
- PHP 7 優化了 GC 算法,使得清理循環引用的操作更快。
- 優化變量管理
- PHP 7 對
zend_mm_heap(PHP 內存管理器)進行了改進,提高了變量分配和回收的效率。
- PHP 7 對
3. 常見問題及優化建議
3.1 避免循環引用
如果 PHP 代碼中存在對象相互引用,GC 可能不會立即釋放內存。解決方法:
-
使用
WeakReference(PHP 7.4+):$obj1 = new stdClass(); $obj2 = new stdClass(); $obj1->ref = WeakReference::create($obj2); $obj2->ref = WeakReference::create($obj1);這樣不會增加引用計數,GC 觸發時可以正常回收。
-
手動釋放引用
$obj1->ref = null; $obj2->ref = null;
3.2 手動觸發垃圾回收
如果內存使用過高,可以手動調用 gc_collect_cycles() 進行垃圾回收:
gc_collect_cycles();
但通常情況下,PHP 的 GC 機制已經足夠智能,不需要手動調用。
3.3 關閉/調整 GC
-
禁用 GC(在特定情況下提高性能):
gc_disable();適用于短生命周期的腳本,例如 CLI 工具、任務隊列等,避免 GC 影響執行速度。
-
重新啟用 GC
gc_enable(); -
調整 GC 閾值
可以通過gc_mem_caches()調整回收策略,優化長時間運行的應用。
4. 總結
| PHP 版本 | 垃圾回收機制 | 優化點 |
|---|---|---|
| PHP 7+ | 引入 分代垃圾回收(Generational GC) | 提高 GC 效率,減少性能開銷 |
| PHP 7.4+ | 引入 WeakReference |
避免不必要的引用計數 |
__invoke使用
__invoke 是 PHP 的魔術方法之一,允許對象像函數一樣被調用。它的主要作用是 讓類的實例可以像函數一樣執行,從而提供更靈活的代碼設計
$socket->on('connection', $this);,connection 對應的是$this,新連接處理就在__invoke。因此我們關注的點就在__invoke上了,可以看到laravel框架有很多的地方使用了__invoke,那么對于我們來說也是有機會用到自己的代碼里的。
新連接處理
我們現在重點關心__invoke里面的代碼,當有數據到達時調用 handleRequest() 方法處理,深入到handleRequest里面。
handleRequest 處理流程
- 連接狀態判斷: 開始檢查連接是否已經建立(isConnected()),如果已經連接,則直接返回,避免重復處理
- 調用 createRequest() 將原始消息轉換為 PSR-7 Request 對象。如果請求創建失敗(返回 null),則直接返回,不進行后續調度。
- 連接激活: 成功創建請求后,調用 $connection->connect() 激活連接,表示已準備好處理請求。
- 路由分發: 使用注入的 $router 對象,根據請求內容分發到對應的控制器或處理邏輯。
- 異常處理: 如果調度過程中捕獲到 HttpException,則根據異常狀態碼和消息關閉連接。捕獲所有其他異常時,先記錄錯誤日志,然后返回 500 狀態碼,告知客戶端服務器內部錯誤。
路由分發
我們現在就來到了vendor/laravel/reverb/src/Servers/Reverb/Http/Router.php,路由類里面,繼續看核心的dispatch
public function dispatch(RequestInterface $request, Connection $connection): mixed
{
$uri = $request->getUri();
$context = $this->matcher->getContext();
$context->setMethod($request->getMethod());
$context->setHost($uri->getHost());
try {
$route = $this->matcher->match($uri->getPath());
} catch (MethodNotAllowedException $e) {
return $this->close($connection, 405, 'Method not allowed.', ['Allow' => $e->getAllowedMethods()]);
} catch (ResourceNotFoundException $e) {
return $this->close($connection, 404, 'Not found.');
}
$controller = $this->controller($route);
if ($this->isWebSocketRequest($request)) {
$wsConnection = $this->attemptUpgrade($request, $connection);
return $controller($request, $wsConnection, ...Arr::except($route, ['_controller', '_route']));
}
$routeParameters = Arr::except($route, [
'_controller',
'_route',
]) + ['request' => $request, 'connection' => $connection];
$response = $controller(
...$this->arguments($controller, $routeParameters)
);
return $response instanceof PromiseInterface ?
$response->then(fn ($response) => $connection->send($response)->close()) :
$connection->send($response)->close();
}
這個路由分發寫的也很好,很好的利用了Symfony 的Route 來實現,寫到這里就讓想起了上家公司框架里面也是大量使用symfony的route http command process 等核心組件來構建api服務,不得不說symfony才是精品。同時對于我們來說,也可以把symfony的好的組件用于自己的項目。
同時路由也共同處理著http request 以及 ws on message 的流程。
除了路由,我們這里還能學到的一個點就是:webscoket 協議的處理。因此,我們就重點來看下這個。
webscoket 協議的處理
協議升級
我們先關注核心的代碼邏輯:
if ($this->isWebSocketRequest($request)) {
$wsConnection = $this->attemptUpgrade($request, $connection);
return $controller($request, $wsConnection, ...Arr::except($route, ['_controller', '_route']));
}
- 如果請求是一個 WebSocket 請求(通過 Upgrade: websocket 頭部判斷),就嘗試進行協議升級。
- 升級成功后,將請求 $request 和升級后的連接對象 $wsConnection 傳遞給控制器 $controller 處理。
- Arr::except($route, ['_controller', '_route']) 是 Laravel 的輔助函數,表示排除控制器相關信息,傳遞剩余路由參數
也就是我們前端發起建立的ws連接后(比如是:ws://localhost:8083/app/2lza6dryoslsyxss6ub4?protocol=7&client=js&version=8.4.0&flash=false),就會走到協議升級的$controller處理。那具體處理在哪呢,這個就要回到我們上節提到的:pusherRoutes里定義的路由了。那么,我們快馬加鞭的回到那里去,同時也說明下,這里對ws協議沒有講完。我準備在下面的內容在進行說明

業務具體處理
我們回到了vendor/laravel/reverb/src/Servers/Reverb/Factory.php 的pusherRoutes方法里面,然后就看第一條路由:
$routes->add('sockets', Route::get('/app/{appKey}', new PusherController(app(PusherServer::class), app(ApplicationProvider::class))));
是不是一下子就破案了呢,這個路由匹配了ws連接地址/app/2lza6dryoslsyxss6ub4,因此PusherController 就是ws的處理。因此我們就繼續走,來到:vendor/laravel/reverb/src/Protocols/Pusher/Http/Controllers/PusherController.php
同樣的controller也是用到了__invoke,因此我們直接盤它。
Pusher 協議
1. 消息格式
Pusher 協議中的消息格式是 JSON,一般結構如下:
{
"event": "event-name",
"data": "stringified JSON",
"channel": "optional-channel-name"
}
例子:
{
"event": "pusher:subscribe",
"data": {
"channel": "private-chat.123"
}
}
或者消息推送:
{
"event": "client-message",
"data": {
"text": "Hello"
},
"channel": "chat.123"
}
2. 控制器中處理的 WebSocket 生命周期
控制器里注冊了三種 WebSocket 事件監聽:
$connection->onMessage(fn ($message) => ... );
$connection->onControl(fn (FrameInterface $message) => ... );
$connection->onClose(fn () => ... );
onMessage()
當瀏覽器發送 WebSocket 消息時:
fn ($message) => $this->server->message($reverbConnection, (string) $message)
- 將接收到的消息字符串傳給
PusherServer::message()處理。 - 這個方法里會解析 JSON,判斷 event 類型,比如:
pusher:subscribe:表示客戶端訂閱頻道。client-event:客戶端發送自定義消息。ping/pong:心跳檢查。
- 然后進行路由、鑒權、廣播等邏輯。
onControl()
fn (FrameInterface $message) => $this->server->control($reverbConnection, $message)
- 控制幀,如
ping、pong、close等幀。 - 比如,客戶端發送 ping,這里可以回應 pong。
- 這部分屬于 WebSocket 協議的低層部分,保證連接活躍。
onClose()
fn () => $this->server->close($reverbConnection)
- 當連接關閉時,做清理,比如移除連接、取消訂閱、廣播離線消息等。
3. 連接初始化
$this->server->open($reverbConnection);
- 通知
PusherServer有一個新的連接建立了。 - 它可能會給客戶端推送一個
pusher:connection_established消息:
{
"event": "pusher:connection_established",
"data": {
"socket_id": "some-unique-id",
"activity_timeout": 120
}
}
這個是 Pusher 協議里約定的,客戶端拿到 socket_id 后,才能訂閱私有頻道等。
當 appKey 無效時,會推送一個標準的錯誤:
$connection->send('{"event":"pusher:error","data":"{\"code\":4001,\"message\":\"Application does not exist\"}"}');
- 作為 WebSocket 路由的入口
- 實現 Pusher 協議的握手、訂閱、消息處理等
- 封裝了低層的
Connection和高層的Application為ReverbConnection - 注冊了完整的 WebSocket 生命周期事件(消息、控制幀、關閉)
而 Pusher 協議的數據結構 是基于 JSON 的,所有通信事件都通過 event + data (+channel) 來傳遞和解析,保持了高度的靈活性和可擴展性。
為了一起學習,我們用gpt4來系統總結下websocket的知識。
一、WebSocket 是什么?
WebSocket 是一種 基于 TCP 的雙向通信協議,它允許瀏覽器和服務器之間建立一個 持久的連接,雙方可以隨時互發數據,而無需每次都重新建立連接(像 HTTP 那樣)。
它最初由 RFC 6455 規范定義。
二、WebSocket 建立過程
1. 握手階段(HTTP 協議完成升級)
WebSocket 連接開始于一個 HTTP GET 請求,客戶端發送如下請求:
GET /app/abc123 HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
關鍵字段說明:
Upgrade: websocket:告訴服務器要升級為 WebSocket 協議。Connection: Upgrade:和上面的配套,表示連接要升級。Sec-WebSocket-Key:一個隨機的 base64 編碼字符串,用于安全校驗。Sec-WebSocket-Version:WebSocket 協議版本,當前為13。
2. 服務器響應握手
服務器驗證合法后,會響應如下內容:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Accept是通過客戶端的Sec-WebSocket-Key計算得來的:base64_encode(sha1($clientKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));
一旦握手完成,HTTP 連接就升級為 WebSocket,之后傳輸的所有數據都不再是 HTTP 格式,而是 WebSocket 幀格式。
三、WebSocket 數據幀結構
WebSocket 的通信是以 幀(frame) 為單位的,每一幀都包含:
幀結構(簡化圖)
0 1 2 3
+-------+-------+---------------+-------------------------------+
|FIN| RSV | OPCODE | MASK | Payload Len | Extended Len | MASK Key |
+-------+-------+---------------+-------------------------------+
| Payload Data (possibly masked) |
+---------------------------------------------------------------+
重要字段:
| 字段名 | 描述 |
|---|---|
| FIN | 1 位,是否是消息最后一幀。通常為 1。 |
| Opcode | 表示幀的類型(文本、二進制、ping 等)。 |
| MASK | 1 位,是否啟用掩碼(客戶端必須設置為 1,服務端返回為 0)。 |
| Payload Length | 數據長度(可能需要擴展長度字段)。 |
| Masking Key | 4 字節,客戶端加密數據使用的密鑰。 |
| Payload Data | 實際傳輸的數據,客戶端發出時必須被掩碼處理。 |
Opcode 類型
| Opcode | 描述 |
|---|---|
| 0x0 | 連續幀(后續幀) |
| 0x1 | 文本幀(UTF-8) |
| 0x2 | 二進制幀 |
| 0x8 | 關閉連接 |
| 0x9 | Ping(心跳) |
| 0xA | Pong(回應) |
示例:發送一條文本消息 "hi"(客戶端 -> 服務端)
Opcode = 0x1(文本幀)Payload = "hi",長度為 2 字節MASK = 1(客戶端發送時必須掩碼)- 使用掩碼 key 加密 payload
服務端接收到后會反掩碼還原出原始文本。
四、控制幀
控制幀是管理連接用的:
- Ping / Pong:用于心跳機制,確保連接活躍。
- Close:通知對方關閉連接,可以攜帶關閉原因和狀態碼。
五、連接關閉
當任意一方想關閉連接,會發送一個 Opcode = 0x8 的幀,并可附帶一個狀態碼(如 1000 表示正常關閉)。
六、與 Laravel Reverb 的對應關系
你之前看到的 onMessage, onControl, onClose 實際就是對上述底層幀的響應封裝:
onMessage:處理Opcode=0x1的文本幀。onControl:處理Ping/Pong/Close控制幀。onClose:對應連接斷開(可能是收到 Close 幀,或 TCP 斷了)。
總結一句話
WebSocket 是在 TCP 上建立的持久雙向通信協議,先通過 HTTP 升級,然后通過一套專門的二進制幀結構進行通信,幀可以是文本、二進制、Ping/Pong 或 Close。
WebSocket 幀結構圖

各字段說明:
- FIN (1 bit):是否為消息最后一幀(1 表示是,0 表示后面還有)。
- RSV1, RSV2, RSV3 (各 1 bit):保留位,通常為 0。
- Opcode (4 bits):
0x1:文本幀0x2:二進制幀0x8:關閉連接0x9:Ping0xA:Pong
- MASK (1 bit):
- 客戶端必須設置為 1(數據經過掩碼處理)
- 服務端必須設置為 0(不使用掩碼)
- Payload Len (7 bits):
- 小于 126:直接寫長度
- 等于 126:后面擴展 16 位表示長度
- 等于 127:后面擴展 64 位表示長度
- Extended Payload Len:只有在 Payload 長度大于等于 126 時才出現。
- Masking Key (32 bits):
- 客戶端發送數據時用此 key 掩碼實際內容
- Payload Data:真正的數據內容(可能是掩碼處理的)
好,我們繼續看一個真實的 WebSocket 抓包數據示例,演示一次客戶端發送文本消息 "hi" 的原始幀數據,以及如何解析它。
場景:瀏覽器向服務器發送消息 "hi"
我們抓包看到 WebSocket 幀如下(十六進制):
81 82 37 fa 21 3d 5f 9f 44 52
逐字節解析:
| 字節 | 含義 |
|---|---|
81 |
FIN=1, Opcode=1(文本幀) |
82 |
MASK=1, Payload length=2(表示2個字節的內容) |
37 fa 21 3d |
掩碼 key(masking key) |
5f 9f |
被掩碼處理過的 "hi" 數據 |
掩碼還原 Payload
掩碼算法(RFC6455 標準):
payload[i] = encoded[i] ^ masking_key[i % 4]
還原過程:
原始數據(masked): 0x5f 0x9f
掩碼 key: 0x37 0xfa 0x21 0x3d
還原:
byte 1: 0x5f ^ 0x37 = 0x68 = 'h'
byte 2: 0x9f ^ 0xfa = 0x69 = 'i'
→ 得出還原結果:"hi"
圖解總結:
[81] -> FIN + Opcode(0x1 = 文本幀)
[82] -> MASK=1, Payload長度=2
[37 fa 21 3d] -> 掩碼 key
[5f 9f] -> 掩碼后的 payload(hi)
還原后 payload: "hi"
再舉個服務端發送回客戶端的數據幀
假設服務端發回文本消息 "ok",不需要掩碼:
81 02 6f 6b
| 字節 | 含義 |
|---|---|
81 |
FIN=1, Opcode=1(文本幀) |
02 |
MASK=0, Payload 長度 = 2 |
6f 6b |
字符 "o" 和 "k" 的 ASCII(0x6F 0x6B) |
總結
這一節就到這里了,希望對你有用。同時,提前也提前吹下swoole的風:swoole底層處理了很多步驟,比如:

這樣簡化了上層開發,同時也放開了上層開發的處理,因此我們在使用技術框架、組件的時候應當更仔細閱讀其文檔。

浙公網安備 33010602011771號