zoukankan      html  css  js  c++  java
  • 【股票基础选取】

    import talib
    import pandas as pd
    import time
    import numpy as np
    
    class demo:
        
        
        def __init__(self):
            print('demo 类')
            self.h5_path = 'Z:/data/stock_data_py'
        def 加载数据(self,name,var):
            print('正在加载数据'+self.h5_path+'/'+var+'/'+name)
            df = pd.read_hdf(self.h5_path + '/' + var + '/' + name + '.h5')
            return df
        def demo_run(self,strategy_name):
            t = time.time() # 计时开始
            # 1.加载数据
            stocklist=self.加载数据('stocklist','list')
            stocklist['codenum'] = stocklist['code'].apply(lambda x: x[:6])
            closef_pd=self.加载数据('closef_pd','day')
            openf_pd=self.加载数据('openf_pd','day')
            highf_pd=self.加载数据('highf_pd','day')
            lowf_pd=self.加载数据('lowf_pd','day')
            amt_pd=self.加载数据('amt_pd','day')
            # 2.选股思路计算
            
            # MACD()
            def MACD():
                t = time.time() # 计时开始
                p = (12, 26, 9)
                def get_macd(x):
                    DIF, DEA, MACD= talib.MACD(x.values, p[0], p[1], p[2])
                    return MACD
                dfA = closef_pd.apply(get_macd, axis=0)
                print(time.time() - t) # 计时结束
                npA = dfA.values.T
                N = 1
                
                maxt = np.max(npA[:, -N - 5:-N-1], axis=1)
                
                ind=np.where( (np.max(npA[:, -N - 5:-N-1], axis=1) < 0) & (npA[:, -N] > 0) )[0]
                
                return ind
            
            # 可以扩展写一下其他的指标
            # KDJ 
            def KDJ():
                t = time.time() # 计时开始
                def get_kdj(x):
                    ndim=x.name
                    try:
                        k,d=talib.STOCH(highf_pd[ndim].values,lowf_pd[ndim].values,x.values,9,3,0,3,0)
                        j=3*d-2*k
                    except:
                        j=np.array( [np.nan]*len(highf_pd) )
                    return j
                dfA = closef_pd.apply(get_kdj, axis=0)
                print(time.time() - t) # 计时结束
                npA = dfA.values.T
                N = 1
                ind=np.where( (npA[:, -N-1] < 20) & (npA[:, -N] > 20) )[0]
                return ind
            # ind=KDJ()
            # RSI
            def RSI():
                t = time.time() # 计时开始
                def get_rsi(x):
                    ndim=x.name
                    rsi6=talib.RSI(x.values,6)
                    rsi12=talib.RSI(x.values,12)
                    rsi24=talib.RSI(x.values,24)
                    rsi=rsi6-rsi12
                    if  rsi6[-2]<50 and rsi[-2]<0 and  rsi[-1]>0:
                        return 1
                    else:
                        return 0
                
                dfA = closef_pd.apply(get_rsi, axis=0)
                print(time.time() - t) # 计时结束
                npA = dfA.values.T
                N = 1
                ind=np.where( npA==1 )[0]
                return ind
            
            # BOLL
            def BOLL():
                t = time.time() # 计时开始
                def get_rsi(x):
                    upper, middle, lower= talib.BBANDS(x.values, 20,2,2)
                    return x.values-upper
                
                dfA = closef_pd.apply(get_rsi, axis=0)
                print(time.time() - t) # 计时结束
                npA = dfA.values.T
                N = 1
                
                ind=np.where( (np.max(npA[:, -N - 10:-N-1], axis=1) < 0) & (npA[:, -N] > 0) )[0]
                return ind
            
            # TRIX
            def TRIX():
                pass
                t = time.time() # 计时开始
                def get_trix(x):
                    trix_data= talib.TRIX(x.values, 12)
                    try:
                        trix_ma=talib.MA(trix_data,20)
                    except:
                        trix_ma=np.array( [np.nan]*len(x) )
                    return trix_data-trix_ma
                
                dfA= closef_pd.apply(get_trix, axis=0)
                print(time.time() - t) # 计时结束
                npA = dfA.values.T
                N = 1
                ind=np.where( (np.max(npA[:, -N - 10:-N-1], axis=1) < 0) & (npA[:, -N] > 0) )[0]
                return ind
            def 共振():
                # 求交集
                pass
                indA=TRIX()
                indB=MACD()
                indC=RSI()
                indD=BOLL()
                #数组的交集
                ind=np.intersect1d(indC,indB)
                return ind
            ind=eval(strategy_name+'()')
            temp = ind
            npB = stocklist.iloc[ind]
            print(time.time() - t) # 计时结束
            return npB
        
        
    
    if __name__=='__main__':
        self=demo()
        listMACD=self.demo_run(strategy_name='MACD')
        #listKDJ=self.demo_run(strategy_name='KDJ')
        #listRSI=self.demo_run(strategy_name='RSI')
        # listBOLL=self.demo_run(strategy_name='BOLL')
        #listTRIX=self.demo_run(strategy_name='TRIX')
        #list共振=self.demo_run(strategy_name='共振')
  • 相关阅读:
    wso2使用
    wso2安装
    CLR 编译函数的两种结果的原因
    hduoj4311
    通过Git在本地局域网中的两台电脑间同步代码
    Git基本操作之强制推送覆盖仓库
    设置Mac共享网络给其他设备
    谷歌浏览器设置无图浏览模式
    加载到SGA中的库缓存对象超过阈值
    webBrowser 禁止屏蔽alert confirm open showModalDialog
  • 原文地址:https://www.cnblogs.com/fyandy/p/9650896.html
Copyright © 2011-2022 走看看