python3的schedule模塊
一、schedule模塊:
1、基本操作:
import schedule
import time
def hello(name):
print("hello %s" % name)
def job():
print("I'm working...")
# 每十分鐘執(zhí)行任務(wù)
schedule.every(10).minutes.do(job)
# 每個(gè)小時(shí)執(zhí)行任務(wù)
schedule.every().hour.do(job)
# 每天的10:30執(zhí)行任務(wù)
schedule.every().day.at("10:30").do(job)
# 每個(gè)月執(zhí)行任務(wù)
schedule.every().monday.do(job)
# 每個(gè)星期三的13:15分執(zhí)行任務(wù)
schedule.every().wednesday.at("13:15").do(job)
# 每分鐘的第17秒執(zhí)行任務(wù)
schedule.every().minute.at(":17").do(job)
schedule.every(5).seconds.do(hello,"kevin")
while True:
schedule.run_pending()
time.sleep(1)
上面的代碼表示每10分鐘執(zhí)行一次 job 函數(shù),非常簡(jiǎn)單方便。你只需要引入 schedule 模塊,通過調(diào)用scedule.every(時(shí)間數(shù)).時(shí)間類型.do(job) 發(fā)布周期任務(wù)。
發(fā)布后的周期任務(wù)需要用run_pending函數(shù)來檢測(cè)是否執(zhí)行,因此需要一個(gè)While循環(huán)不斷地輪詢這個(gè)函數(shù)。
2、只執(zhí)行一次
不過如果你想只運(yùn)行一次任務(wù)的話,可以這么配:
import schedule
import time
def job_that_executes_once():
# 此處編寫的任務(wù)只會(huì)執(zhí)行一次...
return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
schedule.run_pending()
time.sleep(1)
3、獲取所有作業(yè):
all_jobs = schedule.get_jobs()
print(all_jobs)
4、取消所有作業(yè):
schedule.clear()
5、通過標(biāo)簽獲取或取消部分作業(yè):
# .tag 打標(biāo)簽
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
# get_jobs(標(biāo)簽):可以獲取所有該標(biāo)簽的任務(wù)
friends = schedule.get_jobs('friend')
# 取消所有 daily-tasks 標(biāo)簽的任務(wù)
schedule.clear('daily-tasks')
6、設(shè)定作業(yè)截止時(shí)間:
import schedule
from datetime import datetime, timedelta, time
def job():
print('Boo')
# 每個(gè)小時(shí)運(yùn)行作業(yè),18:30后停止
schedule.every(1).hours.until("18:30").do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),8個(gè)小時(shí)后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# 每個(gè)小時(shí)運(yùn)行作業(yè),2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)
7、立即運(yùn)行所有作業(yè),而不管其安排如何
import schedule
def job_1():
print('Foo')
def job_2():
print('Bar')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
# 立即運(yùn)行所有作業(yè),每次作業(yè)間隔10秒
schedule.run_all(delay_seconds=10)
8、裝飾器安排作業(yè)
from schedule import every, repeat, run_pending
import time
# 此裝飾器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
print("I am a scheduled job")
while True:
run_pending()
time.sleep(1)
9、并行執(zhí)行
不過你可以通過多線程的形式來運(yùn)行每個(gè)作業(yè)以解決此限制:
import threading
import time
import schedule
def job1():
print("I'm running on thread %s" % threading.current_thread())
def job2():
print("I'm running on thread %s" % threading.current_thread())
def job3():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
schedule.run_pending()
time.sleep(1)
10、日志記錄
Schedule 模塊同時(shí)也支持 logging 日志記錄,這么使用:
import schedule
import logging
logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志級(jí)別為DEBUG
schedule_logger.setLevel(level=logging.DEBUG)
def job():
print("Hello, Logs")
schedule.every().second.do(job)
schedule.run_all()
schedule.clear()
效果如下:
DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs
11、異常處理
Schedule 不會(huì)自動(dòng)捕捉異常,它遇到異常會(huì)直接拋出,這會(huì)導(dǎo)致一個(gè)嚴(yán)重的問題:后續(xù)所有的作業(yè)都會(huì)被中斷執(zhí)行,因此我們需要捕捉到這些異常。
你可以手動(dòng)捕捉,但是某些你預(yù)料不到的情況需要程序進(jìn)行自動(dòng)捕獲,加一個(gè)裝飾器就能做到了:
import functools
def catch_exceptions(cancel_on_failure=False):
def catch_exceptions_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except:
import traceback
print(traceback.format_exc())
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
return 1 / 0
schedule.every(5).minutes.do(bad_task)
這樣,bad_task在執(zhí)行時(shí)遇到的任何錯(cuò)誤,都會(huì)被catch_exceptions 捕獲,這點(diǎn)在保證調(diào)度任務(wù)正常運(yùn)轉(zhuǎn)的時(shí)候非常關(guān)鍵。
浙公網(wǎng)安備 33010602011771號(hào)