【workerman】uniapp+thinkPHP5使用GatewayWorker實(shí)現(xiàn)實(shí)時(shí)通訊
前言
之前公司需要一個(gè)內(nèi)部的通訊軟件,就叫我做一個(gè)。通訊軟件嘛,就離不開通訊了,然后我就想到了長(zhǎng)連接。這里本人用的是GatewayWorker框架。
什么是GatewayWorker框架?
GatewayWorker是基于Workerman開發(fā)的一套TCP長(zhǎng)連接的應(yīng)用框架,實(shí)現(xiàn)了單發(fā)、群發(fā)、廣播等接口,內(nèi)置了mysql類庫(kù),GatewayWorker分為Gateway進(jìn)程和Worker進(jìn)程,支持分布式部署,能夠支持大量的連接數(shù)。
GatewayWorker的工作原理

1、啟動(dòng)所有進(jìn)程(GatewayWorker、business、register)
2、GatewayWorker和business進(jìn)程啟動(dòng)后向register請(qǐng)求注冊(cè)
3、register服務(wù)收到注冊(cè)請(qǐng)求后,把所有Gateway的通訊地址保存在內(nèi)存中同時(shí)把內(nèi)存中所有的Gateway的通訊地址發(fā)給business
4、business進(jìn)程得到所有的Gateway內(nèi)部通訊地址后進(jìn)行連接GatewayWorker
5、如果有新的GatewayWorker服務(wù)進(jìn)行register,則將新的Gateway內(nèi)部通訊地址列表將廣播給所有buiness并建立連接
6、如果有GatewayWorker下線,則Register服務(wù)會(huì)收到通知,會(huì)將該GatewayWorker內(nèi)部通訊地址刪除,然后廣播新的內(nèi)部通訊地址列表給所有business
7、此時(shí)GatewayWorker與buiness已經(jīng)建立起長(zhǎng)連接
8、客戶端的事件及接受的數(shù)據(jù)全部由GatewayWorker轉(zhuǎn)發(fā)給business進(jìn)行處理。
目錄結(jié)構(gòu)
├── Applications // 項(xiàng)目應(yīng)用目錄
│ └── YourAppGateway // 建立一個(gè)存放workman的目錄,名字隨意
│ ├── Events.php // 處理主邏輯業(yè)務(wù)的文件,管理onConnect onMessage onClose 等方法
│ ├── start_gateway.php // gateway進(jìn)程啟動(dòng)腳本、配置服務(wù)注冊(cè)地址、端口號(hào)、進(jìn)程數(shù)等參數(shù)
│ ├── start_businessworker.php // 用戶進(jìn)程的啟動(dòng)腳本
│ └── start_register.php // 注冊(cè)服務(wù)的啟動(dòng)腳本
│
├── start.php // 全局啟動(dòng)腳本,此腳本會(huì)依次加載Applications/YourAppGateway/start*.php對(duì)所有腳本進(jìn)行啟動(dòng)
│
└── vendor // GatewayWorker框架和Workerman框架源碼目錄
GatewayWorker實(shí)現(xiàn)
以寶塔為例
1.安裝composer
登錄SSH終端,使用以下命令下載Composer的安裝腳本:
php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"
運(yùn)行下面的命令來安裝Composer:
php composer-setup.php --install-dir=/usr/local/bin --filename=composer
檢查composer版本
composer -v //檢查composer版本
2.安裝workerman
在項(xiàng)目根目錄打開寶塔終端,輸入以下命令安裝workman
composer require topthink/think-worker
3.安裝GatewayWorker
在項(xiàng)目根目錄打開寶塔終端,輸入以下命令安裝GatewayWorker
composer require workerman/gateway-worker
4.實(shí)現(xiàn)代碼
可以選擇官方提供的demo 鏈接:http://www.workerman.net/download/GatewayWorker.zip
或者使用我根據(jù)demo改編而來的
先在項(xiàng)目應(yīng)用目錄(一般是Applications)下新建一個(gè)文件存儲(chǔ)以下四個(gè)進(jìn)程文件
start_gateway.php
<?php
use \Workerman\Worker;
use \Workerman\WebServer;
use \GatewayWorker\Gateway;
use \GatewayWorker\BusinessWorker;
use \Workerman\Autoloader;
// 自動(dòng)加載類
require_once __DIR__ . '/../../vendor/autoload.php';
// gateway 進(jìn)程,這里使用Text協(xié)議,可以用telnet測(cè)試
$gateway = new Gateway("websocket://0.0.0.0:8283");
// gateway名稱,status方便查看
$gateway->name = 'YourAppGateway';
// gateway進(jìn)程數(shù)
$gateway->count = 200;
// 本機(jī)ip,分布式部署時(shí)使用內(nèi)網(wǎng)ip
$gateway->lanIp = '127.0.0.1';
// 內(nèi)部通訊起始端口,假如$gateway->count=4,起始端口為4000
// 則一般會(huì)使用4000 4001 4002 4003 4個(gè)端口作為內(nèi)部通訊端口
$gateway->startPort = 2900;
// 服務(wù)注冊(cè)地址、端口
$gateway->registerAddress = '127.0.0.1:1237';
// 心跳間隔
//$gateway->pingInterval = 10;
// 心跳數(shù)據(jù)
//$gateway->pingData = '{"type":"ping"}';
/*
// 當(dāng)客戶端連接上來時(shí),設(shè)置連接的onWebSocketConnect,即在websocket握手時(shí)的回調(diào)
$gateway->onConnect = function($connection)
{
$connection->onWebSocketConnect = function($connection , $http_header)
{
// 可以在這里判斷連接來源是否合法,不合法就關(guān)掉連接
// $_SERVER['HTTP_ORIGIN']標(biāo)識(shí)來自哪個(gè)站點(diǎn)的頁(yè)面發(fā)起的websocket鏈接
if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net')
{
$connection->close();
}
// onWebSocketConnect 里面$_GET $_SERVER是可用的
// var_dump($_GET, $_SERVER);
};
};
*/
// 如果不是在根目錄啟動(dòng),則運(yùn)行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
start_businessworker.php
<?php
use Workerman\Worker;
use Workerman\WebServer;
use GatewayWorker\Gateway;
use GatewayWorker\BusinessWorker;
use Workerman\Autoloader;
// 自動(dòng)加載類
require_once __DIR__ . '/../../vendor/autoload.php';
// bussinessWorker 進(jìn)程
$worker = new BusinessWorker();
// worker名稱
$worker->name = 'YourAppBusinessWorker';
// bussinessWorker進(jìn)程數(shù)量
$worker->count = 200;
// 服務(wù)注冊(cè)地址、端口
$worker->registerAddress = '127.0.0.1:1237';
// 如果不是在根目錄啟動(dòng),則運(yùn)行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
start_register.php
<?php
use \Workerman\Worker;
use \GatewayWorker\Register;
// 自動(dòng)加載類
require_once __DIR__ . '/../../vendor/autoload.php';
// register 必須是text協(xié)議 1237為端口
$register = new Register('text://0.0.0.0:1237');
// 如果不是在根目錄啟動(dòng),則運(yùn)行runAll方法
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
Events.php
<?php
use \GatewayWorker\Lib\Gateway;
/**
* 主邏輯
* 主要是處理 onConnect onMessage onClose 三個(gè)方法
*/
class Events
{
/**
* 當(dāng)客戶端連接時(shí)觸發(fā)
*
* @param int $client_id 連接id
*/
public static function onConnect($client_id)
{
echo "【新的客戶端鏈接】:client_id:".$client_id.PHP_EOL;
// 向當(dāng)前client_id發(fā)送數(shù)據(jù)
Gateway::sendToClient($client_id, "");
// 向所有人發(fā)送
$data=[
'client_id'=>$client_id,
'message'=>'歡迎'.$client_id.'登錄!',
'data'=>[]
];
Gateway::sendToAll(json_encode($data));
// Gateway::sendToAll("$client_id login\r\n");
}
/**
* 當(dāng)客戶端發(fā)來消息時(shí)觸發(fā)
* @param int $client_id 連接id
* @param mixed $message 具體消息
*/
public static function onMessage($client_id, $message){
$data=[
'client_id'=>$client_id,
'message'=>$client_id.'說:'.$result['message'],
'data'=>$message
];
Gateway::sendToAll(json_encode($data));
// 向所有人發(fā)送
// Gateway::sendToAll("$client_id said $message\r\n");
}
/**
* 當(dāng)用戶斷開連接時(shí)觸發(fā)
* @param int $client_id 連接id
*/
public static function onClose($client_id)
{
// 向所有人發(fā)送
// GateWay::sendToAll("$client_id 退出了!\r\n");
}
}
隨后在項(xiàng)目的根目錄下新建一個(gè)啟動(dòng)文件
start_all_workman.php
<?php
ini_set('display_errors', 'on');
use Workerman\Worker;
if(strpos(strtolower(PHP_OS), 'win') === 0)
{
exit("start.php not support windows, please use start_for_win.bat\n");
}
// 檢查擴(kuò)展
if(!extension_loaded('pcntl'))
{
exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
}
if(!extension_loaded('posix'))
{
exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
}
// 標(biāo)記是全局啟動(dòng)
define('GLOBAL_START', 1);
require_once __DIR__ . '/vendor/autoload.php';
// 加載所有Applications/*/start.php,以便啟動(dòng)所有服務(wù)
foreach(glob(__DIR__.'/application/此處請(qǐng)改成你自己命名存放workman的目錄名/start*.php') as $start_file)
{
require_once $start_file;
}
// 運(yùn)行所有服務(wù)
Worker::runAll();
注意開啟端口后記得去放行端口!!!除了寶塔放行以外,你的服務(wù)器(阿里云/騰訊云等等)也記得要去放行!!!
啟動(dòng)workman
在項(xiàng)目根目錄下打開終端,輸入php start_all_workman.php start -d ,開啟守護(hù)進(jìn)程,如果出現(xiàn)一下頁(yè)面即成功開啟

如想關(guān)閉workman進(jìn)程則輸入php start_all_workman.php stop 進(jìn)行關(guān)閉
GatewayWorker使用
如果你的網(wǎng)站使用的是Https協(xié)議的話,WebSocket必須使用wss協(xié)議
但是wss協(xié)議不支持IP:端口的形式,而是只能寫域名+url
所以為了解決使用https協(xié)議而WebSocket不能連接的問題,可以使用Nginx進(jìn)行反向代理
在網(wǎng)站配置文件的server下加入以下代碼
location /connectWorkman(名字隨你取,別跟其他反向代理重名就行)
{
proxy_pass http://127.0.0.1:8283;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header X-Real-IP $remote_addr;
}
前端使用(uniapp)
init() {
SocketTask = uni.connectSocket({
url: 'wss://chat.gdpaimaihui.com/auction', //正式
header: {
'content-type': 'application/json'
},
success: function(res) {
console.log('WebSocket連接創(chuàng)建', res);
},
fail: function(err) {
uni.showToast({
title: '網(wǎng)絡(luò)異常!',
icon: 'none'
});
console.log(err);
}
});
//websocket監(jiān)聽事件
SocketTask.onOpen((res) => {
socketOpen = true
canReconnect = true
console.log('監(jiān)聽 WebSocket 連接打開事件。', res);
//websocket連接后可以啟動(dòng)個(gè)定時(shí)器,每隔一段時(shí)間進(jìn)行心跳一次,以防心跳停止斷開連接
this.timer = setInterval(() => {
SocketTask.send({
data: '心跳',
success() {
// console.log('發(fā)送心跳成功');
}
})
}, 2000)
});
SocketTask.onError((onError) => {
console.log('監(jiān)聽 WebSocket 錯(cuò)誤。錯(cuò)誤信息', onError);
socketOpen = false;
if (canReconnect) {
this.reconnect()
canReconnect = false
}
});
SocketTask.onMessage((res) => {
console.log('監(jiān)聽WebSocket接受到服務(wù)器的消息事件。服務(wù)器返回的消息', res);
});
},
//重新連接
reconnect() {
if (!socketOpen) {
let count = 0;
reconnectInterval = setInterval(() => {
console.log("正在嘗試重連")
uni.showToast({
title: '正在嘗試重連',
icon: 'none'
})
this.init();
count++
console.log();
//重連一定次數(shù)后就不再重連
if (count >= reconnectTimes) {
clearInterval(reconnectInterval)
uni.showToast({
title: '網(wǎng)絡(luò)異常或服務(wù)器錯(cuò)誤',
icon: 'none'
})
}
}, reconnectDelay)
}
}
上述為之前給公司做內(nèi)部通訊軟件時(shí)個(gè)人整理內(nèi)容,水平有限,如有錯(cuò)誤之處,望各位園友不吝賜教!如果覺得不錯(cuò),請(qǐng)點(diǎn)擊推薦和關(guān)注!謝謝~??????? [鮮花][鮮花][鮮花]

浙公網(wǎng)安備 33010602011771號(hào)