Odoo 異步非阻塞任務
一、概述
queue_job 是一個用于在 Odoo 中實現異步任務隊列的模塊,它能夠幫助你處理長時間運行的任務(例如導入大量數據、生成報表等),并且可以在后臺異步執行,而不會阻塞 Odoo 的主線程。使用 queue_job 可以顯著提升 Odoo 系統的性能和響應速度。
常見的使用場景包括:
- 數據導入導出
- 生成和發送報表
- 批量處理操作
- 定期任務調度
二、Demo
from odoo import models, fields, api from odoo.addons.queue_job.job import job import random class DemoQueueJob(models.Model): _name = 'demo.queue.job' _description = 'Demo Queue Job' name = fields.Char('Job Name', default="Queue Job Demo") description = fields.Text('Description') @job(priority=10) # priority 值越大優先級越高 def my_async_method(self, arg1, arg2): """ 異步任務的實際執行方法 """ print(f"開始處理異步任務:{self.name}, 參數: {arg1}, {arg2}") # 模擬一些長時間的操作 if random.choice([True, False]): # 隨機模擬任務失敗 raise ValueError("模擬任務失敗!") print(f"異步任務 {self.name} 完成.") return True def trigger_async_task(self): """ apply_async觸發異步任務并加入隊列。單獨調用不會觸發 """ self.my_async_method.apply_async(args=('value1', 'value2')) def schedule_task(self): """ 定時調度任務,每小時執行一次 """ self.my_async_method.apply_async( interval=3600, # 每小時運行一次 nextcall=fields.Datetime.now(), # 從現在開始 args=('scheduled', 'task') ) def trigger_task_with_retry(self): """ 啟動一個帶有重試機制的任務 """ try: self.my_async_method.apply_async( args=('fail', 'test'), # 強制模擬失敗 max_retries=3 # 最大重試次數為 3。也可以在job中設置max_retries ) except Exception as e: print(f"任務失敗,錯誤信息:{e}")
demo2
from odoo import models, fields from odoo.addons.queue_job.job import job import random class DemoQueueJob(models.Model): _name = 'demo.queue.job' _description = 'Demo Queue Job' name = fields.Char('Job Name', default="Queue Job Demo") description = fields.Text('Description') @job(max_retries=3, default_retry_delay=60) # 設置最大重試次數和每次重試的延遲時間(單位:秒) def my_async_method(self, arg1, arg2): """ 異步任務的實際執行方法 """ print(f"開始處理異步任務:{self.name}, 參數: {arg1}, {arg2}") # 模擬任務失敗:故意觸發異常以測試重試機制 if random.choice([True, False]): # 隨機模擬任務失敗 print("任務失敗,準備重試...") raise ValueError("模擬任務失敗!") print(f"異步任務 {self.name} 完成.") return True def trigger_task_with_retry(self): """ 啟動一個帶有重試機制的任務 """ self.my_async_method.apply_async( args=('fail', 'test') # 強制模擬失敗 )
Odoo18之后的版本
1 # models.Model模型中自帶with_delay來異步執行 2 def __self_with_delay(self): 3 return self.with_delay( 4 priority=10, # 優先級 5 eta=datetime.now() + timedelta(microseconds=200), # 延遲200毫秒執行 6 max_retries=3, # 最大重試次數3 7 description=f"3D模型解析【{self.file_md5}】", # 描述 8 channel=self._name, # 通道 9 identity_key=self.file_md5, # 使用唯一標識符 10 )

浙公網安備 33010602011771號