<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Laravel11 從0開發 Swoole-Reverb 擴展包(四) - 觸發一個廣播事件是如何到reverb服務的呢?

      前情提要

      今天這節開始,我們就從reverb啟動這個過程進行源代碼的學習分析。

      廣播驅動

      但是在看reverb啟動過程前,這節我們先看看laravel Broadcasting 的新驅動的這部分源碼,當我們使用reverb后,廣播事件的觸發等操作就由新的驅動負責了。

      追蹤源碼的技巧

      我是根據reverb是一個新的driver,于是我通過ReverbDrvier在laravel包目錄進行了查找(搜索或者 ctrl+p),然后快速的就在vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.php
      找到了我們新的驅動創建的方法:

         protected function createReverbDriver(array $config)
          {
              return $this->createPusherDriver($config);
          }
      

      BroadcastManager 的 driver方法 通過 工廠模式來驅動實例的創建,通過策略模式來替換不同的驅動。
      通過代碼我們知道reverb是兼容適配的pusher,因此看到這里的朋友可以去看看我第二節寫的pusher 的知識。

      觸發一個廣播事件是如何到reverb服務的呢?

      要找到這個具體的實現,我們還是從追源代碼開始,一步步找。首先,我們知道觸發事件的調用方法有好幾個操作方式,我大致羅列如下:

      //        event(new \App\Events\DemoPushEvent('你好呀,歡迎你們使用我們的聊天軟件'));
      //        \App\Events\DemoPushEvent::dispatch('你好呀,歡迎你們使用我們的聊天軟件');
      // broadcast(new \App\Events\DemoPushEvent(Auth::user()->getAuthIdentifier(), '你好呀,歡迎你們使用我們的聊天軟件'))->toOthers();
      

      我們就用 \App\Events\DemoPushEvent::dispatch 這種方式來追代碼。我們通過命令行生成的事件,都會使用一個特性:Dispatchable,dispatch就是里面的方法,而dispatch實際調用的代碼是:

       app('events')->dispatch(...$args);
      

      接下來,我們就追到了vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php里的public function dispatch($event, $payload = [], $halt = false)方法了,這個dispatch方法的作用就是 觸發事件并調用偵聽器。繼續往下,我們看到了找到核心的方法:protected function invokeListeners($event, $payload, $halt = false),這個方法里面我們要找的代碼是:

              if ($this->shouldBroadcast($payload)) {
                  $this->broadcastEvent($payload[0]);
              }
      

      shouldBroadcast也對應了文檔說明的,如何確定是:確定負載是否具有可廣播事件,同時可以廣播的事件。關鍵條件:實現ShouldBroadcast和broadcastWhen是否 為真

          protected function shouldBroadcast(array $payload)
         {
             return isset($payload[0]) &&
                    $payload[0] instanceof ShouldBroadcast &&
                    $this->broadcastWhen($payload[0]);
         }
      

      在繼續往下走,進入broadcastEvent后,我們回到了vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.phpBroadcastManager里面,然后我們就開始分析這個queue方法,它的作用實際就是:將給定事件排隊以進行廣播,不過如果事件實現的是ShouldBroadcastNow接口,那么就會立即廣播出去。

      public function queue($event)
          {
              if ($event instanceof ShouldBroadcastNow ||
                  (is_object($event) &&
                   method_exists($event, 'shouldBroadcastNow') &&
                   $event->shouldBroadcastNow())) {
                  return $this->app->make(BusDispatcherContract::class)->dispatchNow(new BroadcastEvent(clone $event));
              }
      
              $queue = null;
      
              if (method_exists($event, 'broadcastQueue')) {
                  $queue = $event->broadcastQueue();
              } elseif (isset($event->broadcastQueue)) {
                  $queue = $event->broadcastQueue;
              } elseif (isset($event->queue)) {
                  $queue = $event->queue;
              }
      
              $broadcastEvent = new BroadcastEvent(clone $event);
      
              if ($event instanceof ShouldBeUnique) {
                  $broadcastEvent = new UniqueBroadcastEvent(clone $event);
      
                  if ($this->mustBeUniqueAndCannotAcquireLock($broadcastEvent)) {
                      return;
                  }
              }
      
              $this->app->make('queue')
                  ->connection($event->connection ?? null)
                  ->pushOn($queue, $broadcastEvent);
          }
      

      因此,我們就走到了vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php文件里來分析。

      ShouldBroadcastNow 立即觸發

          public function dispatchNow($command, $handler = null)
          {
              $uses = class_uses_recursive($command);
      
              if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {
                  $command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
              }
      
              if ($handler || $handler = $this->getCommandHandler($command)) {
                  $callback = function ($command) use ($handler) {
                      $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
      
                      return $handler->{$method}($command);
                  };
              } else {
                  $callback = function ($command) {
                      $method = method_exists($command, 'handle') ? 'handle' : '__invoke';
      
                      return $this->container->call([$command, $method]);
                  };
              }
      
              return $this->pipeline->send($command)->through($this->pipes)->then($callback);
          }
      

      這個方法的作用是:將命令分派到當前進程中的相應處理程序,而不使用同步隊列。
      整個方法還是有些晦澀,我就替觀眾老爺用權威的AI給我們進行這個代碼分析:
      這段代碼來自 Laravel 11 的 Illuminate\Bus\Dispatcher,它的 dispatchNow() 方法用于同步地執行 Laravel 任務(Command)。在 Laravel 的 任務派發機制 中,這個方法用于立即執行給定的命令(Command),而不會通過隊列異步執行。


      1. 處理命令的隊列屬性
      $uses = class_uses_recursive($command);
      
      if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {
          $command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
      }
      
      解析
      • class_uses_recursive($command):獲取 $command 類及其所有父類、trait 使用的 trait 列表。
      • InteractsWithQueue::classQueueable::class
        • 如果 $command 使用了這兩個 trait,說明它是一個可以被隊列化的任務。
        • ! $command->job:如果這個任務沒有被分配到具體的 Job,說明它還沒有真正進入隊列。
        • setJob(new SyncJob(...)):如果滿足條件,就創建一個同步任務 (SyncJob),表示這個任務將在當前進程內立即執行,而不是放入隊列異步執行。

      2. 確定任務執行的處理器
      if ($handler || $handler = $this->getCommandHandler($command)) {
          $callback = function ($command) use ($handler) {
              $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
      
              return $handler->{$method}($command);
          };
      } else {
          $callback = function ($command) {
              $method = method_exists($command, 'handle') ? 'handle' : '__invoke';
      
              return $this->container->call([$command, $method]);
          };
      }
      
      解析
      • getCommandHandler($command):獲取與 $command 關聯的處理器(Handler)。
      • 如果 $command 具有一個特定的 Handler 類,那么它將使用 handle()__invoke() 方法來執行該任務。
      • 如果 $command 自身 具有 handle() 方法,則直接調用 $command->handle()
      • $this->container->call([...]):使用 Laravel 服務容器 調用 handle() 方法,支持 依賴注入

      3. 通過管道(Pipeline)執行命令
      return $this->pipeline->send($command)->through($this->pipes)->then($callback);
      
      解析
      • Laravel Pipeline 是一個中間件管道,類似 HTTP 中間件:
        • send($command): 將 $command 任務送入處理流程。
        • through($this->pipes): 通過 任務中間件 進行處理,比如日志、權限檢查等。
        • then($callback): 執行最終的任務邏輯(上面定義的 $callback)。

      流程總結

      1. 檢查 $command 是否支持隊列

        • 如果它實現了 Queueable,但沒有 job,就為它創建一個 SyncJob,讓它立即執行,而不是進入隊列。
      2. 確定如何執行 $command

        • 如果 $command 有專門的 處理器 (Handler),就用 handle()__invoke() 方法執行。
        • 如果 $command 自身handle() 方法,就直接調用它。
      3. 通過 Laravel Pipeline 處理任務

        • 任務可能會經過多個中間件(類似 HTTP 請求的中間件)。
        • 最終,then($callback) 負責真正執行任務。

      • dispatchNow() 強制同步執行 Laravel 任務,不會進入隊列。
      • 如果命令使用了 Queueable,但未進入隊列dispatchNow() 會創建 SyncJob,仍然同步執行。
      • 任務可以有單獨的 Handler,Laravel 會自動解析并執行它。
      • 使用 Pipeline 允許任務經過多個中間件,例如日志、權限檢查等。

      好的,上面的,AI說的很清楚了,那么我通過dump給大家看下拿到的$command到底是啥:

      Illuminate\Broadcasting\BroadcastEvent Object ( 
      [event] => App\Events\DemoPushEvent Object (
      [userId] => 1 [message] => 你好呀,歡迎你們使用我們的聊天軟件 [type] => system [socket] => ) [tries] => [timeout] => [backoff] => [maxExceptions] => [connection] => [queue] => [delay] => [afterCommit] => [middleware] => Array ( ) [chained] => Array ( ) [chainConnection] => [chainQueue] => [chainCatchCallbacks] => )
      

      打印后,我們就又悟了,繼續走到:Illuminate\Broadcasting\BroadcastEvent , 到了這里,我們就看handle方法:處理排隊的作業

          public function handle(BroadcastingFactory $manager)
          {
              $name = method_exists($this->event, 'broadcastAs')
                      ? $this->event->broadcastAs() : get_class($this->event);
      
              $channels = Arr::wrap($this->event->broadcastOn());
      
              if (empty($channels)) {
                  return;
              }
      
              $connections = method_exists($this->event, 'broadcastConnections')
                                  ? $this->event->broadcastConnections()
                                  : [null];
      
              $payload = $this->getPayloadFromEvent($this->event);
      
              foreach ($connections as $connection) {
                  $manager->connection($connection)->broadcast(
                      $this->getConnectionChannels($channels, $connection),
                      $name,
                      $this->getConnectionPayload($payload, $connection)
                  );
              }
          }
      

      整個方法里,$connections 中的broadcastConnections也是文檔上提到的,可以針對事件定義廣播連接,獲取應廣播事件的廣播連接。 事件如果沒有連接,默認的值:[null],然后是$payload 拿到就是你事件定義的屬性后解析的數組。再然后就是遍歷連接執行廣播了。
      廣播的操作又把我們帶回到vendor/laravel/framework/src/Illuminate/Broadcasting/Broadcasters/PusherBroadcaster.php Pusher廣播驅動了,真的是山路十八彎,寫到這里,突然想到了:“過度封裝”,不過我個人還是覺得設計優雅。回到正題,我們來看public function broadcast(array $channels, $event, array $payload = [])這個方法:廣播給定的事件

          public function broadcast(array $channels, $event, array $payload = [])
          {
              $socket = Arr::pull($payload, 'socket');
      
              $parameters = $socket !== null ? ['socket_id' => $socket] : [];
      
              $channels = new Collection($this->formatChannels($channels));
      
              try {
                  $channels->chunk(100)->each(function ($channels) use ($event, $payload, $parameters) {
                      $this->pusher->trigger($channels->toArray(), $event, $payload, $parameters);
                  });
              } catch (ApiErrorException $e) {
                  throw new BroadcastException(
                      sprintf('Pusher error: %s.', $e->getMessage())
                  );
              }
          }
      

      我們重點關心$this->pusher->trigger,觸發請求這里。首先,我還是dump trigger的幾個參數給大家看看:

      array(1) { [0]=> string(11) "demo-push.1" } 
      string(24) "App\Events\DemoPushEvent" 
      array(3) { ["userId"]=> int(1) ["message"]=> 
      string(51) "你好呀,歡迎你們使用我們的聊天軟件" ["type"]=> string(6) "system" } 
      array(0) { }
      

      走到這里的朋友,??我們,已經到了最后一步了,因此,我們繼續走到了vendor/pusher/pusher-php-server/src/Pusher.php,然后我們就來看trigger 方法。這個方法的作用是:通過提供事件名稱和有效負載來觸發事件。(可選)提供套接字 ID 以排除客戶端(很可能是發送方)。
      trigger實際就是發送了一個post請求,而這個post請求的地址是啥,這個是我們關心的,我繼續給大家打印:
      http://localhost:8083/apps/531292/events
      那這個host和port 是在哪里來的呢,這個就是我們 broadcasting.php 廣播配置文件下面配置:

      connections' => [
      
              'reverb' => [
                  'driver' => 'reverb',
                  'key' => env('REVERB_APP_KEY'),
                  'secret' => env('REVERB_APP_SECRET'),
                  'app_id' => env('REVERB_APP_ID'),
                  'options' => [
                      'host' => env('REVERB_HOST'),
                      'port' => env('REVERB_PORT', 443),
                      'scheme' => env('REVERB_SCHEME', 'https'),
                      'useTLS' => env('REVERB_SCHEME', 'https') === 'https',
                  ],
                  'client_options' => [
                      // Guzzle client options: https://docs.guzzlephp.org/en/stable/request-options.html
                  ],
              ],
      

      也就是我們reverb???服務。

      總結

      帶著大家走了一遍立即觸發廣播的流程,我們最終知道了:實際通過http請求與reverb通信。那么,延遲隊列發送,最后也應該是這樣的,我就不帶著大家繼續追代碼了(如果有誤,可以評論區留言指正,一起學習)。我們下一節就來給大家分析reverb服務端的程序代碼。如果有興趣的朋友,可以從第2節看起走,因為里面涉及到pusher的通信對接數據協議。

      posted @ 2025-03-12 17:09  wanzij  閱讀(319)  評論(0)    收藏  舉報
      TOP
      主站蜘蛛池模板: 一区二区三区放荡人妻| 亚洲欧洲日产国无高清码图片| 国产成人欧美一区二区三区在线| 麻豆国产AV剧情偷闻女邻居内裤 | 体验区试看120秒啪啪免费| 吐鲁番市| 麻豆一区二区三区蜜桃免费| 色吊a中文字幕一二三区| 国产美女69视频免费观看| 国产一区二区三区内射高清| 免费人成再在线观看视频| 人人澡人摸人人添| 日日碰狠狠添天天爽五月婷| 色综合激情丁香七月色综合| 少妇人妻偷人偷人精品| 老子午夜精品无码| 国产激情文学亚洲区综合| 日韩成av在线免费观看| japanese边做边乳喷| 久久天堂无码av网站| 亚洲熟妇熟女久久精品综合| 亚洲人成网站在线播放2019| 亚洲精品乱码久久久久久按摩高清| 亚洲精品自拍在线视频| 欧美人与动zozo| 亚洲欧美自偷自拍视频图片| 亚洲熟妇中文字幕五十路| 国产普通话对白刺激| 亚洲人妻系列中文字幕| 亚洲无人区一码二码三码| 手机在线看片不卡中文字幕| 香港日本三级亚洲三级| 玩两个丰满老熟女久久网| 国产精品久久久天天影视香蕉| 在线日韩日本国产亚洲| 国内精品久久久久影院日本| 亚洲精品自拍在线视频| 一区二区三区四区黄色片| 欧美成人精品一级在线观看| 亚洲中文无码手机永久| 久青草视频在线视频在线|