<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Django實時通信實戰:WebSocket與ASGI全解析(下)

      一、實戰:構建實時聊天室

      環境準備

      下面將使用 Django Channels 構建一個多用戶實時聊天室。Django Channels的介紹、安裝與配置,參考上篇。

      實現 WebSocket 消費者

      創建mysite\myapp_infra\websocket\consumers.py文件,這是處理 WebSocket 連接的核心。主要實現了如下方法:

      • connect:處理新連接,驗證用戶token,將用戶通道寫入緩存,并加入默認組
      • disconnect:處理連接斷開,刪除用戶的緩存通道記錄,將用戶從所在的房間組中移除
      • receive:處理接收到的消息。首先進行心跳檢測(ping/pong),然后驗證用戶身份,解析兩層 JSON 結構的消息內容。根據消息類型 demo-message-send 處理文本消息:若指定接收用戶則單發,否則群發廣播。
      • broadcast_message:處理群發消息,從事件中提取payload數據并發送給所有連接的客戶端
      • single_message:處理單發消息,從事件中提取payload數據并發送給指定的客戶端
      """WebScoket 消費者"""
      
      import json
      import logging
      from django.core.cache import cache
      from django.conf import settings
      from channels.generic.websocket import AsyncWebsocketConsumer
      from rest_framework_simplejwt.tokens import RefreshToken
      from rest_framework_simplejwt.exceptions import TokenError
      
      logger = logging.getLogger(__name__)
      
      
      class InfraConsumer(AsyncWebsocketConsumer):
          """基礎設施-WebSocket 異步消費者"""
      
          async def connect(self):
              """當客戶端發起 WebSocket 連接時調用"""
              query_params = self.scope["query_string"].decode()
              # 從查詢參數獲取token
              token_param = [
                  param.split("=")
                  for param in query_params.split("&")
                  if param.startswith("token=")
              ]
              if not token_param or len(token_param[0]) != 2:
                  logger.error("缺少token參數")
                  await self.close(code=4001)  # 自定義錯誤碼
                  return
      
              token = token_param[0][1]
              try:
                  # 驗證Refresh Token有效性
                  refresh = RefreshToken(token)
                  user_id = refresh["user_id"]
                  self.scope["user"] = {"id": user_id, "token_type": "refresh"}
      
                  # 登錄成功后,將用戶通道寫入緩存
                  cache.set(f"user_{user_id}_channel", self.channel_name)
                  # 加入默認組
                  self.room_group_name = settings.DEFAULT_GROUP_NAME
                  await self.channel_layer.group_add(self.room_group_name, self.channel_name)
                  # 接受客戶端的WebSocket連接請求
                  await self.accept()
              except TokenError as e:
                  logger.error("無效token")
                  await self.close(code=4003)
              except Exception as e:
                  logger.error(str(e))
                  await self.close(code=4000)
      
          async def disconnect(self, close_code):
              """當客戶端斷開 WebSocket 連接時調用"""
              # 獲取當前用戶
              user = self.scope.get("user")
              if user:
                  user_id = user.get("id")
                  # 移除用戶通道
                  cache.delete(f"user_{user_id}_channel")
                  # 將用戶從組中移除
                  room_group_name = getattr(self, "room_group_name", None)
                  if room_group_name:
                      await self.channel_layer.group_discard(
                          room_group_name, self.channel_name
                      )
      
          async def receive(self, text_data=None, bytes_data=None):
              """當接收到客戶端發送的消息時調用"""
              # 心跳檢測
              if text_data == "ping":
                  await self.send(text_data="pong")
                  return
      
              user = self.scope.get("user")
              if not user:
                  logger.warning(f"匿名用戶訪問拒絕:{text_data}")
                  return
              logger.info(f"收到用戶 {user.get('id')} 發送消息:{text_data}")
      
              # 因為消息采用了兩次JSON封裝,這里進行兩次JSON解析
              try:
                  outer_payload = json.loads(text_data)  # 外層JSON解釋
                  message_type = outer_payload.get("type")
                  raw_content = outer_payload.get("content", "{}")
                  inner_content = json.loads(raw_content)  # 內層JSON解釋
              except json.JSONDecodeError:
                  logger.error("內容解析失敗")
                  return
      
              # 處理業務消息
              if message_type == "demo-message-send":
                  message_text = inner_content.get("text", "").strip()
                  if not message_text:
                      logger.warning("消息內容不能為空")
                      return
      
                  # 構建響應字典
                  content = {
                      "fromUserId": user.get("id"),
                      "text": message_text,
                      "single": False,  # 默認群發
                  }
      
                  # 判斷接收對象
                  target_user_id = inner_content.get("toUserId")
                  if target_user_id not in [None, "", 0]:
                      # 單發
                      await self._send_single_message(target_user_id, content)
                  else:
                      # 群發廣播
                      await self._send_broadcast_message(content)
      
          def _build_response(self, content):
              """構建標準化的響應消息,進行兩次JSON封裝"""
              # 第一次封裝:添加消息類型
              inner_message = {
                  "type": "demo-message-receive",
                  "content": json.dumps(content),
              }
              # 第二次封裝:整體序列化
              return json.dumps(inner_message)
      
          async def _send_single_message(self, target_user_id, content):
              """向指定用戶發送單條消息"""
              # 獲取用戶通道
              target_channel = cache.get(f"user_{target_user_id}_channel")
              if not target_channel:
                  return False
      
              # 構建并發送消息
              message = self._build_response(content)
              await self.channel_layer.send(
                  target_channel,
                  {
                      "type": "single.message",
                      "payload": message,
                  },
              )
              return True
      
          async def _send_broadcast_message(self, content):
              """向房間內所有用戶廣播消息"""
              message = self._build_response(content)
              await self.channel_layer.group_send(
                  self.room_group_name,
                  {
                      "type": "broadcast.message",
                      "payload": message,
                  },
              )
      
          async def broadcast_message(self, event):
              """處理群發消息"""
              payload = event["payload"]
              await self.send(text_data=payload)
      
          async def single_message(self, event):
              """處理單發消息"""
              payload = event["payload"]
              await self.send(text_data=payload)
      

      點擊查看完整代碼

      配置 WebSocket 路由

      創建mysite\myapp_infra\websocket\routing.py文件,定義 WebSocket 的 URL 路由

      """WebSocket 路由配置"""
      
      from django.urls import re_path, path
      from .consumers import InfraConsumer
      
      websocket_urlpatterns = [
          # 調用as_asgi()類方法來獲取一個 ASGI 應用程序
          re_path(r"^infra/ws/?$", InfraConsumer.as_asgi()),
      ]
      

      然后在項目的mysite\mysite\asgi.py配置中添加 ASGI 路由

      import os
      from django.core.asgi import get_asgi_application
      from channels.routing import ProtocolTypeRouter, URLRouter
      from channels.auth import AuthMiddlewareStack
      
      # 設置環境變量并初始化Django應用
      os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
      django_application = get_asgi_application()
      
      
      def get_websocket_application():
          """延遲導入WebSocket路由(避免循環導入)"""
          from myapp_infra.websocket.routing import websocket_urlpatterns
      
          return AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
      
      
      # 協議路由:區分HTTP和WebSocket請求
      application = ProtocolTypeRouter(
          {
              "http": django_application,
              "websocket": get_websocket_application(),
          }
      )
      

      點擊查看完整代碼

      創建聊天界面模板

      以Vue3界面為例,代碼實現了一個 WebSocket 客戶端界面,功能包括:

      • 連接控制:顯示連接狀態、開啟/關閉 WebSocket 連接。
      • 消息發送:輸入消息內容并選擇接收人(單發或群發),點擊發送按鈕通過 WebSocket 發送消息。
      • 消息接收與展示:監聽 WebSocket 返回的數據,解析不同類型的消息(如單發、群發、系統通知)并在右側列表中倒序展示。
      • 用戶列表獲取:頁面加載時獲取用戶列表用于選擇消息接收人。

      image-20250727152815069

      點擊查看完整代碼

      實現效果

      使用 ASGI 運行項目

      # 開發環境啟動
      uvicorn mysite.asgi:application --reload
      

      使用不同的瀏覽器,分別登錄不同的用戶,實現實時互發消息。

      image-20250727091154265

      二、生產環境部署

      Nginx 配置

      使用 Nginx 作為反向代理,添加以下配置來處理 WebSocket 連接。這段配置告訴 Nginx 如何正確處理 WebSocket 升級請求。

      location /ws/ {
          proxy_pass http://backend;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "upgrade";
      }
      

      使用 Gunicorn+Uvicorn 部署

      Gunicorn(全稱 Green Unicorn)是一個用于 UNIX 的 高性能 Python WSGI HTTP 服務器。Gunicorn只能用于 Linux 系統,Windows上無法使用。

      安裝

      pip install gunicorn
      
      # 查看版本
      gunicorn -v
      gunicorn -h
      

      Gunicorn 作為進程管理器,配合 Uvicorn 工作進程處理 ASGI 應用

      • -w 工作進程的數量。默認啟動 1 個 Worker 進程
      • -k 要運行的工作進程類型。
      • -b 指定要綁定的服務器套接字。
      # 基本啟動命令
      gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker mysite.asgi:application
      

      優化 Worker 數量

      Gunicorn Worker 數量的設置對性能影響很大,推薦的公式是:(CPU核心數 × 2) + 1。我們可以編寫一個啟動腳本來自動計算最佳 Worker 數量

      #!/bin/bash
      # run_gunicorn.sh
      
      # 計算最佳 Worker 數
      CORES=$(nproc)
      WORKERS=$((CORES * 2 + 1))
      
      # 限制最大Worker數(避免內存不足)
      MAX_WORKERS=8
      if [ $WORKERS -gt $MAX_WORKERS ]; then
        WORKERS=$MAX_WORKERS
      fi
      
      # 啟動命令
      gunicorn -b 0.0.0.0:8000 \
        --workers $WORKERS \
        --max-requests 1000 \         # 預防內存泄漏
        --timeout 120 \               # 超時控制
        --keep-alive 5 \              # Keep-Alive
        -k uvicorn.workers.UvicornWorker \
        mysite.asgi:application
      

      添加執行權限并運行

      chmod +x run_gunicorn.sh
      ./run_gunicorn.sh
      

      您正在閱讀的是《Django從入門到實戰》專欄!關注不迷路~

      posted @ 2025-07-27 15:48  小王子1024  閱讀(183)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 丁香婷婷色综合激情五月| 26uuu另类亚洲欧美日本| 精品精品国产国产自在线| 亚洲av免费成人在线| 国产AV无码专区亚洲AV紧身裤| 成人免费无码大片a毛片| 亚洲人成网线在线播放VA| 柠檬福利第一导航在线| 亚洲精品国产成人| 中国少妇人妻xxxxx| 香蕉乱码成人久久天堂爱| 国产综合久久99久久| 一区二区三区精品自拍视频| 成人网站免费观看永久视频下载| 亚洲欧美人成网站在线观看看| 无码熟妇人妻AV影音先锋| 日韩一区二区三区高清视频| 亚洲欧洲一区二区免费| japanese无码中文字幕| 久久香蕉国产线看观看猫咪av| 国产精品一二三区蜜臀av| 麻豆成人精品国产免费| 久久综合九色综合欧洲98| 日韩av一区二区高清不卡| 久久人人97超碰人人澡爱香蕉| 国产大尺度一区二区视频| 亚洲av日韩av永久无码电影| 精品国产中文字幕在线| 亚洲护士一区二区三区| 无码精品人妻一区二区三区中| 国产国语一级毛片| 国产自国产自愉自愉免费24区| 麻豆国产va免费精品高清在线| 久久综合国产色美利坚| 欧美人与动人物牲交免费观看| 国产亚洲综合区成人国产| 国产无遮挡又黄又爽不要vip软件| 广德县| 91福利视频一区二区| 国产h视频在线观看| 国产高颜值极品嫩模视频|