zoukankan      html  css  js  c++  java
  • 仓位管理与风险控制

    完整的主要包含以下几点:
    1、股票池或是期货池,股票池的作用就是一个相对比较优秀的交易标的集合,或是删除比较差的交易交易集合而留下啦的集合
    2、择时,何时进何时出,如均线(金叉死叉)、MACD等技术指标
    3、仓位管理和风险控制,当择时信号发生,发出买入信号,需要买多少,买多了风险大,买少了收益太少,这就需要进行仓位管理;风险控制一般指止盈止损,尤其是止损

    四种止盈方法

    • 目标盈利法
    • 回撤止盈法
    • 大福波动止盈
    • 技术信号止盈

    四种止损方法

    • 最大亏损发
    • 波动率倍数止损
    • 波动率标准差止损
    • 技术信号止损

    风险参考指标--波动率

    • 标准差
    • 平均真实波幅ATR

    ATR:近N天真实波幅TR的算术平均值
    TR :以下指标中的最大值

    今日最高价减去今日最低价
    今日最高价减去昨日收盘价的绝对值
    今日最低价减去昨日收盘价的绝对值

    举例:ATR的计算

    [Day1(高低开收)=(16.3,15.93,15.97,16.23) ]

    [Day2(高低开收)=(16.66,16.13,16.17,16.47) ]

    [Day2 ; TR = Max(0.53,0.43,0.1) = 0.53 ]

    [Day2 ; ATR = 观察窗口内每天TR的平均值 ]

    目标盈利法和最大亏损法

    大幅波动止盈、波动率倍数止损


    回撤止盈(浮动/跟踪止盈)

    技术信号止盈/止损

    以双均线交叉信号为例:

    交易系统中与盈利相关的要素

    仓位管理

    不同阶段:建仓、加仓/减仓、清仓
    资金分配:头寸规模的确定核分配

    确定头寸规模的四种模型

    • 每一固定金额交易一个单位
    • 等价值交易单位
    • 百分比风险模型
    • 百分比波动幅度模型
    模型1:每一固定金额交易一个单位

    资金池:

    把资金等分为相同金额的若干份,再出现买入信号的情况下,每份只允许交易一个单位的投资标的(比如1手股票,或是1份期货合约)
    举例:
    总资金:一百万
    平均分为10份,每份金额:10万
    交易实例一:

    (待买入标的:工商银行qquad 价格:5元 qquad 一个交易单位=100股)
    (买入量 = 100股 qquad 买入金额=5 * 100 = 500元)

    交易实例二:

    (待买入标的:贵州茅台 qquad 价格:750元 qquad 一个交易单位=100股)
    (买入量 = 100股 qquad 买入金额=750 * 100 = 7.5万元)

    模型2:等价值交易单位

    资金池

    把资金等分为相同金额的若干份,在出现买入信号的情况下,按该金额计算出每份允许交易的次投资标的单位个数(如M手股票,或N份期货合约)
    举例:
    总资金:一百万
    平均分为10份,每份金额:10万
    交易实例一:

    (待买入标的:工商银行qquad 价格:5元)
    (买入量 = 100000/5=20000股 qquad 买入金额=10万元)

    交易实例二:

    $待买入标的:贵州茅台 qquad 价格:750元 ( )买入量 = 100000/750= 100股 qquad 买入金额=7.5万元$

    模型3:百分比风险模型

    资金池:

    根据每次交易允许承担的最大风险占总资金的比例,以及每个投资标的可接受的最大损失(即初始止损额度R),折算出可建立头寸的单位个数
    CPR计算公式:

    [P(头寸规模) = C(现金) / R(每股风险) ]

    举例:
    总资金:一百万
    总风险:1%
    单个交易标的风险:5%
    交易实例:

    (待买入标的:海康威视qquad 价格:40元)
    (买入量 = (100000*1%)/(40 * 5%)=5000股 qquad 买入金额=40* 5000 = 20万元)

    百分比波动幅度模型

    资金池

    根据每次交易允许承担的最大风险占总资金的比例,以及每个投资标的在一段时间内的价格波动幅度(即有可能有利或不利的价格变动范围V),折算出可建立头寸的单位个数
    举例:
    总资金:一百万
    总风险:1%
    单个交易标的风险:3倍ATR
    交易实例:

    (待买入标的:海康威视qquad 价格:40元 ATR=2%)
    (买入量 = (100000*1%)/(40 * 2% * 3)=4100股 qquad 买入金额=40* 4100 = 16.4万元)

    常见的加仓方法

    交易系统的核心要素

    • 市场:买卖什么
    • 头寸规模:买卖多少
    • 入市:什么时候买进
    • 止损:什么时候放弃一个亏损的头寸
    • 退出:什么时候退出一个盈利的头寸
    • 战术:怎么买卖

    交易系统的实现

    股票池候选:上证50成分股,不调仓
    双均线策略:

    • 日K级别:MA10 vs MA30
    • 开盘前检查信号,金叉买,死叉卖
      头寸规模确定
      *均仓方案(等额资金分配)

    借助聚宽平台,对该模型进行编码回测:
    代码如下:

    # 导入函数库
    from jqdata import *
    # 均线
    MA_WIN_1 = 10
    MA_WIN_2 = 30
    
    # 初始化函数,设定基准等等
    def initialize(context):
        set_benchmark('000300.XSHG')
        set_option('use_real_price', True)
        # log.set_level('order', 'error')
        
        # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
        # 定时运行函数
        run_daily(before_market_open, time='before_open', reference_security='000300.XSHG') 
        run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
        run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
        # 股票池 - 上证50
        g.stock_pool = get_index_stocks("000016.XSHG", date=context.current_dt)
        g.init_cash = context.portfolio.starting_cash  # 启动资金
    
    # 开盘前运行函数
    def before_market_open(context):
        look_ahead_n = max(MA_WIN_1, MA_WIN_2) + 1
        g.up_cross_signaled = set()
        g.down_cross_signaled = set()
        for code in g.stock_pool:
            df = attribute_history(code, look_ahead_n, "1d", ["close"], skip_paused=True)  # 该函数返回结果不包括当天数据
            if len(df) != look_ahead_n:
                continue
            close = df["close"]
            ma_short = close.rolling(MA_WIN_1).mean()  # 短时均线
            ma_long = close.rolling(MA_WIN_2).mean()  # 长时均线
            uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)  # 上穿标志
            dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)  # 下穿标志
            if uc_flags.iloc[-1]:
                g.up_cross_signaled.add(code)
            if dc_flags.iloc[-1]:
                g.down_cross_signaled.add(code)
    
    # 开盘时运行函数
    def market_open(context):
        cur_dt = context.current_dt.date()  # 当前日期
        p = context.portfolio  # 资金账户
        current_data = get_current_data()
    
        each_cash = g.init_cash / len(g.stock_pool)  # 每只股票分配的资金
        
        # 卖出均线死叉信号的持仓股
        for code, pos in p.positions.items():
            if code in g.down_cross_signaled:
                order_target(code, 0)
    
        # 买入均线金叉信号的持仓股
        for code in g.up_cross_signaled:
            if code not in p.positions:
                if current_data[code].paused:
                    continue
                open_price = current_data[code].day_open
                num_to_buy = each_cash / open_price // 100 * 100
                order(code, num_to_buy)
    
    # 收盘后运行函数  
    def after_market_close(context):
        p = context.portfolio
        pos_level = p.positions_value / p.total_value
        record(pos_level=pos_level)
    
    

    回测结果:

    2、按盈利比例均匀加仓

    • 记录每只持仓股最后一次的买入价
    • 如以当日开盘价相对于前一次买入价的盈利比例超过某个阈值,则等额加一次仓
      注:前提是当日没有任何止损或其他买入信号的发生

    代码实现:

    # 导入函数库
    from jqdata import *
    import pandas as pd
    # 均线
    MA_WIN_1 = 10
    MA_WIN_2 = 30
    
    # #加仓判断阈值
    INC_POS_PF_RATE = 0.05
    
    
    # 初始化函数,设定基准等等
    def initialize(context):
        # 设定沪深300作为基准
        set_benchmark('000300.XSHG')
        # 开启动态复权模式(真实价格)
        set_option('use_real_price', True)
        # 输出内容到日志 log.info()
        log.info('初始函数开始运行且全局只运行一次')
        # 过滤掉order系列API产生的比error级别低的log
        # log.set_level('order', 'error')
    
        ### 股票相关设定 ###
        # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
        ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
          # 开盘前运行
        run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
          # 每分钟运行
        run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
          # 收盘后运行
        run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
        #股票池-上证50
        g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
        g.init_cash = context.portfolio.starting_cash#启动资金
        g.last_entry_prices = {code:None for code in g.stock_pool}
        
    ## 开盘前运行函数
    def before_market_open(context):
        look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
        g.up_cross_signaled = set()
        g.down_cross_signaled = set()
        for code in g.stock_pool:
            df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
            if len(df) != look_ahead_n:
                continue
            close = df['close']
            ma_short = close.rolling(MA_WIN_1).mean()#短时均线
            ma_long = close.rolling(MA_WIN_2).mean()#长时均线
            uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
            dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
            if uc_flags.iloc[-1]:
                g.up_cross_signaled.add(code)
            if dc_flags.iloc[-1]:
                g.down_cross_signaled.add(code)
            print(g.up_cross_signaled)  
    
    ## 开盘时运行函数
    def market_open(context):
        log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
        p = context.portfolio#资金账户
        current_data = get_current_data()
        
        #每只股票分配的资金
        each_cash = g.init_cash/len(g.stock_pool)
        #卖出均线、死叉信号的持仓股
        for code ,pos in p.positions.items():
            if code in g.down_cross_signaled:
                order_target(code,0)
                g.last_entry_prices[code]= None#更完美的实现还要判断成交状态,后面同理
                
        #买入均线金叉信号的持仓股
        for code in g.up_cross_signaled:
            if code not in p.positions:
                if current_data[code].paused:
                    continue
                open_price = current_data[code].day_open
                num_to_buy = each_cash / open_price // 100 *100
                order(code,num_to_buy)
                g.last_entry_prices[code] = open_price
        #检查有无符合加仓条件的持仓股
        for code,pos  in p.positions.items():
            if current_data[code].paused:
                continue
            if pos.today_amount==0 and pos.closeable_amount > 0:
                open_price = current_data[code].day_open
                last_entry = g.last_entry_prices[code]
                if (open_price - last_entry) / last_entry >= INC_POS_PF_RATE:
                    order_value(code, each_cash)
                    g.last_entry_prices[code] = open_price
    
    ## 收盘后运行函数
    def after_market_close(context):
        log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
        p = context.portfolio
        pos_level = p.positions_value / p.total_value
        record(pos_level=pos_level)
        trades = get_trades()
        for _trade in trades.values():
            log.info('成交记录:'+str(_trade))
        log.info('一天结束')
        log.info('##############################################################')
    
    

    回测结果:

    3、按波动率分配资金

    • 计算一段时间内的ATR作为波动率
    • 每只股票允许承担一定百分比的风险,折算到具体的风险金额
    • 以该风险金额和波动率的值来确定建仓个股的资金头寸

    波动率越大,分配的头寸越小
    波动率越小,分配的头寸越大

    代码实现:

    # 导入函数库
    from jqdata import *
    import pandas as pd
    # 均线
    MA_WIN_1 = 10
    MA_WIN_2 = 30
    
    # #加仓判断阈值
    # INC_POS_PF_RATE = 0.05
    #ATR计算窗口大小
    ATR_WIN_SIZE= 20 
    #总风险因子
    RISK_RATIO=0.001
    #个股分析因子
    STOCK_RISK_RATIO =1
    
    # 初始化函数,设定基准等等
    def initialize(context):
        # 设定沪深300作为基准
        set_benchmark('000300.XSHG')
        # 开启动态复权模式(真实价格)
        set_option('use_real_price', True)
        # 输出内容到日志 log.info()
        log.info('初始函数开始运行且全局只运行一次')
        # 过滤掉order系列API产生的比error级别低的log
        # log.set_level('order', 'error')
    
        ### 股票相关设定 ###
        # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
        ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
          # 开盘前运行
        run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
          # 每分钟运行
        run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
          # 收盘后运行
        run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
        #股票池-上证50
        g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
        g.init_cash = context.portfolio.starting_cash#启动资金
        g.last_entry_prices = {code:None for code in g.stock_pool}
        
    ## 开盘前运行函数
    def before_market_open(context):
        look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
        g.up_cross_signaled = set()
        g.down_cross_signaled = set()
        for code in g.stock_pool:
            df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
            if len(df) != look_ahead_n:
                continue
            close = df['close']
            ma_short = close.rolling(MA_WIN_1).mean()#短时均线
            ma_long = close.rolling(MA_WIN_2).mean()#长时均线
            uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
            dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
            if uc_flags.iloc[-1]:
                g.up_cross_signaled.add(code)
            if dc_flags.iloc[-1]:
                g.down_cross_signaled.add(code)
            print(g.up_cross_signaled)  
    
    ## 开盘时运行函数
    def market_open(context):
        log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
        p = context.portfolio#资金账户
        current_data = get_current_data()
        
        #每只股票分配的资金
        each_cash = g.init_cash/len(g.stock_pool)
        #卖出均线、死叉信号的持仓股
        for code ,pos in p.positions.items():
            if code in g.down_cross_signaled:
                order_target(code,0)
                g.last_entry_prices[code]= None#更完美的实现还要判断成交状态,后面同理
                
        #买入均线金叉信号的持仓股
        for code in g.up_cross_signaled:
            if code not in p.positions:
                if current_data[code].paused:
                    continue
                df = attribute_history(code, ATR_WIN_SIZE+1, "1d", ["high", "low", "close"], skip_paused=True)
                if len(df) != ATR_WIN_SIZE+1:
                    continue
                df["pdc"] = df["close"].shift(1)
                tr = df.apply(lambda x: max(x["high"]-x["low"], abs(x["high"]-x["pdc"]), abs(x["pdc"]-x["low"])), axis=1)
                atr = tr[-ATR_WIN_SIZE:].mean()
                num_to_buy = (g.init_cash * RISK_RATIO) / (atr * STOCK_RISK_RATIO) 
                
                order(code,num_to_buy)
      
    ## 收盘后运行函数
    def after_market_close(context):
        log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
        p = context.portfolio
        pos_level = p.positions_value / p.total_value
        record(pos_level=pos_level)
        trades = get_trades()
        for _trade in trades.values():
            log.info('成交记录:'+str(_trade))
        log.info('一天结束')
        log.info('##############################################################')
    

    回测结果:

    4 回撤止盈(浮动止损)
    算法描述:

    • 建仓后,时刻记录股价走势的最高点
    • 计算最新价相对于前期高点下跌幅度的百分比
    • 如果下跌百分比大于某个阈值,则止盈退出

    基于按波动率分配资金的方案进行改进实验

    • 目标:减小回撤

    代码实现:

    # 导入函数库
    from jqdata import *
    import pandas as pd
    
    # 回撤的幅度
    MAX_DROP_RATE = 0.03
    # 初始化函数,设定基准等等
    def initialize(context):
        # 设定沪深300作为基准
        set_benchmark('000300.XSHG')
        # 开启动态复权模式(真实价格)
        set_option('use_real_price', True)
        # 输出内容到日志 log.info()
        log.info('初始函数开始运行且全局只运行一次')
        # 过滤掉order系列API产生的比error级别低的log
        # log.set_level('order', 'error')
    
        ### 股票相关设定 ###
        # 股票类每笔交易时的手续费是:买入时佣金万分之三,卖出时佣金万分之三加千分之一印花税, 每笔交易佣金最低扣5块钱
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
        ## 运行函数(reference_security为运行时间的参考标的;传入的标的只做种类区分,因此传入'000300.XSHG'或'510300.XSHG'是一样的)
          # 开盘前运行
        run_daily(before_market_open, time='before_open', reference_security='000300.XSHG')
          # 每分钟运行
        run_daily(market_open, time='every_bar', reference_security='000300.XSHG')
          # 收盘后运行
        run_daily(after_market_close, time='after_close', reference_security='000300.XSHG')
        #股票池-上证50
        g.stock_pool = get_index_stocks('000016.XSHG',date=context.current_dt)
        g.init_cash = context.portfolio.starting_cash#启动资金
        g.entry_dates = {code:None for code in g.stock_pool}
        
    ## 开盘前运行函数
    def before_market_open(context):
        look_ahead_n = max(MA_WIN_1,MA_WIN_2)+1
        g.up_cross_signaled = set()
        g.down_cross_signaled = set()
        for code in g.stock_pool:
            df = attribute_history(code,look_ahead_n,'1d',fields=['close'],skip_paused=True)
            if len(df) != look_ahead_n:
                continue
            close = df['close']
            ma_short = close.rolling(MA_WIN_1).mean()#短时均线
            ma_long = close.rolling(MA_WIN_2).mean()#长时均线
            uc_flags = (ma_short.shift(1) <= ma_long.shift(1)) & (ma_short > ma_long)#上穿标志
            dc_flags = (ma_short.shift(1) >= ma_long.shift(1)) & (ma_short < ma_long)#下穿标志
            if uc_flags.iloc[-1]:
                g.up_cross_signaled.add(code)
            if dc_flags.iloc[-1]:
                g.down_cross_signaled.add(code)
    
    ## 开盘时运行函数
    def market_open(context):
        log.info('函数运行时间(market_open):'+str(context.current_dt.time()))
        cur_dt = context.current_dt.date()  # 当前日期
        p = context.portfolio#资金账户
        current_data = get_current_data()
        
        #每只股票分配的资金
        each_cash = g.init_cash/len(g.stock_pool)
        #卖出均线、死叉信号的持仓股
        for code ,pos in p.positions.items():
            if code in g.down_cross_signaled:
                order_target(code,0)
                
        # 买入均线金叉信号的持仓股
        for code in g.up_cross_signaled:
            if code not in p.positions:
                if current_data[code].paused:
                    continue
                df = attribute_history(code, ATR_WIN_SIZE+1, "1d", ["high", "low", "close"], skip_paused=True)
                if len(df) != ATR_WIN_SIZE+1:
                    continue
                df["pdc"] = df["close"].shift(1)
                tr = df.apply(lambda x: max(x["high"]-x["low"], abs(x["high"]-x["pdc"]), abs(x["pdc"]-x["low"])), axis=1)
                atr = tr[-ATR_WIN_SIZE:].mean()
                num_to_buy = g.init_cash * RISK_RATIO / atr // 100 * 100
                order(code, num_to_buy)
                
                g.entry_dates[code] = cur_dt
    
        # 检查有无符合回撤止盈条件的持仓股
        for code, pos in p.positions.items():
            if current_data[code].paused:
                continue
    
            if pos.today_amount == 0 and pos.closeable_amount > 0:
                last_entry_date = g.entry_dates[code]
                prev_date = context.current_dt - timedelta(days=1)
    
                # 计算截止到前一天的HHV,避免未来函数,并且考虑到前复权问题,每个交易日都是重新取数据计算
                df = get_price(code, start_date=last_entry_date, end_date=prev_date, frequency="1d", fields=["high"], skip_paused=True)
                hhv = df["high"].max()
    
                drop_rate = (hhv - current_data[code].day_open) / hhv  # 以当日开盘价计算,或者,也可以用前一交易日的收盘价计算
                if drop_rate > MAX_DROP_RATE:
                    order_target(code, 0)
      
    ## 收盘后运行函数
    def after_market_close(context):
        log.info(str('函数运行时间(after_market_close):'+str(context.current_dt.time())))
        p = context.portfolio
        pos_level = p.positions_value / p.total_value
        record(pos_level=pos_level)
        trades = get_trades()
        for _trade in trades.values():
            log.info('成交记录:'+str(_trade))
        log.info('一天结束')
        log.info('##############################################################')
    
    

    回测结果

  • 相关阅读:
    VUE ElementUI Tree JAVA Mybatis实现 麦克斯
    VUE 创建工程 项目 麦克斯
    Go——关于Time包
    etcd——是什么做什么如何用
    php——composer安装与使用
    TinyXml——Linux下TinyXml的编译
    Mac下eclipse安装 lombok 插件
    gitlab——搭建私有gitlab服务
    apachehttpd——Linux/Mac源码安装apachehttpd
    mongo——通过docker查看mongo集群的状态和数据
  • 原文地址:https://www.cnblogs.com/whiteBear/p/13176219.html
Copyright © 2011-2022 走看看