python類、線程、定時、信號量、窗體簡單構建
線程
import threading
print("# 啟動串口線程")
serial_worker = threading.Thread(target=new_def_function, daemon=True)
serial_worker.start()
線程阻塞
import threading
import time
print("# 創建一個信號量對象,允許同時有 2 個線程訪問資源")
semaphore = threading.Semaphore(1)
print("# 定義一個函數作為線程的執行體")
def worker_thread(name):
while(True):
print(f"{name} 線程正在等待獲取信號量...")
semaphore.acquire()
try:
print(f"{name} 線程已獲取信號量,開始執行")
time.sleep(2) # 模擬耗時操作
print(f"{name} 線程執行完畢,釋放信號量\n")
finally:
semaphore.release()
serial_worker = threading.Thread(target=worker_thread,args=(f"線程",), daemon=True)
serial_worker.start()
while(True):
semaphore.acquire()
print("主函數while")
time.sleep(1)
print("主函數whileOK\n")
# # 釋放兩次信號量,相當于喂兩次狗
# semaphore.release()
semaphore.release()
time.sleep(0.1)
通過 ** semaphore = threading.Semaphore(1) **初值和 ** semaphore.acquire()、semaphore.release() 實現,也可以,通過 ** event = threading.Event() 和 ** event.set() **,以及 ** event.wait()、event.clear() **實現。
# 創建一個事件對象,用于線程間的同步
event = threading.Event()
def worker_thread(name):
while True:
print(f"--{name} 線程正在等待喂狗...")
# 等待事件被設置
event.wait()
# 事件被設置后,重置事件
event.clear()
try:
print(f"{name} 線程開始執行")
while True:
# 設置事件,通知子線程可以執行
event.set()
if event_begin.is_set():
print("event_begin 已被設置,主程序繼續執行")
print("==主函數喂狗\n")
線程_定時器
def repeatead_timer(interval, function, *args, **kwargs):
stopped = threading.Event()
def loop():
while not stopped.wait(interval):
function(*args, **kwargs)
threading.Thread(target = loop).start()
return stopped
timer_stopped = repeatead_timer(0.5, window.myhandle) 類中函數
窗體
from PyQt5.QtWidgets import QApplication, QMainWindow, QLabel, QPushButton, QComboBox, QVBoxLayout, QWidget, QLineEdit, QHBoxLayout
from PyQt5.QtWidgets import QSpacerItem, QTextEdit
class SerialReader(QMainWindow):# 2. 主窗口類定義 - SerialReader1
def __init__(self):
super().__init__()
self.init_ui()# 2.1 界面初始化 - init_ui
self.connect_signals()
def init_ui(self):
self.setWindowTitle("串口數據讀取 - 經緯度顯示")
print("# 查找可用串口并添加到下拉框")
port_names = ["1","2","3","4"] # ports = list_ports.comports(); [port.device for port in ports]
self.serial_port_combobox_s1 = QComboBox(self)
self.serial_port_combobox_s1.addItems(port_names)
left_layout = QVBoxLayout()
port_layout = QHBoxLayout()
port_layout.addWidget(QLabel("串口_S1:"))
port_layout.addWidget(self.serial_port_combobox_s1)
left_layout.addLayout(port_layout)
main_layout = QHBoxLayout()
main_layout.addLayout(left_layout)
spacer = QSpacerItem(20, 20 * 3)
main_layout.addItem(spacer)
central_widget = QWidget()
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
def add_data(self): # self.
self.update_plot(100)
def update_plot(self,f):
pass
def connect_signals(self):
self.serial_port_combobox_s1.currentIndexChanged.connect(self.toggle_serial_reading)
def toggle_serial_reading(self):
print("hello")
import sys
if __name__ == "__main__":
try:
app = QApplication(sys.argv)
window = SerialReader()
window.show()
sys.exit(app.exec_())
except KeyboardInterrupt:
print("程序終止")
浙公網安備 33010602011771號