toga 異步處理
from concurrent import futures
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, CENTER
import asyncio
import time
def sync_function1(name='bbb', delay=2):
    """Simulate a slow, blocking task and return a fixed result string.

    Prints a start message containing ``name``, blocks the calling thread
    with ``time.sleep``, prints a completion message, and returns.

    Args:
        name: Label echoed in the start message.
        delay: Seconds to block in ``time.sleep``. Defaults to 2, the
            value that was previously hard-coded.

    Returns:
        The constant string ``"執行結果"``.
    """
    print(f"開始執行同步函數,name: {name}")
    time.sleep(delay)
    print("同步函數執行完成")
    return "執行結果"
def func(x, delay=0.5):
    """Print one work item's ``'data'`` value, then block briefly.

    Args:
        x: Mapping with a ``'data'`` key; the value is printed as-is.
        delay: Seconds to block in ``time.sleep``. Defaults to 0.5, the
            value that was previously hard-coded, so single-argument
            callers behave exactly as before.

    Returns:
        None.

    Raises:
        KeyError: If ``x`` has no ``'data'`` key.
    """
    print(x['data'])
    time.sleep(delay)
def sync_function2():
    """Run ``func`` over three sample items on a thread pool, then return.

    Prints a start message, submits each item to a
    ``ThreadPoolExecutor`` with up to four workers, waits for every call
    to finish, prints a completion message, and returns a fixed string.

    Returns:
        The constant string ``"執行結果"``.
    """
    print("開始執行異步函數")
    work_items = [{'data': '1'}, {'data': '2'}, {'data': '3'}]
    with futures.ThreadPoolExecutor(max_workers=4) as executor:
        # Drain the lazy result iterator: this waits for each call in
        # submission order and re-raises any exception from a worker.
        for _ in executor.map(func, work_items):
            pass
    print("異步函數執行完成")
    return "執行結果"
class AsyncApp(toga.App):
    """Toga application with one button that runs blocking work off-thread."""

    async def button_handler(self, _):
        """Run the three blocking demo tasks one after another.

        Each task is handed to ``asyncio.to_thread`` and awaited before
        the next one starts. The positional argument supplied by the
        button press is ignored.
        """
        pending_jobs = (
            (sync_function1, ('aaa',)),
            (sync_function1, ()),
            (sync_function2, ()),
        )
        for job, job_args in pending_jobs:
            await asyncio.to_thread(job, *job_args)

    def startup(self):
        """Build the single-button UI and show the main window."""
        button_style = Pack(padding=50)
        task_button = toga.Button(
            '執行耗時任務',
            on_press=self.button_handler,
            style=button_style,
        )

        layout_style = Pack(direction=COLUMN, alignment=CENTER)
        root_box = toga.Box(children=[task_button], style=layout_style)

        # Assign the window to the app first, then attach content and show,
        # in the same order as before.
        self.main_window = toga.MainWindow(title=self.formal_name)
        self.main_window.content = root_box
        self.main_window.show()
def main():
    """Create and return the ``AsyncApp`` instance (does not start it)."""
    app = AsyncApp('Async Toga App', 'org.example.asynctogaapp')
    return app
# Script entry point: build the app and hand control to its main loop.
if __name__ == '__main__':
    app = main()
    app.main_loop()
本文來自博客園,作者:Hany47315,轉載請注明原文鏈接:http://www.rzrgm.cn/hany-postq473111315/p/18834200

浙公網安備 33010602011771號