zoukankan      html  css  js  c++  java
  • 金融量化分析策略展示

    在聚宽网实现回测:https://www.joinquant.com/algorithm/index/edit?algorithmId=a5d6decffd9805f8f7896e431ca619c4

    还有米筐、优矿来进行回测。

    一、双均线策略

    import jqdata
    
    p1 = 5
    p2 = 60
    
    
    def initialize(context):
        set_benchmark('000300.XSHG')
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        g.security = ['601318.XSHG']
        
    def handle_data(context, data):
        for stock in g.security:
            hist = attribute_history(stock, count=p2)
            ma5 = hist['close'][-5:].mean()
            ma60 = hist['close'].mean()
            
            if ma5 > ma60 and stock not in context.portfolio.positions:
                order = order_value(stock, context.portfolio.available_cash)
                if order:
                    print('Buying: %dat Price: %.2f' % (order.amount, order.price))
            elif ma5 < ma60 and stock in context.portfolio.positions:
                order = order_target_value(stock, 0)
                if order:
                    print('Selling: %d at Price: %.2f' % (order.amount, order.price))
        record(ma5=ma5, ma60=ma60)
    View Code

    二、因子选股

    # 导入函数库
    import jqdata
    
    # 初始化函数,设定基准等等
    def initialize(context):
        set_benchmark('000300.XSHG')
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        g.security = get_index_stocks('000300.XSHG')
        g.q = query(valuation).filter(valuation.code.in_(g.security))
        g.N = 10
        run_monthly(handle, 1)
    
    def handle(context):
        df = get_fundamentals(g.q)
        df = df.sort(columns='market_cap')
        df = df[:g.N]
        tohold = df['code'].values
        
        for stock in context.portfolio.positions:
            if stock not in tohold:
                #卖出
                order_target(stock, 0)
                
        tobuy = [stock for stock in tohold if stock not in context.portfolio.positions]
        
        cash = context.portfolio.available_cash
        n = len(tobuy)
        #买入
        for stock in tobuy:
            order_value(stock, int(cash/n))
    View Code

    三、均值回归

    import jqdata
    import math
    import numpy as np
    import pandas as pd
    
    def initialize(context):
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        
        g.benchmark = '000300.XSHG'
        set_benchmark(g.benchmark)
        
        g.ma_days = 30
        g.stock_num = 10
        
        run_monthly(handle, 1)
        #run_monthly(handle, 11)
        
    def handle(context):
        tohold = get_hold_list(context)
        for stock in context.portfolio.positions:
            if stock not in tohold:
                order_target_value(stock, 0)
        
        tobuy = [stock for stock in tohold if stock not in context.portfolio.positions]
        
        if len(tobuy)>0:
            cash = context.portfolio.available_cash
            cash_every_stock = cash / len(tobuy)
            
            for stock in tobuy:
                order_value(stock, cash_every_stock)
    
    def get_hold_list(context):
        stock_pool = get_index_stocks(g.benchmark)
        stock_score = pd.Series(index=stock_pool)
        for stock in stock_pool:
            df = attribute_history(stock, g.ma_days, '1d', ['close'])
            ma = df.mean()[0]
            current_price = get_current_data()[stock].day_open
            ratio = (ma - current_price) / ma
            stock_score[stock] = ratio
        return stock_score.nlargest(g.stock_num).index.values
    View Code

    四、布林带策略

    # 导入函数库
    import jqdata
    
    def initialize(context):
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        set_benchmark('000300.XSHG')
        
        g.security = ['600036.XSHG']
        
        g.N = 2
        g.ma_days = 20
        
    def handle_data(context, data):
        for stock in g.security:
            df = attribute_history(stock, g.ma_days)
            middle = df['close'].mean()
            upper = middle + g.N * df['close'].std()
            lower = middle - g.N * df['close'].std()
            
            p = get_current_data()[stock].day_open
            # 如果价格突破阻力线
            if p >= upper and stock in context.portfolio.positions:
                order_target(stock, 0)
        
        cash = context.portfolio.available_cash / len(g.security)
        
        for stock in g.security:
            df = attribute_history(stock, g.ma_days)
            middle = df['close'].mean()
            upper = middle + g.N * df['close'].std()
            lower = middle - g.N * df['close'].std()
            
            p = get_current_data()[stock].day_open
            # 如果价格跌破支撑线
            
            if p <= lower and stock not in context.portfolio.positions:
                order_target(stock, cash)
    View Code

    五、PEG策略

    def initialize(context):
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        set_benchmark('000300.XSHG')
        
        g.security = get_index_stocks('000300.XSHG')
        g.N = 20
        run_monthly(handle, 1)
        
    def handle(context):
        df = get_fundamentals(query(valuation.code, valuation.pe_ratio, indicator.inc_net_profit_year_on_year).filter(valuation.code.in_(g.security)))
        df = df[(df['pe_ratio']>0) & (df['inc_net_profit_year_on_year']>0)]
        df['PEG'] = df['pe_ratio'] / df['inc_net_profit_year_on_year'] / 100
        df = df.sort(columns='PEG')[:g.N]
        tohold = df['code'].values
        
        for stock in context.portfolio.positions:
            if stock not in tohold:
                order_target_value(stock, 0)
        
        tobuy = [stock for stock in tohold if stock not in context.portfolio.positions]
        
        if len(tobuy)>0:
            print('Buying')
            cash = context.portfolio.available_cash
            cash_every_stock = cash / len(tobuy)
            
            for stock in tobuy:
                order_value(stock, cash_every_stock)
    View Code

    六、羊驼交易法则-表现最优入池

    def initialize(context):
        
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        
        set_benchmark('000300.XSHG')
        
        g.security = get_index_stocks('000300.XSHG')
        g.period = 3
        g.N = 10
        g.M = 2
        print("init")
        run_monthly(handle, 10)
        
        stocks = get_sorted_stocks(context, g.security)[:g.N]
        cash = context.portfolio.available_cash / len(stocks)
        for stock in stocks:
            order_value(stock, cash)
        
        
    def get_sorted_stocks(context, stocks):
        df = history(g.period, field='close', security_list=stocks).T
        df['ret'] = (df.iloc[:,-1] - df.iloc[:,0]) / df.iloc[:,0]
        df = df.sort(columns='ret', ascending=False)
        return df.index.values
        
    def handle(context):
        stocks = get_sorted_stocks(context, context.portfolio.positions.keys())
        
        for stock in stocks[-g.M:]:
            order_target(stock, 0)
            
        stocks = get_sorted_stocks(context, g.security)
        
        cash = context.portfolio.available_cash / g.M
        
        for stock in stocks:
            if len(context.portfolio.positions) >= g.N:
                break
            if stock not in context.portfolio.positions:
                order_target(stock, cash)
    View Code

    七、海龟交易法则

    可参考:https://www.joinquant.com/post/1401?f=study&m=algorithm

    import jqdata
    import math
    import numpy as np
    import pandas as pd
    from collections import deque
    
    def initialize(context):
        
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
    
        g.security = '000060.XSHE'
        set_benchmark(g.security)
        g.in_day = 20
        g.out_day = 10
        g.today_units = 0
        g.current_units = 0
        g.N=deque(maxlen=19)
        g.current_N = 0
        g.last_buy_price = 0
        
        price = attribute_history(g.security, g.N.maxlen*2+1, '1d', ('high', 'low', 'close'))
        
        for i in range(g.N.maxlen+1, g.N.maxlen*2+1):
            li = []
            for j in range(i-19,i+1):
                a = price['high'][j]-price['low'][j]
                b = abs(price['high'][j]-price['close'][j-1])
                c = abs(price['low'][j]-price['close'][j-1])
                li.append(max(a,b,c))
            current_N = np.array(li).mean()
            g.N.append(current_N)
            
        
    def before_trading_start(context):
        g.current_N = cal_N()
        g.today_units = 0
    
        
    def handle_data(context, data):
        dt = context.current_dt
        current_price = data[g.security].price #上一分钟价格
        value = context.portfolio.total_value
        cash = context.portfolio.available_cash
        
        unit = math.floor(value * 0.01 / g.current_N)
        
            
        if g.current_units == 0:
            buy(current_price, cash, unit)
        else:
            if stop_loss(current_price):
                return
            if sell(current_price):
                return
            addin(current_price, cash, unit)
        
    def cal_N():
        # if len(g.N) < g.N.maxlen:
        #     price = attribute_history(g.security, g.N.maxlen+2, '1d', ('high', 'low', 'close'))
        #     li = []
        #     for i in range(1, g.N.maxlen+2):
        #         a = price['high'][i]-price['low'][i]
        #         b = abs(price['high'][i]-price['close'][i-1])
        #         c = abs(price['low'][i]-price['close'][i-1])
        #         li.append(max(a,b,c))
        #     current_N = np.array(li).mean()
        # else:
        price = attribute_history(g.security, 2, '1d', ('high', 'low', 'close'))
        a = price['high'][1]-price['low'][1]
        b = abs(price['high'][1]-price['close'][0])
        c = abs(price['low'][1]-price['close'][0])
        current_N = (max(a,b,c) + np.array(g.N).sum())/(g.N.maxlen+1)
        g.N.append(current_N)
        return current_N
        
    def buy(current_price, cash, unit):
        price = attribute_history(g.security, g.in_day, '1d', ('close',))
        if current_price > max(price['close']):
            shares = cash / current_price
            if shares >= unit:
                print("buying %d" % unit)
                o = order(g.security, unit)
                g.last_buy_price = o.price
                g.current_units += 1
                g.today_units += 1
                return True
        return False
                
    
    def addin(current_price, cash, unit):
        if current_price >= g.last_buy_price + 0.5 * g.current_N:
            shares = cash / current_price
            if shares >= unit:
                print("adding %d" % unit)
                o = order(g.security, unit)
                g.last_buy_price = o.price
                g.current_units += 1
                g.today_units += 1
                return True
        return False
                
    def sell(current_price):
        price = attribute_history(g.security, g.out_day, '1d', ('close',))
        if current_price < min(price['close']):
            print("selling")
            order_target(g.security, 0)
            g.current_units = g.today_units
            return True
        return False
            
    def stop_loss(current_price):
        if current_price < g.last_buy_price - 2 * g.current_N:
            print("stop loss")
            order_target(g.security, 0)
            g.current_units = g.today_units
            return True
        return False
    View Code

    八、鳄鱼交易法则

    # 导入函数库
    import jqdata
    import numpy as np
    
    # 初始化函数,设定基准等等
    def initialize(context):
        set_option('use_real_price', True)
        set_order_cost(OrderCost(close_tax=0.001, open_commission=0.0003, close_commission=0.0003, min_commission=5), type='stock')
        set_benchmark('000300.XSHG')
        
        g.up_price = {} #向上碎形最高价
        g.low_price = {} #向下碎形最低价
        g.up_fractal_exists = {} #判断有效向上碎形
        g.down_fractal_exists = {} #判断有效向下碎形
        g.AO_index = {} #存放连续的AO指标数据
        g.cal_AC_index = {} #计算AC指标中转存储
        g.AC_index = {} #存放连续的AC指标数据
        g.amount = {} #满仓仓位
        g.stock = get_index_stocks('000300.XSHG')
        g.buy_stock = []
        g.month = context.current_dt.month
        run_monthly(select_universe,1,'open')
        
    #重置全局变量
    def reset_global():
        g.up_price = {} #向上碎形最高价
        g.low_price = {} #向下碎形最低价
        g.up_fractal_exists = {} #判断有效向上碎形
        g.down_fractal_exists = {} #判断有效向下碎形
        g.AO_index = {} #存放连续的AO指标数据
        g.cal_AC_index = {} #计算AC指标中转存储
        g.AC_index = {} #存放连续的AC指标数据
        g.amount = {} #满仓仓位
        g.buy_stock = []
    
    def initial_stock_global(stock):
        g.up_price[stock] = 0
        g.low_price[stock] = 0
        g.up_fractal_exists[stock] = False
        g.down_fractal_exists[stock] = False #判断有效向下碎形
        g.AO_index[stock] = [0] #存放连续的AO指标数据
        g.cal_AC_index[stock] = [0]  #计算AC指标中转存储
        g.AC_index[stock] = [0] #存放连续的AC指标数据
        g.amount[stock] = 0 #满仓仓位
    
    #轮换选股后清空持仓
    def reset_position(context):
        for stock in g.buy_stock:
            order_target(stock,0)
            log.info("sell %s for reset position"%stock)
    
    #选股
    def select_universe(context):
        #每三个月操作一次
        month = context.current_dt.month
        if month%6 != g.month%6:
            return
        #清空全局变量
        reset_position(context)
        reset_global()
        hist = history(30,'1d','close',g.stock,df = False)
        for stock in g.stock:
            if is_sleeping_alligator(stock,hist,20):
                g.buy_stock.append(stock)
                #初始化该股票全局变量
                initial_stock_global(stock)
        print g.buy_stock
        return None
        
    #睡着的鳄鱼
    def is_sleeping_alligator(stock,hist,nday):
        for i in range(nday):
            if is_struggle(stock,hist,i) == False:
                return False
        return True
    
    #均线纠缠,BRG三线非常接近
    def is_struggle(stock,hist,delta):
        blue_line = hist[stock][-21-delta:-8-delta].mean()
        red_line = hist[stock][-13-delta:-5-delta].mean()
        green_line = hist[stock][-8-delta:-3-delta].mean()
        if abs(blue_line/red_line-1)<0.02 and abs(red_line/green_line-1)<0.02:
            return True
        else:
            return False
            
    #判断 向上 或 向下 碎形
    def is_fractal(stock,direction):
        hist = attribute_history(stock, 5, fields=[direction])
        if direction == 'high':
            if np.all(hist.iloc[:2] < hist.iloc[2]) and np.all(hist.iloc[3:] < hist.iloc[2]):
                g.up_price[stock] = hist.iloc[2].values
                return True
        elif direction == 'low':
            if np.all(hist.iloc[:2] > hist.iloc[2]) and np.all(hist.iloc[3:] > hist.iloc[2]):
                g.low_price[stock] = hist.iloc[2].values
                return True
        return False
        
    #通过比较碎形与红线位置,判断碎形是否有效
    def is_effective_fractal(stock, direction):
        if is_fractal(stock,direction):
            hist = attribute_history(stock, 11)
            red_line = hist['close'][:-3].mean()
            close_price = hist['close'][-1]
            if direction == 'high':
                if close_price > red_line:
                    g.up_fractal_exists[stock] = True
                else:
                    g.up_fractal_exists[stock] = False
            elif direction == 'low':
                if close_price < red_line:
                    g.down_fractal_exists[stock] = True
                else:
                    g.down_fractal_exists[stock] = False
        
    
    #N日内最高价格的N日线
    def nday_high_point(stock,n):
        hist = history(2*n,'1d','high',[stock],df = False)[stock]
        high_point = []
        for i in range(n):
            high_point.append(max(hist[-5-i:-1-i]))
        return np.array(high_point).mean()
    
    #N日内最低价格的N日线
    def nday_low_point(stock,n):
        hist = history(2*n,'1d','low',[stock],df = False)[stock]
        low_point = []
        for i in range(n):
            low_point.append(max(hist[-5-i:-1-i]))
        return np.array(low_point).mean()
    
    #AO=5日内(最高-最低)/2的5日移动平均-34日内(最高-最低)/2的34日移动平均
    def AO_index(stock):
        g.AO_index[stock].append(nday_high_point(stock,5)/2 + nday_low_point(stock,5)/2
                          - nday_high_point(stock,34)/2 - nday_low_point(stock,34)/2)
        return None
    
    #AO-AO的5日平均值的5日平均
    def AC_index(stock):
        AO_index(stock)
        if len(g.AO_index[stock]) >= 5:
            g.cal_AC_index[stock].append(g.AO_index[stock][-1] - np.array(g.AO_index[stock][-5:]).mean())
            if len(g.cal_AC_index[stock]) >=5:
                g.AC_index[stock].append(np.array(g.cal_AC_index[stock][-5:]).mean())
    
    #判断序列n日上行
    def is_up_going(alist,n):
        if len(alist) < n:
            return False
        for i in range(n-1):
            if alist[-(1+i)] <= alist[-(2+i)]:
                return False
        return True
    
    #判断序列n日下行
    def is_down_going(alist,n):
        if len(alist) < n:
            return False
        for i in range(n-1):
            if alist[-(1+i)] >= alist[-(2+i)]:
                return False
        return True
    
    #碎形被突破
    def active_fractal(stock,direction):
        close_price = history(1,'1d','close',[stock],df=False)[stock][0]
        if direction == 'up' and close_price > g.up_price[stock]:
            return True
        elif direction == 'down' and close_price < g.low_price[stock]:
            return True
        return False
    
    #进场,初始仓位
    def set_initial_position(stock,context):
        close_price = history(1,'1d','close',[stock],df=False)[stock][0]
        g.amount[stock] = context.portfolio.cash/close_price/len(g.buy_stock)*3
        order(stock, g.amount[stock])
        log.info("buying %s 股数为 %s"%(stock,g.amount[stock]))
        g.down_fractal_exists[stock] = False
    
    #卖出
    def sell_all_stock(stock,context):
        order_target(stock,0)
        log.info("selling %s"%stock)
        g.up_fractal_exists[stock] = False
    
    #加仓
    def adjust_position(stock,context,position):
        order(stock,g.amount[stock]*position)
        log.info("adjust position buying %s 股数为 %s"%(stock,g.amount[stock]*position))
    
    # 计算股票前n日收益率
    def security_return(days,security_code):
        hist1 = attribute_history(security_code, days + 1, '1d', 'close',df=False)
        security_returns = (hist1['close'][-1]-hist1['close'][0])/hist1['close'][0]
        return security_returns
    
    # 止损,根据前n日收益率
    def conduct_nday_stoploss(context,security_code,days,bench):
        if  security_return(days,security_code)<= bench:
            for stock in g.buy_stock:
                order_target_value(stock,0)
                log.info("Sell %s for stoploss" %stock)
            return True
        else:
            return False
    
    # 计算股票累计收益率(从建仓至今)
    def security_accumulate_return(context,data,stock):
        current_price = data[stock].price
        cost = context.portfolio.positions[stock].avg_cost
        if cost != 0:
            return (current_price-cost)/cost
        else:
            return None
    
    # 个股止损,根据累计收益
    def conduct_accumulate_stoploss(context,data,stock,bench):
        if security_accumulate_return(context,data,stock) != None
        and security_accumulate_return(context,data,stock) < bench:
            order_target_value(stock,0)
            log.info("Sell %s for stoploss" %stock)
            return True
        else:
            return False
    
    # 个股止盈,根据累计收益
    def conduct_accumulate_stopwin(context,data,stock,bench):
        if security_accumulate_return(context,data,stock) != None
        and security_accumulate_return(context,data,stock) > bench:
            order_target_value(stock,0)
            log.info("Sell %s for stopwin" %stock)
            return True
        else:
            return False
    
    def handle_data(context,data):
        #大盘止损
        if conduct_nday_stoploss(context,'000300.XSHG',3,-0.03):
            return
        for stock in g.buy_stock:
            #个股止损
            if conduct_accumulate_stopwin(context,data,stock,0.3)
            or conduct_accumulate_stoploss(context,data,stock,-0.1):
                return
            #计算AO,AC指标
            AC_index(stock)
            #空仓时,寻找机会入场
            if context.portfolio.positions[stock].amount == 0:
                #计算向上碎形
                is_effective_fractal(stock,'high')
                #有效向上碎形存在,并被突破,买入
                if g.up_fractal_exists and active_fractal(stock,'up'):
                    close_price = history(5, '1d', 'close', [stock],df = False)
                    if is_up_going(g.AO_index[stock],5)
                    and is_up_going(g.AC_index[stock],3)
                    and is_up_going(close_price[stock],2):
                        set_initial_position(stock,context)
            #有持仓时,加仓或离场
            else:
                #计算向下碎形
                is_effective_fractal(stock,'low')
                #出场条件1:有效向下碎形存在,并被突破,卖出
                if g.down_fractal_exists and active_fractal(stock,'down'):
                    sell_all_stock(stock,context)
                    return
    View Code
  • 相关阅读:
    笔记本无线网卡和有线网卡同时用及网络知识回顾总结
    DSPack初次使用小结
    常见加解密算法及Delphi应用程序图标总结
    Delphi窗体创建释放过程及单元文件小结
    怪异的JavaScript的Case语句
    交换机与路由器的区别
    DirectShow学习笔记总结
    Git的提交与查看差异
    Laravel 5使用Laravel Excel实现Excel/CSV文件导入导出的功能详解
    laravel5的Bcrypt加密方式对系统保存密码的小结
  • 原文地址:https://www.cnblogs.com/lxyoung/p/7421988.html
Copyright © 2011-2022 走看看