<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Python 基于隊(duì)列實(shí)現(xiàn) tcp socket 連接池

      連接池實(shí)現(xiàn)

      socket_pool.py

      # -*- coding:utf-8 -*-
      import socket
      import time
      import threading
      import os
      import logging
      import traceback
      from queue import Queue, Empty
      
      _logger = logging.getLogger('mylogger')
      
      class SocketPool:
          def __init__(self, host, port, min_connections=10, max_connections=10):
              '''
              初始化Socket連接池
              :param host: 目標(biāo)主機(jī)地址
              :param port: 目標(biāo)端口號
              :param min_connections: 最小連接數(shù)
              :param max_connections: 最大連接數(shù)
              '''
              self.host = host
              self.port = port
              self.min_connections = min_connections
              self.max_connections = max_connections
              self.busy_sockets_dict = {} # 存放從連接池取出的socket的id
              self._sock_lock = threading.Lock()  # 線程鎖保證計(jì)數(shù)正確
              self._pool = Queue(max_connections)  # 基于線程安全的隊(duì)列存儲連接
              self._lock = threading.Lock()        # 線程鎖保證資源安全:
              self._init_pool()                    # 預(yù)創(chuàng)建連接
              self._start_health_check()           # 啟動連接健康檢查線程
      
          def _init_pool(self):
              '''預(yù)創(chuàng)建連接并填充到池中'''
              
              for _ in range(self.min_connections):
                  sock = self._create_socket()
                  self._pool.put(sock)
      
          def _create_socket(self):
              '''創(chuàng)建新的Socket連接'''
              
              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
              try:
                  sock.connect((self.host, self.port))
                  return sock
              except socket.error as e:
                  raise ConnectionError(f'Failed to connect: {e}')  # 連接失敗拋出異常
      
          def _start_health_check(self):
              '''啟動后臺線程定期檢查連接有效性'''
              
              def check():
                  while True:
                      with self._lock:
                          for _ in range(self._pool.qsize()):
                              sock = self._pool.get()
                              self.busy_sockets_dict[sock] = 1
                              try:
                                  sock.send(b'PING<END>')  # 發(fā)送心跳包驗(yàn)證連接狀態(tài)
                                  # 以下 11 為服務(wù)端返回?cái)?shù)據(jù)字節(jié)長度,不能亂寫,否則會導(dǎo)致獲取非健康檢查響應(yīng)報文數(shù)據(jù)存在多余內(nèi)容,不符合格式,從而導(dǎo)致數(shù)據(jù)解析問題
                                  sock.recv(11)
                                  self._pool.put(sock)
                                  self.busy_sockets_dict.pop(sock)
                              except (socket.error, ConnectionResetError):
                                  _logger.error('socket連接健康檢查出錯:%s, 關(guān)閉失效連接并創(chuàng)建新連接替換' % traceback.format_exc())
                                  sock.close()  # 關(guān)閉失效連接并創(chuàng)建新連接替換
                                  self.busy_sockets_dict.pop(sock)
      
                                  new_sock = self._create_socket()
                                  self._pool.put(new_sock)
                          
                          # 如果sock數(shù)量小于最小數(shù)量,則補(bǔ)充
                          for _ in range(0, self.min_connections - self._pool.qsize()):
                              new_sock = self._create_socket()
                              self._pool.put(new_sock)
                      time.sleep(60)  # 每60秒檢查一次
              threading.Thread(target=check, daemon=True).start()
      
          def get_connection(self):
              '''
              從池中獲取一個可用連接
              :return: socket對象
              '''
              
              with self._sock_lock:
                  if self._pool.empty():
                      if len(self.busy_sockets_dict.keys()) < self.max_connections:
                          new_sock = self._create_socket()
                          self.busy_sockets_dict[new_sock] = 1
                          return new_sock
                      else:
                          raise Empty('No available connections in pool')
                  else:
                      try:
                          sock = self._pool.get(block=False)
                          self.busy_sockets_dict[sock] = 1
                          return sock
                      except Exception:
                          _logger.error('獲取socket連接出錯:%s' % traceback.format_exc())
                          raise
                     
      
          def release_connection(self, sock):
              '''
              將連接歸還到池中
              :param sock: 待歸還的socket對象
              '''
              if not sock._closed:
                  self._pool.put(sock)
              if sock in self.busy_sockets_dict:
                  self.busy_sockets_dict.pop(sock)
      
      
          def close_all(self):
              '''關(guān)閉池中所有連接'''
              
              while not self._pool.empty():
                  sock = self._pool.get()
                  sock.close()
                  self.busy_sockets_dict.pop(sock.id)
              self.busy_sockets_dict = {} # 兜底
      
      host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
      port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
      min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
      max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
      socketPool = SocketPool(host, port, min_connections, max_connections)
      

      使用連接池

      from socket_pool import socketPool
      
      def send_socket_msg(data):
          global socketPool
          
          try:
              sock = None
              # 獲取連接(支持超時控制)
              sock = socketPool.get_connection()
              # 發(fā)送數(shù)據(jù)
              sock.sendall(data.encode('utf-8'))
          except Exception:
              error_msg = '發(fā)送消息出錯:%s' % traceback.format_exc()
              _logger.error(error_msg)
              
              if sock is not None:
                  sock.close()
                  socketPool.release_connection(sock)
              return send_socket_msg(data)
          
          response = ''
          try:
              while True:
                  chunk = sock.recv(4096)
                  chunk = chunk.decode('utf-8')
                  response += chunk
                  if response.endswith('<END>'):
                      response = response.rstrip('<END>')
                      return {'success':True, 'message':response}
          except Exception:
              error_msg = '獲取消息出錯:%s' % traceback.format_exc()
              _logger.error(error_msg)
              return {'success':False, 'message': error_msg}
          finally:
              # 必須歸還連接!
              socketPool.release_connection(sock)
      
      posted @ 2025-05-02 11:15  授客  閱讀(167)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 97久久综合亚洲色hezyo| 国内精品久久久久久久97牛牛| 国产免费高清69式视频在线观看| 成人aⅴ综合视频国产| 综1合AV在线播放| 亚洲高清日韩专区精品| 亚洲成在人线AⅤ中文字幕| 亚洲精品成人福利网站| 国产乱码精品一区二区三区中文 | 免费无码成人AV片在线| 无码天堂va亚洲va在线va| 色呦呦九九七七国产精品| 亚洲精品无码成人A片九色播放 | 中文字幕国产精品二区| 中文字幕日韩一区二区三区不卡| 免费AV片在线观看网址| 亚洲国产av无码综合原创国产| 国产精品一区二区性色av| 激情内射亚州一区二区三区爱妻| 午夜福利影院不卡影院| 免费三级网站| 午夜福利国产区在线观看| 婷婷色香五月综合缴缴情香蕉| 人妻激情乱人伦视频| 东京热一精品无码av| 无码专区 人妻系列 在线| 国产欧美日韩精品丝袜高跟鞋| 国产精品久久国产精麻豆| 日本一本正道综合久久dvd | 国产婷婷综合在线视频中文 | 一区二区免费高清观看国产丝瓜| 精品国产成人一区二区| 国产亚洲精品成人av在线| 国产jjizz女人多水喷水| 精品人妻中文字幕有码在线| 中文国产不卡一区二区| 国产女人水真多18毛片18精品 | 沙河市| 热久在线免费观看视频| 亚洲国产成人久久精品app| 2019国产精品青青草原|