Python 基于隊(duì)列實(shí)現(xiàn) tcp socket 連接池
連接池實(shí)現(xiàn)
socket_pool.py
# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import Queue, Empty
_logger = logging.getLogger('mylogger')
class SocketPool:
def __init__(self, host, port, min_connections=10, max_connections=10):
'''
初始化Socket連接池
:param host: 目標(biāo)主機(jī)地址
:param port: 目標(biāo)端口號
:param min_connections: 最小連接數(shù)
:param max_connections: 最大連接數(shù)
'''
self.host = host
self.port = port
self.min_connections = min_connections
self.max_connections = max_connections
self.busy_sockets_dict = {} # 存放從連接池取出的socket的id
self._sock_lock = threading.Lock() # 線程鎖保證計(jì)數(shù)正確
self._pool = Queue(max_connections) # 基于線程安全的隊(duì)列存儲連接
self._lock = threading.Lock() # 線程鎖保證資源安全:
self._init_pool() # 預(yù)創(chuàng)建連接
self._start_health_check() # 啟動連接健康檢查線程
def _init_pool(self):
'''預(yù)創(chuàng)建連接并填充到池中'''
for _ in range(self.min_connections):
sock = self._create_socket()
self._pool.put(sock)
def _create_socket(self):
'''創(chuàng)建新的Socket連接'''
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((self.host, self.port))
return sock
except socket.error as e:
raise ConnectionError(f'Failed to connect: {e}') # 連接失敗拋出異常
def _start_health_check(self):
'''啟動后臺線程定期檢查連接有效性'''
def check():
while True:
with self._lock:
for _ in range(self._pool.qsize()):
sock = self._pool.get()
self.busy_sockets_dict[sock] = 1
try:
sock.send(b'PING<END>') # 發(fā)送心跳包驗(yàn)證連接狀態(tài)
# 以下 11 為服務(wù)端返回?cái)?shù)據(jù)字節(jié)長度,不能亂寫,否則會導(dǎo)致獲取非健康檢查響應(yīng)報文數(shù)據(jù)存在多余內(nèi)容,不符合格式,從而導(dǎo)致數(shù)據(jù)解析問題
sock.recv(11)
self._pool.put(sock)
self.busy_sockets_dict.pop(sock)
except (socket.error, ConnectionResetError):
_logger.error('socket連接健康檢查出錯:%s, 關(guān)閉失效連接并創(chuàng)建新連接替換' % traceback.format_exc())
sock.close() # 關(guān)閉失效連接并創(chuàng)建新連接替換
self.busy_sockets_dict.pop(sock)
new_sock = self._create_socket()
self._pool.put(new_sock)
# 如果sock數(shù)量小于最小數(shù)量,則補(bǔ)充
for _ in range(0, self.min_connections - self._pool.qsize()):
new_sock = self._create_socket()
self._pool.put(new_sock)
time.sleep(60) # 每60秒檢查一次
threading.Thread(target=check, daemon=True).start()
def get_connection(self):
'''
從池中獲取一個可用連接
:return: socket對象
'''
with self._sock_lock:
if self._pool.empty():
if len(self.busy_sockets_dict.keys()) < self.max_connections:
new_sock = self._create_socket()
self.busy_sockets_dict[new_sock] = 1
return new_sock
else:
raise Empty('No available connections in pool')
else:
try:
sock = self._pool.get(block=False)
self.busy_sockets_dict[sock] = 1
return sock
except Exception:
_logger.error('獲取socket連接出錯:%s' % traceback.format_exc())
raise
def release_connection(self, sock):
'''
將連接歸還到池中
:param sock: 待歸還的socket對象
'''
if not sock._closed:
self._pool.put(sock)
if sock in self.busy_sockets_dict:
self.busy_sockets_dict.pop(sock)
def close_all(self):
'''關(guān)閉池中所有連接'''
while not self._pool.empty():
sock = self._pool.get()
sock.close()
self.busy_sockets_dict.pop(sock.id)
self.busy_sockets_dict = {} # 兜底
host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
socketPool = SocketPool(host, port, min_connections, max_connections)
使用連接池
from socket_pool import socketPool
def send_socket_msg(data):
global socketPool
try:
sock = None
# 獲取連接(支持超時控制)
sock = socketPool.get_connection()
# 發(fā)送數(shù)據(jù)
sock.sendall(data.encode('utf-8'))
except Exception:
error_msg = '發(fā)送消息出錯:%s' % traceback.format_exc()
_logger.error(error_msg)
if sock is not None:
sock.close()
socketPool.release_connection(sock)
return send_socket_msg(data)
response = ''
try:
while True:
chunk = sock.recv(4096)
chunk = chunk.decode('utf-8')
response += chunk
if response.endswith('<END>'):
response = response.rstrip('<END>')
return {'success':True, 'message':response}
except Exception:
error_msg = '獲取消息出錯:%s' % traceback.format_exc()
_logger.error(error_msg)
return {'success':False, 'message': error_msg}
finally:
# 必須歸還連接!
socketPool.release_connection(sock)
作者:授客
微信/QQ:1033553122
全國軟件測試QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續(xù)創(chuàng)作的源動力,打賞后如有任何疑問,請聯(lián)系我!!!
微信打賞
支付寶打賞 全國軟件測試交流QQ群
浙公網(wǎng)安備 33010602011771號