sub和pub套接字丟失消息研究
sub和pub套接字丟失消息研究
問題描述:
使用pub和sub套接字時,無論是先啟動服務端還是客戶端,在連接之后就會丟失第一個消息。
代碼示例:
pub:
import zmq
import time
# 創建上下文
context = zmq.Context()
# 創建 PUB 套接字
socket = context.socket(zmq.PUB)
# 綁定到端口 5555
socket.connect("tcp://192.168.198.198:5555")
for i in range(5):
# 向所有訂閱者發布消息
socket.send_string("center-Hello from agent to cneter!")
socket.send_string("app-Hello from agent to app!")
print("Sent message from A")
time.sleep(2) # 每秒發送一次消息
sub:
import zmq
# 創建上下文
context = zmq.Context()
# 創建 SUB 套接字
socket = context.socket(zmq.SUB)
# 連接到主機
socket.bind("tcp://192.168.198.198:5555")
# 訂閱所有消息
socket.setsockopt_string(zmq.SUBSCRIBE, "app-")
while True:
# 接收消息
message = socket.recv_string()
msg = message.split("-")[1]
print(f"Received message on agent: {msg}")
結果展示:
發生原因:
訂閱者延遲訂閱(Subscription delay)
- ZeroMQ 的
SUB套接字會在接收到消息之前向PUB套接字聲明自己的訂閱主題(例如,訂閱的消息類型)。如果SUB套接字在PUB套接字已經開始發布消息之后才連接或開始訂閱,那么SUB會錯過PUB發布的第一個消息。 - 原因:
PUB套接字在發送第一條消息時,SUB套接字可能還沒有來得及完成其訂閱過程,導致第一條消息在SUB訂閱之前發送,因此會丟失。 - 解決方法:確保
SUB套接字在PUB開始發布之前已經連接并完成訂閱。通常,PUB套接字應該啟動并保持一段時間,然后再啟動SUB套接字,以確保SUB套接字在PUB發布消息時已經準備好接收消息。
解決方案:
在連接之后等待一會,讓訂閱者連接,然后再發送消息。
pub:
import time
import zmq
def publisher():
context = zmq.Context()
pub_socket = context.socket(zmq.PUB)
pub_socket.bind("tcp://192.168.198.198:5555")
# 等待訂閱者連接
print("Publisher waiting for subscribers to connect...")
time.sleep(2) # 等待訂閱者連接
# 發布消息
for i in range(5):
message = "Hello, World!"
print(f"Publishing: {message}")
pub_socket.send_string(message)
time.sleep(1)
if __name__ == "__main__":
publisher()
sub:
import zmq
def subscriber():
context = zmq.Context()
sub_socket = context.socket(zmq.SUB)
sub_socket.connect("tcp://192.168.198.198:5555")
# 設置訂閱過濾器,訂閱所有消息
sub_socket.setsockopt_string(zmq.SUBSCRIBE, "") # 訂閱所有消息
# 讀取消息
while True:
message = sub_socket.recv_string()
print(f"Received: {message}")
if __name__ == "__main__":
subscriber()
結果展示:

浙公網安備 33010602011771號