Laravel11 從0開發 Swoole-Reverb 擴展包(四) - 觸發一個廣播事件是如何到reverb服務的呢?
前情提要
今天這節開始,我們就從reverb啟動這個過程進行源代碼的學習分析。
廣播驅動
但是在看reverb啟動過程前,這節我們先看看laravel Broadcasting 的新驅動的這部分源碼,當我們使用reverb后,廣播事件的觸發等操作就由新的驅動負責了。
追蹤源碼的技巧
我是根據reverb是一個新的driver,于是我通過ReverbDrvier在laravel包目錄進行了查找(搜索或者 ctrl+p),然后快速的就在vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.php
找到了我們新的驅動創建的方法:
protected function createReverbDriver(array $config)
{
return $this->createPusherDriver($config);
}
BroadcastManager 的 driver方法 通過 工廠模式來驅動實例的創建,通過策略模式來替換不同的驅動。
通過代碼我們知道reverb是兼容適配的pusher,因此看到這里的朋友可以去看看我第二節寫的pusher 的知識。
觸發一個廣播事件是如何到reverb服務的呢?
要找到這個具體的實現,我們還是從追源代碼開始,一步步找。首先,我們知道觸發事件的調用方法有好幾個操作方式,我大致羅列如下:
// event(new \App\Events\DemoPushEvent('你好呀,歡迎你們使用我們的聊天軟件'));
// \App\Events\DemoPushEvent::dispatch('你好呀,歡迎你們使用我們的聊天軟件');
// broadcast(new \App\Events\DemoPushEvent(Auth::user()->getAuthIdentifier(), '你好呀,歡迎你們使用我們的聊天軟件'))->toOthers();
我們就用 \App\Events\DemoPushEvent::dispatch 這種方式來追代碼。我們通過命令行生成的事件,都會使用一個特性:Dispatchable,dispatch就是里面的方法,而dispatch實際調用的代碼是:
app('events')->dispatch(...$args);
接下來,我們就追到了vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php里的public function dispatch($event, $payload = [], $halt = false)方法了,這個dispatch方法的作用就是 觸發事件并調用偵聽器。繼續往下,我們看到了找到核心的方法:protected function invokeListeners($event, $payload, $halt = false),這個方法里面我們要找的代碼是:
if ($this->shouldBroadcast($payload)) {
$this->broadcastEvent($payload[0]);
}
而shouldBroadcast也對應了文檔說明的,如何確定是:確定負載是否具有可廣播事件,同時可以廣播的事件。關鍵條件:實現ShouldBroadcast和broadcastWhen是否 為真
protected function shouldBroadcast(array $payload)
{
return isset($payload[0]) &&
$payload[0] instanceof ShouldBroadcast &&
$this->broadcastWhen($payload[0]);
}
在繼續往下走,進入broadcastEvent后,我們回到了vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.phpBroadcastManager里面,然后我們就開始分析這個queue方法,它的作用實際就是:將給定事件排隊以進行廣播,不過如果事件實現的是ShouldBroadcastNow接口,那么就會立即廣播出去。
public function queue($event)
{
if ($event instanceof ShouldBroadcastNow ||
(is_object($event) &&
method_exists($event, 'shouldBroadcastNow') &&
$event->shouldBroadcastNow())) {
return $this->app->make(BusDispatcherContract::class)->dispatchNow(new BroadcastEvent(clone $event));
}
$queue = null;
if (method_exists($event, 'broadcastQueue')) {
$queue = $event->broadcastQueue();
} elseif (isset($event->broadcastQueue)) {
$queue = $event->broadcastQueue;
} elseif (isset($event->queue)) {
$queue = $event->queue;
}
$broadcastEvent = new BroadcastEvent(clone $event);
if ($event instanceof ShouldBeUnique) {
$broadcastEvent = new UniqueBroadcastEvent(clone $event);
if ($this->mustBeUniqueAndCannotAcquireLock($broadcastEvent)) {
return;
}
}
$this->app->make('queue')
->connection($event->connection ?? null)
->pushOn($queue, $broadcastEvent);
}
因此,我們就走到了vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php文件里來分析。
ShouldBroadcastNow 立即觸發
public function dispatchNow($command, $handler = null)
{
$uses = class_uses_recursive($command);
if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {
$command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
}
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
$method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
return $handler->{$method}($command);
};
} else {
$callback = function ($command) {
$method = method_exists($command, 'handle') ? 'handle' : '__invoke';
return $this->container->call([$command, $method]);
};
}
return $this->pipeline->send($command)->through($this->pipes)->then($callback);
}
這個方法的作用是:將命令分派到當前進程中的相應處理程序,而不使用同步隊列。
整個方法還是有些晦澀,我就替觀眾老爺用權威的AI給我們進行這個代碼分析:
這段代碼來自 Laravel 11 的 Illuminate\Bus\Dispatcher,它的 dispatchNow() 方法用于同步地執行 Laravel 任務(Command)。在 Laravel 的 任務派發機制 中,這個方法用于立即執行給定的命令(Command),而不會通過隊列異步執行。
1. 處理命令的隊列屬性
$uses = class_uses_recursive($command);
if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {
$command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
}
解析
class_uses_recursive($command):獲取$command類及其所有父類、trait 使用的 trait 列表。InteractsWithQueue::class和Queueable::class:- 如果
$command使用了這兩個 trait,說明它是一個可以被隊列化的任務。 ! $command->job:如果這個任務沒有被分配到具體的 Job,說明它還沒有真正進入隊列。setJob(new SyncJob(...)):如果滿足條件,就創建一個同步任務 (SyncJob),表示這個任務將在當前進程內立即執行,而不是放入隊列異步執行。
- 如果
2. 確定任務執行的處理器
if ($handler || $handler = $this->getCommandHandler($command)) {
$callback = function ($command) use ($handler) {
$method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
return $handler->{$method}($command);
};
} else {
$callback = function ($command) {
$method = method_exists($command, 'handle') ? 'handle' : '__invoke';
return $this->container->call([$command, $method]);
};
}
解析
getCommandHandler($command):獲取與$command關聯的處理器(Handler)。- 如果
$command具有一個特定的 Handler 類,那么它將使用handle()或__invoke()方法來執行該任務。 - 如果
$command自身 具有handle()方法,則直接調用$command->handle()。 $this->container->call([...]):使用 Laravel 服務容器 調用handle()方法,支持 依賴注入。
3. 通過管道(Pipeline)執行命令
return $this->pipeline->send($command)->through($this->pipes)->then($callback);
解析
- Laravel Pipeline 是一個中間件管道,類似 HTTP 中間件:
send($command): 將$command任務送入處理流程。through($this->pipes): 通過 任務中間件 進行處理,比如日志、權限檢查等。then($callback): 執行最終的任務邏輯(上面定義的$callback)。
流程總結
-
檢查
$command是否支持隊列:- 如果它實現了
Queueable,但沒有job,就為它創建一個SyncJob,讓它立即執行,而不是進入隊列。
- 如果它實現了
-
確定如何執行
$command:- 如果
$command有專門的 處理器 (Handler),就用handle()或__invoke()方法執行。 - 如果
$command自身 有handle()方法,就直接調用它。
- 如果
-
通過 Laravel Pipeline 處理任務:
- 任務可能會經過多個中間件(類似 HTTP 請求的中間件)。
- 最終,
then($callback)負責真正執行任務。
dispatchNow()強制同步執行 Laravel 任務,不會進入隊列。- 如果命令使用了
Queueable,但未進入隊列,dispatchNow()會創建SyncJob,仍然同步執行。 - 任務可以有單獨的 Handler,Laravel 會自動解析并執行它。
- 使用
Pipeline允許任務經過多個中間件,例如日志、權限檢查等。
好的,上面的,AI說的很清楚了,那么我通過dump給大家看下拿到的$command到底是啥:
Illuminate\Broadcasting\BroadcastEvent Object (
[event] => App\Events\DemoPushEvent Object (
[userId] => 1 [message] => 你好呀,歡迎你們使用我們的聊天軟件 [type] => system [socket] => ) [tries] => [timeout] => [backoff] => [maxExceptions] => [connection] => [queue] => [delay] => [afterCommit] => [middleware] => Array ( ) [chained] => Array ( ) [chainConnection] => [chainQueue] => [chainCatchCallbacks] => )
打印后,我們就又悟了,繼續走到:Illuminate\Broadcasting\BroadcastEvent , 到了這里,我們就看handle方法:處理排隊的作業
public function handle(BroadcastingFactory $manager)
{
$name = method_exists($this->event, 'broadcastAs')
? $this->event->broadcastAs() : get_class($this->event);
$channels = Arr::wrap($this->event->broadcastOn());
if (empty($channels)) {
return;
}
$connections = method_exists($this->event, 'broadcastConnections')
? $this->event->broadcastConnections()
: [null];
$payload = $this->getPayloadFromEvent($this->event);
foreach ($connections as $connection) {
$manager->connection($connection)->broadcast(
$this->getConnectionChannels($channels, $connection),
$name,
$this->getConnectionPayload($payload, $connection)
);
}
}
整個方法里,$connections 中的broadcastConnections也是文檔上提到的,可以針對事件定義廣播連接,獲取應廣播事件的廣播連接。 事件如果沒有連接,默認的值:[null],然后是$payload 拿到就是你事件定義的屬性后解析的數組。再然后就是遍歷連接執行廣播了。
廣播的操作又把我們帶回到vendor/laravel/framework/src/Illuminate/Broadcasting/Broadcasters/PusherBroadcaster.php Pusher廣播驅動了,真的是山路十八彎,寫到這里,突然想到了:“過度封裝”,不過我個人還是覺得設計優雅。回到正題,我們來看public function broadcast(array $channels, $event, array $payload = [])這個方法:廣播給定的事件
public function broadcast(array $channels, $event, array $payload = [])
{
$socket = Arr::pull($payload, 'socket');
$parameters = $socket !== null ? ['socket_id' => $socket] : [];
$channels = new Collection($this->formatChannels($channels));
try {
$channels->chunk(100)->each(function ($channels) use ($event, $payload, $parameters) {
$this->pusher->trigger($channels->toArray(), $event, $payload, $parameters);
});
} catch (ApiErrorException $e) {
throw new BroadcastException(
sprintf('Pusher error: %s.', $e->getMessage())
);
}
}
我們重點關心$this->pusher->trigger,觸發請求這里。首先,我還是dump trigger的幾個參數給大家看看:
array(1) { [0]=> string(11) "demo-push.1" }
string(24) "App\Events\DemoPushEvent"
array(3) { ["userId"]=> int(1) ["message"]=>
string(51) "你好呀,歡迎你們使用我們的聊天軟件" ["type"]=> string(6) "system" }
array(0) { }
走到這里的朋友,??我們,已經到了最后一步了,因此,我們繼續走到了vendor/pusher/pusher-php-server/src/Pusher.php,然后我們就來看trigger 方法。這個方法的作用是:通過提供事件名稱和有效負載來觸發事件。(可選)提供套接字 ID 以排除客戶端(很可能是發送方)。
trigger實際就是發送了一個post請求,而這個post請求的地址是啥,這個是我們關心的,我繼續給大家打印:
http://localhost:8083/apps/531292/events
那這個host和port 是在哪里來的呢,這個就是我們 broadcasting.php 廣播配置文件下面配置:
connections' => [
'reverb' => [
'driver' => 'reverb',
'key' => env('REVERB_APP_KEY'),
'secret' => env('REVERB_APP_SECRET'),
'app_id' => env('REVERB_APP_ID'),
'options' => [
'host' => env('REVERB_HOST'),
'port' => env('REVERB_PORT', 443),
'scheme' => env('REVERB_SCHEME', 'https'),
'useTLS' => env('REVERB_SCHEME', 'https') === 'https',
],
'client_options' => [
// Guzzle client options: https://docs.guzzlephp.org/en/stable/request-options.html
],
],
也就是我們reverb???服務。
總結
帶著大家走了一遍立即觸發廣播的流程,我們最終知道了:實際通過http請求與reverb通信。那么,延遲隊列發送,最后也應該是這樣的,我就不帶著大家繼續追代碼了(如果有誤,可以評論區留言指正,一起學習)。我們下一節就來給大家分析reverb服務端的程序代碼。如果有興趣的朋友,可以從第2節看起走,因為里面涉及到pusher的通信對接數據協議。

浙公網安備 33010602011771號