# 克隆自聚寬文章:https://www.joinquant.com/post/38940
# 標題:年初至今4倍,極致的Day Trading,56.8%勝率
# 作者:Dr.QYQ
'''
優化說明:
1.使用修正標準分
rsrs_score的算法有:
僅斜率slope,效果一般;
僅標準分zscore,效果不錯;
修正標準分 = zscore * r2,效果最佳;
右偏標準分 = 修正標準分 * slope,效果不錯。
2.將原策略的每次持有兩只etf改成只買最優的一個,收益顯著提高
3.將每周調倉換成每日調倉,收益顯著提高
4.因為交易etf,所以手續費設為萬分之三,印花稅設為零,未設置滑點
5.修改股票池中候選etf,刪除銀行,紅利等收益較弱品種,增加納指etf以增加不同國家市場間輪動的可能性
6.根據研報,默認參數介已設定為最優
7.加入防未來函數
8.增加擇時與選股模塊的打印日志,方便觀察每筆操作依據
'''
# 導入函數庫
from jqdata import *
from jqlib.technical_analysis import *
import numpy as np
from datetime import datetime, timedelta
import pandas as pd
# 初始化函數
def initialize(context):
# 設定滬深300作為基準
set_benchmark('000300.XSHG')
# 用真實價格交易
set_option('use_real_price', True)
# 打開防未來函數
set_option("avoid_future_data", True)
# 將滑點設置為0
set_slippage(FixedSlippage(0.001))
# 設置交易成本萬分之三
# set_order_cost(OrderCost(open_tax=0, close_tax=0, open_commission=0.0003, close_commission=0.0003, close_today_commission=0, min_commission=5),
# type='fund')
# 股票類每筆交易時的手續費是:買入時無傭金,賣出時傭金萬分之1.5,印花稅0.1%, 每筆交易傭金最低扣5塊錢
set_order_cost(OrderCost(close_tax=0.001,
close_commission=0.00015, min_commission=5), type='stock')
# 過濾order中低于error級別的日志
log.set_level('order', 'error')
# 初始化各類全局變量
# 動量輪動參數
g.stock_num = 5 # 篩選的標的支數。
g.stock_tobuy = 1 # 需要購買的股票數
g.momentum_day = 29 # 最新動量參考最近momentum_day的
g.num_days = 5 # 計算分數變化
# rsrs擇時參數
g.ref_stock = '000300.XSHG' # 用ref_stock做擇時計算的基礎數據
g.N = 14 # 計算最新斜率slope,擬合度r2參考最近N天
g.M = 600 # 計算最新標準分zscore,rsrs_score參考最近M天
g.score_threshold = 0.7 # rsrs標準分指標閾值
# 個股擇時參數
g.sec_data_num = 5 # 個股數據點數
# g.take_profit = 0.12 # 移動止盈
# ma擇時參數
g.mean_day = 7 # 計算ref_stock結束ma收盤價,參考最近mean_day
# 計算初始ma收盤價,參考(mean_day + mean_diff_day)天前,窗口為mean_diff_day的一段時間
g.mean_diff_day = 10
g.slope_series = initial_slope_series()[:-1] # 除去回測第一天的slope,避免運行時重復加入
# 設置交易時間,每天運行
run_daily(my_trade, time='09:31', reference_security='000300.XSHG')
run_daily(check_lose, time='14:50', reference_security='000300.XSHG')
# run_daily(check_profit, time='10:00')
run_daily(print_trade_info, time='15:05', reference_security='000300.XSHG')
# 0-0 選取股票池
def get_stock_pool():
# preday = str(date.today() - timedelta(1)) # get previous date
# 從多個熱門概念中選出市值在50億以上,500億以下的標的。
concept_names = list(set([
"虛擬現實",
"元宇宙",
"鋰電池",
"集成電路",
"國產軟件",
"MiniLED",
"智能穿戴",
"智能電網",
"智能醫療",
"風電",
"核電",
"電力物聯網",
"電力改革",
"量子通信",
"互聯網+",
"光伏",
"工業4.0",
"特高壓",
"氟化工",
"煤化工",
"稀土永磁",
"白酒",
"煤炭",
"鈷",
"鹽湖提鋰",
"磷化工",
"草甘膦",
"航運",
"第三代半導體",
"太陽能",
"柔性屏",
"芯片",
"新能源",
"智能音箱",
"蘋果",
"特斯拉",
"寧德時代",
"碳中和",
"軍工",
"軍民融合",
"海工裝備",
"超級電容",
"區塊鏈",
"邊緣計算",
"云計算",
"數字貨幣",
"人工智能",
"汽車電子",
"無人駕駛",
"車聯網",
"網約車",
"充電樁",
"冷鏈物流",
"OLED",
"大飛機",
"大數據",
"燃料電池",
"醫療器械",
"生物疫苗",
"生物醫藥",
"輔助生殖",
"健康中國",
"基因測序",
"超級真菌",
"節能環保",
"裝配式建筑",
"鄉村振興",
"建筑節能",
"文化傳媒",
"電子競技",
"網絡游戲",
"數據中心",
"高端裝備",
'三胎',
'養老',
"稀缺資源",
"稀土永磁",
"新材料",
"綠色電力"
]))
all_concepts = get_concepts()
concept_codes = []
for name in concept_names:
#print(f'concept is:{name}')
code = all_concepts[all_concepts['name'] == name].index[0]
concept_codes.append(code)
all_concept_stocks = []
for concept in concept_codes:
all_concept_stocks += get_concept_stocks(concept)
q = query(valuation.code).filter(valuation.market_cap >= 30,
valuation.market_cap <= 1000, valuation.code.in_(all_concept_stocks))
stock_df = get_fundamentals(q)
stock_pool = [code for code in stock_df['code']]
# 移除創業板和科創板標的
stock_pool = [code for code in stock_pool if not (
code.startswith('30') or code.startswith('688'))]
stock_pool = filter_st_stock(stock_pool) # 去除st
stock_pool = filter_paused_stock(stock_pool) # 去除停牌
return stock_pool
# 1-1 選股模塊-動量因子輪動
# 基于股票年化收益和判定系數打分,并按照分數從大到小排名
def get_rank(stock_pool, context):
'''get rank score for stocks in stock pool'''
send_info = []
stock_dict_list = []
for stock in stock_pool:
score_list = []
pre_dt = context.current_dt - timedelta(1)
data = get_price(
stock,
end_date=context.current_dt,
count=100, # 多取幾天以防數據不夠
frequency="120m",
fields=["close"],
skip_paused=True,
) # 最新的在最下面
security_info = get_security_info(stock)
stock_name = security_info.display_name
# print(f'stock name {stock_name}')
# 對于次新股票,可能沒有數據,所以要drop NA
data = data.dropna()
# 收盤價
y = data["log"] = np.log(data.close)
# print(f'{len(y)} data points')
# 分析的數據個數(天)
x = data["num"] = np.arange(data.log.size)
# 擬合 1 次多項式
# y = kx + b, slope 為斜率 k,intercept 為截距 b
# slope, intercept = np.polyfit(x, y, 1)
# 直接連接首尾點計算斜率
if len(y) < g.momentum_day + g.num_days:
print("次新股,用所有數據")
slope = (y.iloc[-1] - y.iloc[0]) / g.momentum_day # 最新的在最上面
# print(f'slope: {slope}\n')
# 擬合出截距
try:
_, intercept = np.polyfit(x,y,1)
except ValueError:
print('Can not fit intercept, use first y value instead')
intercept = y.iloc[0]
# intercept = y.iloc[0]
# (e ^ slope) ^ 250 - 1
annualized_returns = math.pow(math.exp(slope), 250) - 1
r_squared = 1 - (
sum((y - (slope * x + intercept)) ** 2) /
((g.momentum_day - 1) * np.var(y, ddof=1))
)
score = annualized_returns * np.abs(r_squared)
# print(f'score: {score}\n')
score_list.append(score)
else:
slope = [(y.iloc[-1-D] - y.iloc[-g.momentum_day-D]) /
g.momentum_day for D in range(g.num_days)] # 最新的在最上面
# print(f'slope: {slope}\n')
# intercept = [y.iloc[-g.momentum_day-D] for D in range(g.num_days)]
# (e ^ slope) ^ 250 - 1
for i in range(g.num_days):
annualized_returns = math.pow(math.exp(slope[i]), 250) - 1
if i == 0: # 如果i=0,則前n天數據為df.iloc[-n::]
_, intercept = np.polyfit(x[-g.momentum_day-i::], y.iloc[-g.momentum_day-i::],1)
r_squared = 1 - (
sum((y.iloc[-g.momentum_day-i::] - (slope[i] * x[-g.momentum_day-i::] + intercept)) ** 2) /
((g.momentum_day - 1) * np.var(y[-g.momentum_day-i::], ddof=1))
)
score = annualized_returns * np.abs(r_squared)
# print(f'score: {score}\n')
score_list.append(score)
else:
_, intercept = np.polyfit(x[-g.momentum_day-i:-i], y.iloc[-g.momentum_day-i:-i],1)
r_squared = 1 - (
sum((y.iloc[-g.momentum_day-i:-i] - (slope[i] * x[-g.momentum_day-i:-i] + intercept)) ** 2) /
((g.momentum_day - 1) * np.var(y[-g.momentum_day-i:-i], ddof=1))
)
score = annualized_returns * np.abs(r_squared)
# print(f'score: {score}\n')
score_list.append(score)
stock_dict_tmp = {stock_name: score_list}
stock_dict_list.append(stock_dict_tmp)
# merge list of dictionaries into one dictionary
stock_dict = {k: v for d in stock_dict_list for k, v in d.items()}
# create pandas dataframe from dict
stock_df = pd.DataFrame.from_dict(stock_dict, orient='index')
stock_df['code'] = stock_pool
# sort by latest score
stock_df = stock_df.sort_values(by=[0], ascending=False)
# get top num stocks
stock_top_names = stock_df.index.values[:g.stock_num]
stock_top_codes = stock_df['code'].values[:g.stock_num]
# get all stock names and codes
stock_names = stock_df.index.values
stock_codes = stock_df['code'].values
# print results
print("#" * 30 + "候選" + "#" * 30)
for name, code in zip(stock_top_names, stock_top_codes):
print("{}({}):{}".format(name, code,
stock_df[0][name]))
print("#" * 64)
return stock_top_names,stock_top_codes, stock_df
def rank_stock_change(df):
'''rank_stock_plot
line plot the score variation for *num_days* for each rank stock.
'''
df.drop(['code'],axis=1, inplace=True)
stock_df = df.head(g.stock_num) # 只取頭部
rank_stock_dif = stock_df.diff(
axis=1, periods=-1).dropna(axis=1)
return rank_stock_dif
# 2-1 擇時模塊-計算線性回歸統計值
# 對輸入的自變量每日最低價x(series)和因變量每日最高價y(series)建立OLS回歸模型,返回元組(截距,斜率,擬合度)
def get_ols(x, y):
slope, intercept = np.polyfit(x, y, 1)
r2 = 1 - (sum((y - (slope * x + intercept))**2) /
((len(y) - 1) * np.var(y, ddof=1)))
return (intercept, slope, r2)
# 2-2 擇時模塊-設定初始斜率序列
# 通過前M日最高最低價的線性回歸計算初始的斜率,返回斜率的列表
def initial_slope_series():
data = attribute_history(g.ref_stock, g.N + g.M, '1d', ['high', 'low'])
return [get_ols(data.low[i:i+g.N], data.high[i:i+g.N])[1] for i in range(g.M)]
# 2-3 擇時模塊-計算標準分
# 通過斜率列表計算并返回截至回測結束日的最新標準分
def get_zscore(slope_series):
mean = np.mean(slope_series)
std = np.std(slope_series)
return (slope_series[-1] - mean) / std
# 2-4 擇時模塊-計算綜合信號
# 1.獲得rsrs與MA信號,rsrs信號算法參考優化說明,MA信號為一段時間兩個端點的MA數值比較大小
# 2.信號同時為True時返回買入信號,同為False時返回賣出信號,其余情況返回持倉不變信號
# 3.改進:加入個股的賣點判據
def get_timing_signal(stock, rank_stock_diff, context):
'''
計算大盤信號: RSRS + MA
'''
# 計算MA信號
close_data = attribute_history(
g.ref_stock, g.mean_day + g.mean_diff_day, '1d', ['close'])
today_MA = close_data.close[g.mean_diff_day:].mean()
before_MA = close_data.close[:-g.mean_diff_day].mean()
# 計算rsrs信號
high_low_data = attribute_history(g.ref_stock, g.N, '1d', ['high', 'low'])
intercept, slope, r2 = get_ols(high_low_data.low, high_low_data.high)
g.slope_series.append(slope)
rsrs_score = get_zscore(g.slope_series[-g.M:]) * r2 # 修正標準分
print(
f'today_MA is {today_MA}, before_MA is {before_MA}, rsrs score is {rsrs_score}')
'''
個股擇時:
1. MA5買賣點
2. 3日斜率
3. 移動止盈
4. 效果不如不要。。。。
5. 試試MCAD
6. 試試KDJ
'''
# 計算個股x日斜率
# close_data_sec = attribute_history(stock, g.sec_data_num, '1d', ['close'])
# current_price = attribute_history(
# stock, 1, '1m', ['close']) # get current stock price
# MA_sec = close_data_sec.close.mean()
# print(f'現價:{current_price.close[-1]}, MA{g.sec_data_num}: {MA_sec}')
# 計算2日斜率
# close_data_sec_ = attribute_history(stock, 2, '1d', ['close'])
# y = close_data_sec_['log'] = np.log(close_data_sec_.close)
# x = close_data_sec_['num'] = np.arange(close_data_sec_.log.size)
# slope_sec,_ = np.polyfit(x,y,1)
# print(f'Slope < 0: {slope_sec<0}')
# 移動止盈
# stock_data = attribute_history(stock,g.sec_data_num,'1d',['close','high'])
# stock_price = attribute_history(stock, 1, '1m', 'close')
# highest = stock_data.close.max()
# profit = highest*(1-g.take_profit) # 移動止盈線
# MACD
# dif, dea, macd = MACD(stock, check_date=context.current_dt,
# SHORT=12, LONG=29, MID=7, unit='1d')
# KDJ
# K, D, J = KDJ(stock, check_date=context.current_dt,
# unit='1d', N=9, M1=3, M2=3)
# 連續num_days分數變化
# stock_name = get_security_info(stock).display_name
# print(f'今日自選股是{stock_name}')
stock_dif = rank_stock_diff.loc[stock]
# 如果連續num_days日下降即sig=num_days,賣出
sig = np.sum((stock_dif < 0).astype(int))
print(f'連續下降{sig}日')
#綜合判斷所有信號:大盤信號 + 個股信號
if sig < 2: # rsrs_score > g.score_threshold and today_MA > before_MA and sig < 2 :
print('BUY')
return "BUY"
# (rsrs_score < -g.score_threshold and today_MA < before_MA) or sig >= 2:
elif sig >= 2:
print('SELL')
return "SELL"
else:
print('KEEP')
return "KEEP"
# 3-1 過濾模塊-過濾停牌股票
# 輸入選股列表,返回剔除停牌股票后的列表
def filter_paused_stock(stock_list):
current_data = get_current_data()
return [stock for stock in stock_list if not current_data[stock].paused]
# 3-2 過濾模塊-過濾ST及其他具有退市標簽的股票
# 輸入選股列表,返回剔除ST及其他具有退市標簽股票后的列表
def filter_st_stock(stock_list):
current_data = get_current_data()
return [stock for stock in stock_list
if not current_data[stock].is_st]
# 3-3 過濾模塊-過濾漲停的股票
# 輸入選股列表,返回剔除未持有且已漲停股票后的列表
def filter_limitup_stock(context, stock_list):
last_prices = history(1, unit='1m', field='close',
security_list=stock_list)
current_data = get_current_data()
# 已存在于持倉的股票即使漲停也不過濾,避免此股票再次可買,但因被過濾而導致選擇別的股票
return [stock for stock in stock_list if stock in context.portfolio.positions.keys()
or last_prices[stock][-1] < current_data[stock].high_limit]
# 3-4 過濾模塊-過濾跌停的股票
# 輸入股票列表,返回剔除已跌停股票后的列表
def filter_limitdown_stock(context, stock_list):
last_prices = history(1, unit='1m', field='close',
security_list=stock_list)
current_data = get_current_data()
return [stock for stock in stock_list if stock in context.portfolio.positions.keys()
or last_prices[stock][-1] > current_data[stock].low_limit]
# 4-1 交易模塊-自定義下單
# 報單成功返回報單(不代表一定會成交),否則返回None,應用于
def order_target_value_(security, value):
if value == 0:
log.debug("Selling out %s" % (security))
else:
log.debug("Order %s to value %f" % (security, value))
# 如果股票停牌,創建報單會失敗,order_target_value 返回None
# 如果股票漲跌停,創建報單會成功,order_target_value 返回Order,但是報單會取消
# 部成部撤的報單,聚寬狀態是已撤,此時成交量>0,可通過成交量判斷是否有成交
return order_target_value(security, value)
# 4-2 交易模塊-開倉
# 買入指定價值的證券,報單成功并成交(包括全部成交或部分成交,此時成交量大于0)返回True,報單失敗或者報單成功但被取消(此時成交量等于0),返回False
def open_position(security, value):
order = order_target_value_(security, value)
if order != None and order.filled > 0:
return True
return False
# 4-3 交易模塊-平倉
# 賣出指定持倉,報單成功并全部成交返回True,報單失敗或者報單成功但被取消(此時成交量等于0),或者報單非全部成交,返回False
def close_position(position):
security = position.security
if position.total_amount != 0:
order = order_target_value_(security, 0) # 可能會因停牌失敗
else:
print(f'目前沒有持有{get_security_info(security).display_name}')
return position
if order != None:
if order.status == OrderStatus.held and order.filled == order.amount:
return True
return False
# 4-4 交易模塊-調倉
# 當擇時信號為買入時開始調倉,輸入過濾模塊處理后的股票列表,執行交易模塊中的開平倉操作
def adjust_position(context, buy_stock, stock_position):
# 根據股票數量分倉
# 此處只根據可用金額平均分配購買,不能保證每個倉位平均分配
position_count = len(context.portfolio.positions)
if buy_stock not in context.portfolio.positions:
if g.stock_tobuy > position_count:
value = context.portfolio.cash / (g.stock_tobuy - position_count)
if context.portfolio.positions[buy_stock].total_amount == 0:
open_position(buy_stock, value)
else:
stock = list(context.portfolio.positions.keys())[
stock_position]
log.info("[%s]已不在應買入列表中" % (stock))
position = context.portfolio.positions[stock]
close_position(position)
# recount the securities after selling out old ones
position_count = len(context.portfolio.positions)
value = context.portfolio.cash / (g.stock_tobuy - position_count)
if context.portfolio.positions[buy_stock].total_amount == 0:
open_position(buy_stock, value)
else:
log.info("[%s]已經持有無需重復買入" % (buy_stock))
# 4-5 交易模塊-擇時交易
# 結合擇時模塊綜合信號進行交易
def my_trade(context):
# if context.current_dt.minute != 35:
# return
# 以下的代碼每小時跑一次
# 獲取選股列表并過濾掉:st,st*,退市,漲停,跌停,停牌
stock_pool = get_stock_pool()
checkout_names, check_out_list, rank_stock = get_rank(stock_pool, context)
print(check_out_list)
print(f'check_out_list is {check_out_list}')
rank_stock_diff = rank_stock_change(rank_stock)
# print(f'stock_df is:\n {g.stock_df}')
# check_out_list = filter_st_stock(check_out_list)
# check_out_list = filter_limitup_stock(context, check_out_list)
check_out_list = filter_limitdown_stock(context, check_out_list)
check_out_list = filter_paused_stock(check_out_list)
if not check_out_list: # empoty list is False
print(f'Stock is limit up or limit down.')
else:
# check if the position still in the buying list
for stock_pos in context.portfolio.positions:
if stock_pos not in check_out_list[0:g.stock_tobuy]:
log.info("舊龍頭已不再買入列表,賣出")
position = context.portfolio.positions[stock_pos]
close_position(position)
# print('今日自選股:{}'.format(get_security_info(check_out_list[0]).display_name))
# 獲取綜合擇時信號
count = 0
for stock_name, stock_code in zip(checkout_names[0:g.stock_tobuy], check_out_list[0:g.stock_tobuy]):
print(f'今日自選股:{stock_name}')
timing_signal = get_timing_signal(stock_name, rank_stock_diff, context)
print(f'{stock_name} 今日擇時信號:{timing_signal}')
# 開始交易
if timing_signal == 'SELL':
position = context.portfolio.positions[stock_code]
close_position(position)
elif timing_signal == 'BUY' or timing_signal == 'KEEP':
adjust_position(context, stock_code, count)
count += 1
# break # only buy one stock
else:
pass
# 4-6 交易模塊-止損
# 檢查持倉并進行必要的止損操作
def check_lose(context):
for position in list(context.portfolio.positions.values()):
securities = position.security
cost = position.avg_cost
price = position.price
ret = 100*(price/cost-1)
value = position.value
amount = position.total_amount
# 這里設定15%止損
if ret <= -15:
order_target_value(position.security, 0)
print("!!!!!!觸發止損信號: 標的={},標的價值={},浮動盈虧={}% !!!!!!"
.format(securities, format(value, '.2f'), format(ret, '.2f')))
log.info('虧死了,溜溜溜')
# 4-7 交易模塊-止盈
# 根據移動止盈線止盈
def check_profit(context):
for stock in context.portfolio.positions:
position = context.portfolio.positions[stock]
security = position.security
price = attribute_history(security, 1, '1m', 'close')
highest = attribute_history(security, g.sec_data_num, '1d', 'high')
if price.close[-1] < highest.high.max()*(1-g.take_profit):
print('觸發止盈,賣賣賣')
close_position(position)
# 5-1 復盤模塊-打印
# 打印每日持倉信息
def print_trade_info(context):
# 打印當天成交記錄
trades = get_trades()
for _trade in trades.values():
print('成交記錄:'+str(_trade))
# 打印賬戶信息
for position in list(context.portfolio.positions.values()):
securities = position.security
cost = position.avg_cost
price = position.price
ret = 100*(price/cost-1)
value = position.value
amount = position.total_amount
print('代碼:{}'.format(securities))
print('成本價:{}'.format(format(cost, '.2f')))
print('現價:{}'.format(price))
print('收益率:{}%'.format(format(ret, '.2f')))
print('持倉(股):{}'.format(amount))
print('市值:{}'.format(format(value, '.2f')))
print('一天結束')
print('———————————————————————————————————————分割線————————————————————————————————————————')