Django實時通信實戰:WebSocket與ASGI全解析(下)
一、實戰:構建實時聊天室
環境準備
下面將使用 Django Channels 構建一個多用戶實時聊天室。Django Channels的介紹、安裝與配置,參考上篇。
實現 WebSocket 消費者
創建mysite\myapp_infra\websocket\consumers.py文件,這是處理 WebSocket 連接的核心。主要實現了如下方法:
connect:處理新連接,驗證用戶token,將用戶通道寫入緩存,并加入默認組disconnect:處理連接斷開,刪除用戶的緩存通道記錄,將用戶從所在的房間組中移除receive:處理接收到的消息。首先進行心跳檢測(ping/pong),然后驗證用戶身份,解析兩層 JSON 結構的消息內容。根據消息類型demo-message-send處理文本消息:若指定接收用戶則單發,否則群發廣播。broadcast_message:處理群發消息,從事件中提取payload數據并發送給所有連接的客戶端single_message:處理單發消息,從事件中提取payload數據并發送給指定的客戶端
"""WebScoket 消費者"""
import json
import logging
from django.core.cache import cache
from django.conf import settings
from channels.generic.websocket import AsyncWebsocketConsumer
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError
logger = logging.getLogger(__name__)
class InfraConsumer(AsyncWebsocketConsumer):
"""基礎設施-WebSocket 異步消費者"""
async def connect(self):
"""當客戶端發起 WebSocket 連接時調用"""
query_params = self.scope["query_string"].decode()
# 從查詢參數獲取token
token_param = [
param.split("=")
for param in query_params.split("&")
if param.startswith("token=")
]
if not token_param or len(token_param[0]) != 2:
logger.error("缺少token參數")
await self.close(code=4001) # 自定義錯誤碼
return
token = token_param[0][1]
try:
# 驗證Refresh Token有效性
refresh = RefreshToken(token)
user_id = refresh["user_id"]
self.scope["user"] = {"id": user_id, "token_type": "refresh"}
# 登錄成功后,將用戶通道寫入緩存
cache.set(f"user_{user_id}_channel", self.channel_name)
# 加入默認組
self.room_group_name = settings.DEFAULT_GROUP_NAME
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
# 接受客戶端的WebSocket連接請求
await self.accept()
except TokenError as e:
logger.error("無效token")
await self.close(code=4003)
except Exception as e:
logger.error(str(e))
await self.close(code=4000)
async def disconnect(self, close_code):
"""當客戶端斷開 WebSocket 連接時調用"""
# 獲取當前用戶
user = self.scope.get("user")
if user:
user_id = user.get("id")
# 移除用戶通道
cache.delete(f"user_{user_id}_channel")
# 將用戶從組中移除
room_group_name = getattr(self, "room_group_name", None)
if room_group_name:
await self.channel_layer.group_discard(
room_group_name, self.channel_name
)
async def receive(self, text_data=None, bytes_data=None):
"""當接收到客戶端發送的消息時調用"""
# 心跳檢測
if text_data == "ping":
await self.send(text_data="pong")
return
user = self.scope.get("user")
if not user:
logger.warning(f"匿名用戶訪問拒絕:{text_data}")
return
logger.info(f"收到用戶 {user.get('id')} 發送消息:{text_data}")
# 因為消息采用了兩次JSON封裝,這里進行兩次JSON解析
try:
outer_payload = json.loads(text_data) # 外層JSON解釋
message_type = outer_payload.get("type")
raw_content = outer_payload.get("content", "{}")
inner_content = json.loads(raw_content) # 內層JSON解釋
except json.JSONDecodeError:
logger.error("內容解析失敗")
return
# 處理業務消息
if message_type == "demo-message-send":
message_text = inner_content.get("text", "").strip()
if not message_text:
logger.warning("消息內容不能為空")
return
# 構建響應字典
content = {
"fromUserId": user.get("id"),
"text": message_text,
"single": False, # 默認群發
}
# 判斷接收對象
target_user_id = inner_content.get("toUserId")
if target_user_id not in [None, "", 0]:
# 單發
await self._send_single_message(target_user_id, content)
else:
# 群發廣播
await self._send_broadcast_message(content)
def _build_response(self, content):
"""構建標準化的響應消息,進行兩次JSON封裝"""
# 第一次封裝:添加消息類型
inner_message = {
"type": "demo-message-receive",
"content": json.dumps(content),
}
# 第二次封裝:整體序列化
return json.dumps(inner_message)
async def _send_single_message(self, target_user_id, content):
"""向指定用戶發送單條消息"""
# 獲取用戶通道
target_channel = cache.get(f"user_{target_user_id}_channel")
if not target_channel:
return False
# 構建并發送消息
message = self._build_response(content)
await self.channel_layer.send(
target_channel,
{
"type": "single.message",
"payload": message,
},
)
return True
async def _send_broadcast_message(self, content):
"""向房間內所有用戶廣播消息"""
message = self._build_response(content)
await self.channel_layer.group_send(
self.room_group_name,
{
"type": "broadcast.message",
"payload": message,
},
)
async def broadcast_message(self, event):
"""處理群發消息"""
payload = event["payload"]
await self.send(text_data=payload)
async def single_message(self, event):
"""處理單發消息"""
payload = event["payload"]
await self.send(text_data=payload)
配置 WebSocket 路由
創建mysite\myapp_infra\websocket\routing.py文件,定義 WebSocket 的 URL 路由
"""WebSocket 路由配置"""
from django.urls import re_path, path
from .consumers import InfraConsumer
websocket_urlpatterns = [
# 調用as_asgi()類方法來獲取一個 ASGI 應用程序
re_path(r"^infra/ws/?$", InfraConsumer.as_asgi()),
]
然后在項目的mysite\mysite\asgi.py配置中添加 ASGI 路由
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
# 設置環境變量并初始化Django應用
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django_application = get_asgi_application()
def get_websocket_application():
"""延遲導入WebSocket路由(避免循環導入)"""
from myapp_infra.websocket.routing import websocket_urlpatterns
return AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
# 協議路由:區分HTTP和WebSocket請求
application = ProtocolTypeRouter(
{
"http": django_application,
"websocket": get_websocket_application(),
}
)
創建聊天界面模板
以Vue3界面為例,代碼實現了一個 WebSocket 客戶端界面,功能包括:
- 連接控制:顯示連接狀態、開啟/關閉 WebSocket 連接。
- 消息發送:輸入消息內容并選擇接收人(單發或群發),點擊發送按鈕通過 WebSocket 發送消息。
- 消息接收與展示:監聽 WebSocket 返回的數據,解析不同類型的消息(如單發、群發、系統通知)并在右側列表中倒序展示。
- 用戶列表獲取:頁面加載時獲取用戶列表用于選擇消息接收人。

實現效果
使用 ASGI 運行項目
# 開發環境啟動
uvicorn mysite.asgi:application --reload
使用不同的瀏覽器,分別登錄不同的用戶,實現實時互發消息。

二、生產環境部署
Nginx 配置
使用 Nginx 作為反向代理,添加以下配置來處理 WebSocket 連接。這段配置告訴 Nginx 如何正確處理 WebSocket 升級請求。
location /ws/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
使用 Gunicorn+Uvicorn 部署
Gunicorn(全稱 Green Unicorn)是一個用于 UNIX 的 高性能 Python WSGI HTTP 服務器。Gunicorn只能用于 Linux 系統,Windows上無法使用。
安裝
pip install gunicorn
# 查看版本
gunicorn -v
gunicorn -h
Gunicorn 作為進程管理器,配合 Uvicorn 工作進程處理 ASGI 應用
- -w 工作進程的數量。默認啟動 1 個 Worker 進程
- -k 要運行的工作進程類型。
- -b 指定要綁定的服務器套接字。
# 基本啟動命令
gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker mysite.asgi:application
優化 Worker 數量
Gunicorn Worker 數量的設置對性能影響很大,推薦的公式是:(CPU核心數 × 2) + 1。我們可以編寫一個啟動腳本來自動計算最佳 Worker 數量
#!/bin/bash
# run_gunicorn.sh
# 計算最佳 Worker 數
CORES=$(nproc)
WORKERS=$((CORES * 2 + 1))
# 限制最大Worker數(避免內存不足)
MAX_WORKERS=8
if [ $WORKERS -gt $MAX_WORKERS ]; then
WORKERS=$MAX_WORKERS
fi
# 啟動命令
gunicorn -b 0.0.0.0:8000 \
--workers $WORKERS \
--max-requests 1000 \ # 預防內存泄漏
--timeout 120 \ # 超時控制
--keep-alive 5 \ # Keep-Alive
-k uvicorn.workers.UvicornWorker \
mysite.asgi:application
添加執行權限并運行
chmod +x run_gunicorn.sh
./run_gunicorn.sh
您正在閱讀的是《Django從入門到實戰》專欄!關注不迷路~

本文將使用 Django Channels 構建一個多用戶實時聊天室,并詳細介紹如何在生產環境中部署 WebSocket 應用。
浙公網安備 33010602011771號