WebSocket 連接維護(hù):從基礎(chǔ)實(shí)現(xiàn)到生產(chǎn)級(jí)解決方案
在傳統(tǒng)的HTTP協(xié)議中,客戶端必須主動(dòng)發(fā)起請(qǐng)求才能獲取數(shù)據(jù)。而實(shí)時(shí)應(yīng)用場(chǎng)景如在線聊天、股票行情、多人協(xié)作等,需要服務(wù)端能主動(dòng)推送數(shù)據(jù)。WebSocket協(xié)議它具有:
- ??雙向通信??:客戶端和服務(wù)端可以同時(shí)發(fā)送消息
- ??低延遲??:建立連接后消息即時(shí)傳輸
- ??高效??:相比HTTP輪詢節(jié)省大量帶寬
這是一個(gè)原生的websocket封裝實(shí)現(xiàn)主要用于了解底層的實(shí)現(xiàn),實(shí)際生產(chǎn)中推薦用 Socket.IO、SockJS這種第三方插件去做。自己手寫原生有開發(fā)成本,兼容性,??維護(hù)成本高等缺點(diǎn)。
一、技術(shù)架構(gòu)概述
要實(shí)現(xiàn)一個(gè)完整的websocket,需要一個(gè)websocket實(shí)例,鏈接方法,監(jiān)聽消息,發(fā)送消息,斷開連接,心跳維護(hù)鏈接。這里主要講前端的封裝,后端可以用express實(shí)現(xiàn)一個(gè)簡單的node服務(wù)。后端的方法和前端一樣也是有鏈接,消息監(jiān)聽,發(fā)送消息,關(guān)閉連接,在加上后端的定時(shí)推送。
完整的 WebSocket 實(shí)現(xiàn)需要包含以下核心模塊:
- WebSocket 實(shí)例管理
- 連接建立與狀態(tài)維護(hù)
- 雙向消息通信機(jī)制
- 心跳保活策略
- 異常恢復(fù)機(jī)制
本文主要聚焦前端實(shí)現(xiàn),后端示例采用 Node.js + Express 構(gòu)建。值得注意的是,生產(chǎn)環(huán)境的后端實(shí)現(xiàn)還需要考慮集群部署、消息廣播等進(jìn)階功能。
二、前端基礎(chǔ)實(shí)現(xiàn)
WebSocket 是瀏覽器原生支持的通信協(xié)議,以下是一個(gè)功能完備的基礎(chǔ)實(shí)現(xiàn):
<!DOCTYPE html>
<html>
<head>
<title>WebSocket 通信演示</title>
<style>
#output {
margin-top: 20px;
border: 1px solid #eee;
padding: 10px;
min-height: 100px;
}
</style>
</head>
<body>
<button onclick="connect()">建立連接</button>
<button onclick="sendMessage()">發(fā)送消息</button>
<div id="output"></div>
<script>
// WebSocket 實(shí)例與狀態(tài)管理
let socket;
let heartbeatTimer;
const HEARTBEAT_INTERVAL = 30000; // 心跳間隔30秒
// 建立WebSocket連接
function connect() {
socket = new WebSocket('ws://localhost:3000');
// 連接成功回調(diào)
socket.onopen = () => {
log('連接已建立');
startHeartbeat();
};
// 消息接收處理
socket.onmessage = (event) => {
const data = parseMessage(event.data);
if (data.type !== 'heartbeat') {
log(`收到消息: ${event.data}`);
}
};
// 連接關(guān)閉處理
socket.onclose = () => {
clearInterval(heartbeatTimer);
log('連接已斷開');
// 可在此處添加自動(dòng)重連邏輯
};
// 錯(cuò)誤處理
socket.onerror = (error) => {
log(`連接錯(cuò)誤: ${error.message}`);
};
}
// 消息發(fā)送封裝
function sendMessage() {
if (!socket || socket.readyState !== WebSocket.OPEN) {
return alert('請(qǐng)先建立連接');
}
const message = prompt('請(qǐng)輸入消息內(nèi)容');
if (message) {
socket.send(message);
log(`已發(fā)送: ${message}`);
}
}
// 心跳保活機(jī)制
function startHeartbeat() {
clearInterval(heartbeatTimer); // 清除舊定時(shí)器
heartbeatTimer = setInterval(() => {
if (socket.readyState === WebSocket.OPEN) {
const heartbeatMsg = JSON.stringify({
type: 'heartbeat',
timestamp: Date.now()
});
socket.send(heartbeatMsg);
}
}, HEARTBEAT_INTERVAL);
}
// 消息解析輔助函數(shù)
function parseMessage(data) {
try {
return JSON.parse(data);
} catch {
return { type: 'text', content: data };
}
}
// 日志輸出
function log(message) {
const output = document.getElementById('output');
output.innerHTML += `<p>${new Date().toLocaleTimeString()}: ${message}</p>`;
output.scrollTop = output.scrollHeight;
}
</script>
</body>
</html>
關(guān)鍵實(shí)現(xiàn)說明:
- ??心跳機(jī)制??:通過定時(shí)發(fā)送特殊格式的心跳包,保持連接活躍并檢測(cè)連接狀態(tài)
- ??狀態(tài)管理??:通過
readyState判斷連接狀態(tài),避免在不可用狀態(tài)下操作 - ??錯(cuò)誤處理??:統(tǒng)一處理連接錯(cuò)誤和異常情況
- ??消息分類??:通過消息類型字段區(qū)分心跳包和業(yè)務(wù)消息
二、初步封裝
核心模塊封裝
- 連接管理機(jī)制
- 自動(dòng)重連策略:實(shí)現(xiàn)指數(shù)退避重連算法,上限5次重連嘗試,最大間隔10秒
- 連接狀態(tài)隔離:通過閉包封裝WebSocket實(shí)例,防止外部狀態(tài)污染
- 統(tǒng)一連接入口:對(duì)外暴露connect方法支持DOM元素綁定和自動(dòng)重連
- 消息保障體系
- 消息緩沖隊(duì)列:在連接未就緒時(shí)暫存消息(pendingMessages)
- 消息可靠傳輸:通過flushPendingMessages方法確保連接建立后順序發(fā)送緩沖消息
- 傳輸狀態(tài)檢測(cè):實(shí)時(shí)校驗(yàn)readyState狀態(tài)機(jī),保證消息投遞有效性
- 連接健康監(jiān)測(cè)
- 心跳檢測(cè)機(jī)制:每30秒發(fā)送心跳包維持長連接
- 異常斷開處理:自動(dòng)清除心跳定時(shí)器防止內(nèi)存泄漏
- 連接狀態(tài)同步:通過onopen/onclose事件維護(hù)重連計(jì)數(shù)器
- 數(shù)據(jù)渲染層
- 響應(yīng)式數(shù)據(jù)更新:采用統(tǒng)一的數(shù)據(jù)接收處理器(onmessage)
- 數(shù)據(jù)格式化輸出:使用JSON.stringify進(jìn)行數(shù)據(jù)美化展示
- DOM智能更新:通過elment引用緩存實(shí)現(xiàn)精準(zhǔn)內(nèi)容更新
export default function ws() { let useWebsocket; // const receivedata = {} // let elment; // 存儲(chǔ) DOM 元素 let reconnectAttempts = 0; // 重連次數(shù) const MAX_RECONNECTS = 5; // const pendingMessages = []; // 暫存消息的隊(duì)列 function connect(el) { if (el) elment = el; if (useWebsocket) { console.warn('WebSocket 已存在,請(qǐng)勿重復(fù)連接'); return; } else { useWebsocket = new WebSocket('ws://localhost:3000'); } useWebsocket.onopen = () => { console.log('連接成功'); reconnectAttempts = 0; // 連接成功后發(fā)送所有暫存消息 flushPendingMessages(); startHeartbeat(); }; useWebsocket.onmessage = (event) => { console.log('收到消息: ' + event.data); const data = JSON.parse(event.data) receivedata[data.type] = data.data console.log('receivedata--', receivedata); changeInnerhtml(receivedata); // 直接渲染最新數(shù)據(jù) }; useWebsocket.onclose = () => { console.log('連接關(guān)閉'); if (reconnectAttempts < MAX_RECONNECTS) { const delay = Math.min(1000 * (reconnectAttempts + 1), 10000); console.log(`連接斷開,${delay}ms后嘗試重連...`); setTimeout(() => { reconnectAttempts++; connect(); }, delay); } else { console.log('達(dá)到最大重連次數(shù),停止嘗試'); } }; } // 心跳配置 const HEARTBEAT_INTERVAL = 30000; // 30秒 let heartbeatTimer; function startHeartbeat() { heartbeatTimer = setInterval(() => { if (useWebsocket.readyState === WebSocket.OPEN) { useWebsocket.send(JSON.stringify({ type: 'heartbeat', timestamp: Date.now() })); } }, HEARTBEAT_INTERVAL); } //發(fā)送消息 /* function sendMessage(message) { if (useWebsocket.readyState === WebSocket.OPEN) { useWebsocket.send(JSON.stringify({ type: 'message', data: message })); } else { log('WebSocket 連接未建立,無法發(fā)送消息'); } } */ // 發(fā)送所有暫存消息 const flushPendingMessages = () => { if (useWebsocket.readyState !== WebSocket.OPEN) return; while (pendingMessages.length > 0) { const message = pendingMessages.shift(); if (message) { console.log('延遲發(fā)送的消息---', message); useWebsocket.send(message); } } }; // 發(fā)送消息(核心改進(jìn)) const sendMessage = (message) => { console.log('準(zhǔn)備發(fā)送消息---', message); // 統(tǒng)一通過readyState判斷 if (useWebsocket.readyState === WebSocket.OPEN) { console.log('立即發(fā)送消息---', message); useWebsocket.send(message); } else { console.log('消息進(jìn)入隊(duì)列---', message); pendingMessages.push(message); } }; //關(guān)閉連接 function closeConnection() { if (useWebsocket) { useWebsocket.close(); useWebsocket = null; } } //在頁面輸出結(jié)果 function changeInnerhtml(data) { if (!elment) return; const text = JSON.stringify(data, null, 2); // 格式化 elment.innerHTML += `<p>${text}</p>`; } return { connect, sendMessage, closeConnection, receivedata } }
自動(dòng)連接使用實(shí)例:
<!DOCTYPE html>
<html>
<head>
<title>WebSocket 測(cè)試</title>
</head>
<body>
<button id="connect-btn">連接</button>
<button id="send-btn">發(fā)送消息</button>
<div id="output"></div>
<script type="module">
//自動(dòng)連接
import ws from './websocket.js';
const { connect, sendMessage } = ws();
const connectBtn = document.querySelector('#connect-btn');
const sendBtn = document.querySelector('#send-btn');
const output = document.querySelector('#output');
connect(output);
sendMessage('Hello, World!');
sendMessage('發(fā)送消息');
</script>
</body>
</html>
手動(dòng)連接點(diǎn)擊事件:
<!DOCTYPE html>
<html>
<head>
<title>WebSocket 測(cè)試</title>
</head>
<body>
<button id="connect-btn">連接</button>
<button id="send-btn">發(fā)送消息</button>
<div id="output"></div>
<script type="module">
import ws from './websocket.js';
const websocket = ws(); // 先調(diào)用函數(shù)初始化
const connectBtn = document.querySelector('#connect-btn');
const sendBtn = document.querySelector('#send-btn');
const output = document.querySelector('#output');
//建立連接
connectBtn.addEventListener('click', () => {
websocket.connect(output);
});
//發(fā)送消息
sendBtn.addEventListener('click', () => {
const message = prompt('輸入要發(fā)送的消息');
if (message) websocket.sendMessage(message);
});
</script>
</body>
</html>
三、服務(wù)器架構(gòu)設(shè)計(jì)
本方案采用 Express + ws 模塊構(gòu)建 WebSocket 服務(wù)器,具有以下特點(diǎn):
- ??雙協(xié)議支持??:同時(shí)支持 HTTP 和 WebSocket 協(xié)議
- ??連接管理??:實(shí)時(shí)跟蹤客戶端連接狀態(tài)
- ??消息廣播??:支持向所有客戶端推送消息
- ??類型化消息??:結(jié)構(gòu)化數(shù)據(jù)格式,便于前端處理
- ??定時(shí)推送??:服務(wù)端主動(dòng)推送機(jī)制
const express = require('express');
const WebSocket = require('ws');
// 創(chuàng)建 Express 應(yīng)用
const app = express();
// 直接使用 app.listen() 啟動(dòng)服務(wù)器并監(jiān)聽端口
const PORT = 3000;
const server = app.listen(PORT, () => {
console.log(`服務(wù)器運(yùn)行在 http://localhost:${PORT}`);
console.log(`WebSocket 運(yùn)行在 ws://localhost:${PORT}`);
});
// 創(chuàng)建 WebSocket 服務(wù)器,附加到 HTTP 服務(wù)器上
const wss = new WebSocket.Server({ server });
// 存儲(chǔ)所有連接的客戶端
const clients = new Set();
// WebSocket 連接處理
wss.on('connection', (ws) => {
console.log('新的客戶端連接');
// 將新客戶端添加到集合中
clients.add(ws);
// 當(dāng)收到客戶端消息時(shí)
ws.on('message', (message) => {
console.log(`收到消息: ${message}`);
console.log('發(fā)送消息給所有客戶端clients---', clients);
// 廣播消息給所有客戶端
clients.forEach(client => {
console.log(client === ws, client.readyState === WebSocket.OPEN);
if (client.readyState === WebSocket.OPEN) {
client.send(`服務(wù)器轉(zhuǎn)發(fā): ${message}`);
}
});
});
// 當(dāng)客戶端斷開連接時(shí)
ws.on('close', () => {
console.log('客戶端斷開連接');
clients.delete(ws);
});
// 1. 推送歡迎消息
ws.send(JSON.stringify({
type: 'welcome',
data: '連接服務(wù)器成功',
timestamp: Date.now()
}));
// 2. 推送系統(tǒng)狀態(tài)
ws.send(JSON.stringify({
type: 'system_status',
data: { cpu: 45, memory: 80 },
timestamp: Date.now()
}));
// 3. 定時(shí)推送通知
setInterval(() => {
ws.send(JSON.stringify({
type: 'notification',
data: '您有1條新消息',
timestamp: Date.now()
}));
}, 5000);
});
// Express 路由
app.get('/', (req, res) => {
res.send(`
<h1>WebSocket 服務(wù)器</h1>
<p>打開瀏覽器控制臺(tái)并運(yùn)行以下代碼測(cè)試:</p>
<pre>
const ws = new WebSocket('ws://localhost:3000');
ws.onmessage = (event) => console.log('收到消息:', event.data);
ws.onopen = () => ws.send('你好,服務(wù)器!');
</pre>
<p>當(dāng)前連接數(shù): ${clients.size}</p>
`);
});
這個(gè)實(shí)現(xiàn)提供了完整的 WebSocket 服務(wù)器解決方案,包括連接管理、消息處理、錯(cuò)誤處理和性能監(jiān)控等功能,可直接用于和前面前端代碼進(jìn)行聯(lián)調(diào)。

浙公網(wǎng)安備 33010602011771號(hào)