# -*- coding: utf-8 -*- ''' 登录 买入 卖出 撤单 ''' import pandas as pd import numpy as np import sys import datetime import time import scipy.io import os import uuid import signal import win32api import win32con import win32com,win32com.client import win32console import win32process import win32clipboard import win32gui import pywinauto # 需要pip安装 pip install pywinauto import pywinauto.controls.hwndwrapper as hwndWrapper import pywinauto.findwindows as findwindows import pywinauto.controls.win32_controls as win32_controls def getclip(): win32clipboard.OpenClipboard() formats=[] lastformat=0 while 1: nextfomat=win32clipboard.EnumClipboardFormats(lastformat) if 0==nextfomat: break else: formats.append(nextfomat) lastformat=nextfomat if len(formats)==0: win32clipboard.CloseClipboard() return '' clipmemory=win32clipboard.GetClipboardData(13)#13是str # print(' format:{} clipboard:{}'.format(lastformat,clipmemory)) win32clipboard.CloseClipboard() return clipmemory class zyc_time: def __init__(self): pass def zyc_timeNow(self): t=time.localtime(time.time()) tS=time.strftime("%Y-%m-%d %H:%M:%S",t) return tS def zyc_print(self,strG): tS=self.zyc_timeNow() print(tS+' '+strG) def start(self): self.tt_start=time.time() def end(self): self.tt_end=time.time() self.tt=self.tt_end-self.tt_start self.zyc_print('耗时: '+str(round(self.tt,3))+' 秒') class ths_api(zyc_time): __is_run=0 def __init__(self): zyc_time.__init__(self) # 同花顺客户端登录 def 同花顺客户端登录(self,Name='网上股票交易系统5.0'): # 获取主句柄 同花顺模拟账户 42763175 密码 789789 i=0;self.hand=0 self.hand = win32gui.FindWindowEx(0, self.hand, None,Name) while self.hand==0 and i<5: i=i+1 self.hand = win32gui.FindWindowEx(0, self.hand, None,Name) time.sleep(0.1) if self.hand==0: self.zyc_print('请登录同花顺股票统一客户端') return else: pass self.zyc_print('获取主句柄成功,self.hand='+str(self.hand)) # 获取买入句柄 # 点击F1 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F1) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F1) time.sleep(0.5) self.买入句柄={} i = 0; self.买入句柄['hand'] = 0 self.买入句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.买入句柄['hand'] = win32gui.GetDlgItem(self.买入句柄['hand'], 0xE901) while self.买入句柄['hand'] == 0 and i < 5: i = i + 1 self.买入句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.买入句柄['hand'] = win32gui.GetDlgItem(self.买入句柄['hand'], 0xE901) time.sleep(0.1) if self.买入句柄['hand'] == 0: self.zyc_print('请登录同花顺股票统一客户端') return else: pass self.zyc_print('获取买入句柄成功,买入句柄=' + str(self.买入句柄['hand'])) i = 0 self.买入句柄['hand_stock']=0 self.买入句柄['hand_price'] = 0 self.买入句柄['hand_num'] = 0 self.买入句柄['hand_buy'] = 0 while ( self.买入句柄['hand_stock']== 0 or self.买入句柄['hand_price'] ==0 or self.买入句柄['hand_num'] == 0 or self.买入句柄['hand_buy'] == 0 ) and i < 10: i = i + 1 self.买入句柄['hand_stock'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1032) self.买入句柄['hand_price'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1033) self.买入句柄['hand_num'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1034) self.买入句柄['hand_buy'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1006) time.sleep(0.1) if i>5: self.zyc_print('异常') # 点击F6 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F6) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F6) time.sleep(0.5) i = 0 self.买入句柄['hand_F6'] = 0 while self.买入句柄['hand_F6'] == 0 and i < 5: self.买入句柄['hand_F6'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1047) time.sleep(0.5) # 点击F7 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F7) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F7) time.sleep(0.5) i = 0 self.买入句柄['hand_F7'] = 0 while self.买入句柄['hand_F7'] == 0 and i < 5: self.买入句柄['hand_F7'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1047) time.sleep(0.5) # 点击F8 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F8) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F8) time.sleep(0.5) i = 0 self.买入句柄['hand_F8'] = 0 while self.买入句柄['hand_F8'] == 0 and i < 5: self.买入句柄['hand_F8'] = win32gui.GetDlgItem(self.买入句柄['hand'], 1047) time.sleep(0.5) # 获取卖出句柄 # 点击F2 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F2) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F2) time.sleep(0.5) self.卖出句柄 = {} i = 0; self.卖出句柄['hand'] = 0 self.卖出句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.卖出句柄['hand'] = win32gui.GetDlgItem(self.卖出句柄['hand'], 0xE901) while self.卖出句柄['hand'] == 0 and i < 5: i = i + 1 self.卖出句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.卖出句柄['hand'] = win32gui.GetDlgItem(self.卖出句柄['hand'], 0xE901) time.sleep(0.1) if self.卖出句柄['hand'] == 0: self.zyc_print('请登录同花顺股票统一客户端') return else: pass self.zyc_print('获取卖出句柄成功,卖出句柄=' + str(self.卖出句柄['hand'])) i = 0 self.卖出句柄['hand_stock'] = 0 self.卖出句柄['hand_price'] = 0 self.卖出句柄['hand_num'] = 0 self.卖出句柄['hand_sell'] = 0 while (self.卖出句柄['hand_stock'] == 0 or self.卖出句柄['hand_price'] == 0 or self.卖出句柄['hand_num'] == 0 or self.卖出句柄['hand_sell'] == 0) and i < 10: i = i + 1 self.卖出句柄['hand_stock'] = win32gui.GetDlgItem(self.卖出句柄['hand'], 1032) self.卖出句柄['hand_price'] = win32gui.GetDlgItem(self.卖出句柄['hand'], 1033) self.卖出句柄['hand_num'] = win32gui.GetDlgItem(self.卖出句柄['hand'], 1034) self.卖出句柄['hand_sell'] = win32gui.GetDlgItem(self.卖出句柄['hand'], 1006) time.sleep(0.1) if i > 5: self.zyc_print('异常') # 获取撤单句柄 # 点击F3 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F3) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F3) time.sleep(0.5) self.撤单句柄 = {} i = 0; self.撤单句柄['hand'] = 0 self.撤单句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.撤单句柄['hand'] = win32gui.GetDlgItem(self.撤单句柄['hand'], 0xE901) while self.撤单句柄['hand'] == 0 and i < 5: i = i + 1 self.撤单句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.撤单句柄['hand'] = win32gui.GetDlgItem(self.撤单句柄['hand'], 0xE901) time.sleep(0.1) if self.撤单句柄['hand'] == 0: self.zyc_print('请登录同花顺股票统一客户端') return else: pass self.zyc_print('获取撤单句柄成功,撤单句柄=' + str(self.撤单句柄['hand'])) i = 0 self.撤单句柄['撤单'] = 0 self.撤单句柄['全撤'] = 0 self.卖出句柄['撤买'] = 0 self.卖出句柄['撤卖'] = 0 while (self.撤单句柄['撤单'] == 0 or self.撤单句柄['全撤'] == 0 or self.撤单句柄['撤买'] == 0 or self.撤单句柄['撤卖'] == 0) and i < 10: i = i + 1 self.撤单句柄['撤单'] = win32gui.GetDlgItem(self.撤单句柄['hand'], 1099) self.撤单句柄['全撤'] = win32gui.GetDlgItem(self.撤单句柄['hand'], 30001) self.撤单句柄['撤买'] = win32gui.GetDlgItem(self.撤单句柄['hand'], 30002) self.撤单句柄['撤卖'] = win32gui.GetDlgItem(self.撤单句柄['hand'], 30003) time.sleep(0.1) if i > 5: self.zyc_print('异常') # 获取查询资金句柄 # 点击F4 win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F4) # 发送双向交易窗口 win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F4) time.sleep(0.5) self.查询句柄 = {} i = 0; self.查询句柄['hand'] = 0 self.查询句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.查询句柄['hand'] = win32gui.GetDlgItem(self.查询句柄['hand'], 0xE901) while self.查询句柄['hand'] == 0 and i < 5: i = i + 1 self.查询句柄['hand'] = win32gui.GetDlgItem(self.hand, 0xE900) self.查询句柄['hand'] = win32gui.GetDlgItem(self.查询句柄['hand'], 0xE901) time.sleep(0.1) if self.查询句柄['hand'] == 0: self.zyc_print('请登录同花顺股票统一客户端') return else: pass self.zyc_print('获取查询句柄成功,查询句柄=' + str(self.查询句柄['hand'])) # 买入 def 买入(self,stock,price,num): self.start() self.KnkDlg() win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F1) win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F1) time.sleep(0.1) i=0 while i==0 : try: win32_controls.EditWrapper(self.买入句柄['hand_stock']).set_edit_text(stock) win32_controls.EditWrapper(self.买入句柄['hand_price']).set_edit_text(price) win32_controls.EditWrapper(self.买入句柄['hand_num']).set_edit_text(num) except: pass a = win32_controls.EditWrapper(self.买入句柄['hand_stock']).texts()[1] == stock b = win32_controls.EditWrapper(self.买入句柄['hand_price']).texts()[1] == price c = win32_controls.EditWrapper(self.买入句柄['hand_num']).texts()[1] == num if a and b and c: i=1 if i==0: time.sleep(0.1) hand_buy = win32_controls.ButtonWrapper(self.买入句柄['hand_buy']) while hand_buy.IsVisible() == 0 or hand_buy.IsEnabled() == 0: time.sleep(0.1) hand_buy.Click() self.end() if self.KnkDlg()==0: self.zyc_print('买入失败,'+stock+','+price+','+num) return 1 self.zyc_print('买入成功,'+stock+','+price+','+num) return 0 # 卖出 def 卖出(self,stock,price,num): self.start() self.KnkDlg() win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F2) win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F2) time.sleep(0.1) i=0 while i==0 : try: win32_controls.EditWrapper(self.卖出句柄['hand_stock']).set_edit_text(stock) win32_controls.EditWrapper(self.卖出句柄['hand_price']).set_edit_text(price) win32_controls.EditWrapper(self.卖出句柄['hand_num']).set_edit_text(num) except: pass a = win32_controls.EditWrapper(self.卖出句柄['hand_stock']).texts()[1] == stock b = win32_controls.EditWrapper(self.卖出句柄['hand_price']).texts()[1] == price c = win32_controls.EditWrapper(self.卖出句柄['hand_num']).texts()[1] == num if a and b and c: i=1 if i==0: time.sleep(0.1) hand_sell = win32_controls.ButtonWrapper(self.卖出句柄['hand_sell']) while hand_sell.IsVisible() == 0 or hand_sell.IsEnabled() == 0: time.sleep(0.1) hand_sell.Click() self.end() if self.KnkDlg()==0: self.zyc_print('卖出失败,'+stock+','+price+','+num) return 1 self.zyc_print('卖出成功,'+stock+','+price+','+num) return 0 # 撤单 def 撤单(self,batch=None): # self.撤单(batch='all')#全撤 # self.撤单(batch='buy')#全撤 # self.撤单(batch='sell')#全撤 # self.撤单(single='600784,7.00,证券买入')#按条件撤单 #self.撤单(subnum='1234')#按委托单号撤单 # self.KnkDlg() win32gui.PostMessage(self.hand, win32con.WM_KEYDOWN, win32con.VK_F3) win32gui.PostMessage(self.hand, win32con.WM_KEYUP, win32con.VK_F3) time.sleep(0.5) if batch=='all': hwndWrapper.HwndWrapper(self.撤单句柄['全撤']).SetFocus() win32_controls.ButtonWrapper(self.撤单句柄['全撤']).Click() elif batch=='buy': hwndWrapper.HwndWrapper(self.撤单句柄['撤买']).SetFocus() win32_controls.ButtonWrapper(self.撤单句柄['撤买']).Click() elif batch=='sell': hwndWrapper.HwndWrapper(self.撤单句柄['撤卖']).SetFocus() win32_controls.ButtonWrapper(self.撤单句柄['撤卖']).Click() else: pass trec=time.time() while True: if time.time()-trec>10: break time.sleep(0.5) if self.KnkDlg()==1: break return 0 def KnkDlg(self): # 关闭所有同进程同线程的对话框 thread, processId = win32process.GetWindowThreadProcessId(self.hand) temp_hwnd = pywinauto.findwindows.find_windows(top_level_only=True, class_name='#32770', parent=None) for hdlg in temp_hwnd: dlgthread, dlgprocessId = win32process.GetWindowThreadProcessId(hdlg) if dlgprocessId==processId and win32gui.IsWindowVisible(hdlg) : hcmd1 = pywinauto.findwindows.find_windows(top_level_only=False, class_name='Button', title_re='是',parent=hdlg) hcmd2 = pywinauto.findwindows.find_windows(top_level_only=False, class_name='Button', title_re='确定',parent=hdlg) hcmd3 = pywinauto.findwindows.find_windows(top_level_only=False, class_name='Button', title_re='确认',parent=hdlg) time.sleep(0.1) if len(hcmd1)>0: hwndWrapper.HwndWrapper(hcmd1[0]).SetFocus() win32_controls.ButtonWrapper(hcmd1[0]).Click() return 0 elif len(hcmd2)>0: hwndWrapper.HwndWrapper(hcmd2[0]).SetFocus() win32_controls.ButtonWrapper(hcmd2[0]).Click() return 0 elif len(hcmd3)>0: hwndWrapper.HwndWrapper(hcmd3[0]).SetFocus() win32_controls.ButtonWrapper(hcmd3[0]).Click() return 0 return 1 def chkwnd(self): if win32gui.IsWindow(self.hand)==0 or self.hand==0: print ('未发现交易窗口!') return 1 trec=time.time() while win32gui.IsWindowVisible( self.hand)==0: if time.time()-trec>3: print('无法调出隐藏的窗口!') return 1 thread, processId = win32process.GetWindowThreadProcessId(self.hand) p = psutil.Process(processId) win32process.CreateProcess(p.exe(),'',None,None,0,win32process.CREATE_NO_WINDOW,None,None,win32process.STARTUPINFO()) time.sleep(0.1) return 0 if __name__=='__main__': pass self=ths_api() self.同花顺客户端登录() for i in range(3): is_buy= self.买入('000001','9.00','100') # stock='000001';price='10.5';num='300' is_sell= self.卖出('000001','11.35','100') # stock='000001';price='10.5';num='300' self.撤单(batch='all')#全撤