完整的主要包含以下几点:
1、股票池或是期货池,股票池的作用就是一个相对比较优秀的交易标的集合,或是删除比较差的交易交易集合而留下啦的集合
2、择时,何时进何时出,如均线(金叉死叉)、MACD等技术指标
3、仓位管理和风险控制,当择时信号发生,发出买入信号,需要买多少,买多了风险大,买少了收益太少,这就需要进行仓位管理;风险控制一般指止盈止损,尤其是止损
四种止盈方法
- 目标盈利法
- 回撤止盈法
- 大福波动止盈
- 技术信号止盈
四种止损方法
- 最大亏损发
- 波动率倍数止损
- 波动率标准差止损
- 技术信号止损
风险参考指标--波动率
- 标准差
- 平均真实波幅ATR
ATR:近N天真实波幅TR的算术平均值
TR :以下指标中的最大值今日最高价减去今日最低价
今日最高价减去昨日收盘价的绝对值
今日最低价减去昨日收盘价的绝对值
举例:ATR的计算
目标盈利法和最大亏损法
大幅波动止盈、波动率倍数止损
回撤止盈(浮动/跟踪止盈)
技术信号止盈/止损
以双均线交叉信号为例:
交易系统中与盈利相关的要素
仓位管理
不同阶段:建仓、加仓/减仓、清仓
资金分配:头寸规模的确定核分配
确定头寸规模的四种模型
- 每一固定金额交易一个单位
- 等价值交易单位
- 百分比风险模型
- 百分比波动幅度模型
模型1:每一固定金额交易一个单位
资金池:
把资金等分为相同金额的若干份,再出现买入信号的情况下,每份只允许交易一个单位的投资标的(比如1手股票,或是1份期货合约)
举例:
总资金:一百万
平均分为10份,每份金额:10万
交易实例一:
(待买入标的:工商银行qquad 价格:5元 qquad 一个交易单位=100股)
(买入量 = 100股 qquad 买入金额=5 * 100 = 500元)
交易实例二:
(待买入标的:贵州茅台 qquad 价格:750元 qquad 一个交易单位=100股)
(买入量 = 100股 qquad 买入金额=750 * 100 = 7.5万元)
模型2:等价值交易单位
资金池
把资金等分为相同金额的若干份,在出现买入信号的情况下,按该金额计算出每份允许交易的次投资标的单位个数(如M手股票,或N份期货合约)
举例:
总资金:一百万
平均分为10份,每份金额:10万
交易实例一:
(待买入标的:工商银行qquad 价格:5元)
(买入量 = 100000/5=20000股 qquad 买入金额=10万元)
交易实例二:
$待买入标的:贵州茅台 qquad 价格:750元 ( )买入量 = 100000/750= 100股 qquad 买入金额=7.5万元$
模型3:百分比风险模型
资金池:
根据每次交易允许承担的最大风险占总资金的比例,以及每个投资标的可接受的最大损失(即初始止损额度R),折算出可建立头寸的单位个数
CPR计算公式:
举例:
总资金:一百万
总风险:1%
单个交易标的风险:5%
交易实例:
(待买入标的:海康威视qquad 价格:40元)
(买入量 = (100000*1%)/(40 * 5%)=5000股 qquad 买入金额=40* 5000 = 20万元)
百分比波动幅度模型
资金池
根据每次交易允许承担的最大风险占总资金的比例,以及每个投资标的在一段时间内的价格波动幅度(即有可能有利或不利的价格变动范围V),折算出可建立头寸的单位个数
举例:
总资金:一百万
总风险:1%
单个交易标的风险:3倍ATR
交易实例:
(待买入标的:海康威视qquad 价格:40元 ATR=2%)
(买入量 = (100000*1%)/(40 * 2% * 3)=4100股 qquad 买入金额=40* 4100 = 16.4万元)
常见的加仓方法
交易系统的核心要素
- 市场:买卖什么
- 头寸规模:买卖多少
- 入市:什么时候买进
- 止损:什么时候放弃一个亏损的头寸
- 退出:什么时候退出一个盈利的头寸
- 战术:怎么买卖
交易系统的实现
股票池候选:上证50成分股,不调仓
双均线策略:
- 日K级别:MA10 vs MA30
- 开盘前检查信号,金叉买,死叉卖
头寸规模确定
*均仓方案(等额资金分配)
借助聚宽平台,对该模型进行编码回测:
代码如下:
# 导入函数库
from jqdata import *
# 均线
MA_WIN_1 = 10
MA_WIN_2 = 30
# 初始化函数,设定基准等等
def initialize(context):
set_benchmark('000300.XSHG')
set_option('use_real_price', True)
# log.set_level('order', 'error')
# 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
# 定时运行函数
run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
# 股票池 - 上证50
g.stock_pool = get_index_stocks("000016.XSHG", date=context.current_dt)
g.init_cash = context.portfolio.starting_cash # 启动资金
# 开盘前运行函数
def before_market_open(context):
look_ahead_n = max(MA_WIN_1, MA_WIN_2) + 1
g.up_cross_signaled = set()
g.down_cross_signaled = set()
for code in g.stock_pool:
df = attribute_history(code, look_ahead_n, "1d", ["close"], skip_paused=True) # 该函数返回结果不包括当天数据
if len(df) != look_ahead_n:
continue
close = df["close"]
ma_short = close.rolling(MA_WIN_1).mean() # 短时均线
ma_long = close.rolling(MA_WIN_2).mean() # 长时均线
uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long) # 上穿标志
dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long) # 下穿标志
if uc_flags.iloc[-1]:
g.up_cross_signaled.add(code)
if dc_flags.iloc[-1]:
g.down_cross_signaled.add(code)
# 开盘时运行函数
def market_open(context):
cur_dt = context.current_dt.date() # 当前日期
p = context.portfolio # 资金账户
current_data = get_current_data()
each_cash = g.init_cash / len(g.stock_pool) # 每只股票分配的资金
# 卖出均线死叉信号的持仓股
for code, pos in p.positions.items():
if code in g.down_cross_signaled:
order_target(code, 0)
# 买入均线金叉信号的持仓股
for code in g.up_cross_signaled:
if code not in p.positions:
if current_data[code].paused:
continue
open_price = current_data[code].day_open
num_to_buy = each_cash / open_price // 100 * 100
order(code, num_to_buy)
# 收盘后运行函数
def after_market_close(context):
p = context.portfolio
pos_level = p.positions_value / p.total_value
record(pos_level=pos_level)
回测结果:
2、按盈利比例均匀加仓
- 记录每只持仓股最后一次的买入价
- 如以当日开盘价相对于前一次买入价的盈利比例超过某个阈值,则等额加一次仓
注:前提是当日没有任何止损或其他买入信号的发生
代码实现:
# 导入函数库
from jqdata import *
import pandas as pd
# 均线
MA_WIN_1 = 10
MA_WIN_2 = 30
# #加仓判断阈值
INC_POS_PF_RATE = 0.05
# 初始化函数,设定基准等等
def initialize(context):
# 设定沪深300作为基准
set_benchmark('000300.XSHG')
# 开启动态复权模式(真实价格)
set_option('use_real_price', True)
# 输出内容到日志 log.info()
log.info('初始函数开始运行且全局只运行一次')
# 过滤掉order系列API产生的比error级别低的log
# log.set_level('order', 'error')
### 股票相关设定 ###
# 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
# 开盘前运行
run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
# 每分钟运行
run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
# 收盘后运行
run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
#股票池-上证50
g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
g.init_cash = context.portfolio.starting_cash#启动资金
g.last_entry_prices = {code:None for code in g.stock_pool}
## 开盘前运行函数
def before_market_open(context):
look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
g.up_cross_signaled = set()
g.down_cross_signaled = set()
for code in g.stock_pool:
df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
if len(df) != look_ahead_n:
continue
close = df['close']
ma_short = close.rolling(MA_WIN_1).mean()#短时均线
ma_long = close.rolling(MA_WIN_2).mean()#长时均线
uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
if uc_flags.iloc[-1]:
g.up_cross_signaled.add(code)
if dc_flags.iloc[-1]:
g.down_cross_signaled.add(code)
print(g.up_cross_signaled)
## 开盘时运行函数
def market_open(context):
log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
p = context.portfolio#资金账户
current_data = get_current_data()
#每只股票分配的资金
each_cash = g.init_cash/len(g.stock_pool)
#卖出均线、死叉信号的持仓股
for code ,pos in p.positions.items():
if code in g.down_cross_signaled:
order_target(code,0)
g.last_entry_prices[code]= None#更完美的实现还要判断成交状态,后面同理
#买入均线金叉信号的持仓股
for code in g.up_cross_signaled:
if code not in p.positions:
if current_data[code].paused:
continue
open_price = current_data[code].day_open
num_to_buy = each_cash / open_price // 100 *100
order(code,num_to_buy)
g.last_entry_prices[code] = open_price
#检查有无符合加仓条件的持仓股
for code,pos in p.positions.items():
if current_data[code].paused:
continue
if pos.today_amount==0 and pos.closeable_amount > 0:
open_price = current_data[code].day_open
last_entry = g.last_entry_prices[code]
if (open_price - last_entry) / last_entry >= INC_POS_PF_RATE:
order_value(code, each_cash)
g.last_entry_prices[code] = open_price
## 收盘后运行函数
def after_market_close(context):
log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
p = context.portfolio
pos_level = p.positions_value / p.total_value
record(pos_level=pos_level)
trades = get_trades()
for _trade in trades.values():
log.info('成交记录:'+str(_trade))
log.info('一天结束')
log.info('##############################################################')
回测结果:
3、按波动率分配资金
- 计算一段时间内的ATR作为波动率
- 每只股票允许承担一定百分比的风险,折算到具体的风险金额
- 以该风险金额和波动率的值来确定建仓个股的资金头寸
波动率越大,分配的头寸越小
波动率越小,分配的头寸越大
代码实现:
# 导入函数库
from jqdata import *
import pandas as pd
# 均线
MA_WIN_1 = 10
MA_WIN_2 = 30
# #加仓判断阈值
# INC_POS_PF_RATE = 0.05
#ATR计算窗口大小
ATR_WIN_SIZE= 20
#总风险因子
RISK_RATIO=0.001
#个股分析因子
STOCK_RISK_RATIO =1
# 初始化函数,设定基准等等
def initialize(context):
# 设定沪深300作为基准
set_benchmark('000300.XSHG')
# 开启动态复权模式(真实价格)
set_option('use_real_price', True)
# 输出内容到日志 log.info()
log.info('初始函数开始运行且全局只运行一次')
# 过滤掉order系列API产生的比error级别低的log
# log.set_level('order', 'error')
### 股票相关设定 ###
# 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
# 开盘前运行
run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
# 每分钟运行
run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
# 收盘后运行
run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
#股票池-上证50
g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
g.init_cash = context.portfolio.starting_cash#启动资金
g.last_entry_prices = {code:None for code in g.stock_pool}
## 开盘前运行函数
def before_market_open(context):
look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
g.up_cross_signaled = set()
g.down_cross_signaled = set()
for code in g.stock_pool:
df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
if len(df) != look_ahead_n:
continue
close = df['close']
ma_short = close.rolling(MA_WIN_1).mean()#短时均线
ma_long = close.rolling(MA_WIN_2).mean()#长时均线
uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
if uc_flags.iloc[-1]:
g.up_cross_signaled.add(code)
if dc_flags.iloc[-1]:
g.down_cross_signaled.add(code)
print(g.up_cross_signaled)
## 开盘时运行函数
def market_open(context):
log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
p = context.portfolio#资金账户
current_data = get_current_data()
#每只股票分配的资金
each_cash = g.init_cash/len(g.stock_pool)
#卖出均线、死叉信号的持仓股
for code ,pos in p.positions.items():
if code in g.down_cross_signaled:
order_target(code,0)
g.last_entry_prices[code]= None#更完美的实现还要判断成交状态,后面同理
#买入均线金叉信号的持仓股
for code in g.up_cross_signaled:
if code not in p.positions:
if current_data[code].paused:
continue
df = attribute_history(code, ATR_WIN_SIZE+1, "1d", ["high", "low", "close"], skip_paused=True)
if len(df) != ATR_WIN_SIZE+1:
continue
df["pdc"] = df["close"].shift(1)
tr = df.apply(lambda x: max(x["high"]-x["low"], abs(x["high"]-x["pdc"]), abs(x["pdc"]-x["low"])), axis=1)
atr = tr[-ATR_WIN_SIZE:].mean()
num_to_buy = (g.init_cash * RISK_RATIO) / (atr * STOCK_RISK_RATIO)
order(code,num_to_buy)
## 收盘后运行函数
def after_market_close(context):
log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
p = context.portfolio
pos_level = p.positions_value / p.total_value
record(pos_level=pos_level)
trades = get_trades()
for _trade in trades.values():
log.info('成交记录:'+str(_trade))
log.info('一天结束')
log.info('##############################################################')
回测结果:
4 回撤止盈(浮动止损)
算法描述:
- 建仓后,时刻记录股价走势的最高点
- 计算最新价相对于前期高点下跌幅度的百分比
- 如果下跌百分比大于某个阈值,则止盈退出
基于按波动率分配资金的方案进行改进实验
- 目标:减小回撤
代码实现:
# 导入函数库
from jqdata import *
import pandas as pd
# 回撤的幅度
MAX_DROP_RATE = 0.03
# 初始化函数,设定基准等等
def initialize(context):
# 设定沪深300作为基准
set_benchmark('000300.XSHG')
# 开启动态复权模式(真实价格)
set_option('use_real_price', True)
# 输出内容到日志 log.info()
log.info('初始函数开始运行且全局只运行一次')
# 过滤掉order系列API产生的比error级别低的log
# log.set_level('order', 'error')
### 股票相关设定 ###
# 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
# 开盘前运行
run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
# 每分钟运行
run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
# 收盘后运行
run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
#股票池-上证50
g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
g.init_cash = context.portfolio.starting_cash#启动资金
g.entry_dates = {code:None for code in g.stock_pool}
## 开盘前运行函数
def before_market_open(context):
look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
g.up_cross_signaled = set()
g.down_cross_signaled = set()
for code in g.stock_pool:
df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
if len(df) != look_ahead_n:
continue
close = df['close']
ma_short = close.rolling(MA_WIN_1).mean()#短时均线
ma_long = close.rolling(MA_WIN_2).mean()#长时均线
uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
if uc_flags.iloc[-1]:
g.up_cross_signaled.add(code)
if dc_flags.iloc[-1]:
g.down_cross_signaled.add(code)
## 开盘时运行函数
def market_open(context):
log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
cur_dt = context.current_dt.date() # 当前日期
p = context.portfolio#资金账户
current_data = get_current_data()
#每只股票分配的资金
each_cash = g.init_cash/len(g.stock_pool)
#卖出均线、死叉信号的持仓股
for code ,pos in p.positions.items():
if code in g.down_cross_signaled:
order_target(code,0)
# 买入均线金叉信号的持仓股
for code in g.up_cross_signaled:
if code not in p.positions:
if current_data[code].paused:
continue
df = attribute_history(code, ATR_WIN_SIZE+1, "1d", ["high", "low", "close"], skip_paused=True)
if len(df) != ATR_WIN_SIZE+1:
continue
df["pdc"] = df["close"].shift(1)
tr = df.apply(lambda x: max(x["high"]-x["low"], abs(x["high"]-x["pdc"]), abs(x["pdc"]-x["low"])), axis=1)
atr = tr[-ATR_WIN_SIZE:].mean()
num_to_buy = g.init_cash * RISK_RATIO / atr // 100 * 100
order(code, num_to_buy)
g.entry_dates[code] = cur_dt
# 检查有无符合回撤止盈条件的持仓股
for code, pos in p.positions.items():
if current_data[code].paused:
continue
if pos.today_amount == 0 and pos.closeable_amount > 0:
last_entry_date = g.entry_dates[code]
prev_date = context.current_dt - timedelta(days=1)
# 计算截止到前一天的HHV,避免未来函数,并且考虑到前复权问题,每个交易日都是重新取数据计算
df = get_price(code, start_date=last_entry_date, end_date=prev_date, frequency="1d", fields=["high"], skip_paused=True)
hhv = df["high"].max()
drop_rate = (hhv - current_data[code].day_open) / hhv # 以当日开盘价计算,或者,也可以用前一交易日的收盘价计算
if drop_rate > MAX_DROP_RATE:
order_target(code, 0)
## 收盘后运行函数
def after_market_close(context):
log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
p = context.portfolio
pos_level = p.positions_value / p.total_value
record(pos_level=pos_level)
trades = get_trades()
for _trade in trades.values():
log.info('成交记录:'+str(_trade))
log.info('一天结束')
log.info('##############################################################')
回测结果