zoukankan      html  css  js  c++  java
  • 期货、期权tick数据接收

    功能:

    1、开启之后,7*24自动运行。

    2、在共享内存中存放当前交易日的tick数据,方便随时取用。

    3、支持多行情源取数据。经过测试一个行情源峰值带宽要求为20M,所以使用时要配合带宽限制。

    4、夜盘结束时输出一下tick数据,白盘结束时输出所有tick。

    5、支持查询指令:

     运行时如下:

    贴上代码:

    // FutureDataReceive.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    #include "ThostFtdcMdSpiImpl.hpp"
    #include "ThostFtdcTraderSpiImpl.hpp"
    #include "future_helper.h"
    #include "TinyConfig.h"
    #include "FileMap.h"
    #include <boost/lockfree/queue.hpp>
    #include <iostream>
    #include <conio.h>
    
    // Shared-memory tick store; created with CreateFileMap() in Open()/main() and
    // filled through AddDepthData().
    CFileMap g_fileMap;
    // Settings loaded from system.ini in main() ("Trade", "Market" and "Path" sections are read).
    TinyConfig g_config;
    // Trade session; supplies the trading day and the list of tradable instruments.
    CThostFtdcTraderSpiImpl g_trade;
    // Market-data sessions, one per configured "ipN" entry; filled in MarketLogin(), emptied in Close().
    vector<shared_ptr<CThostFtdcMdSpiImpl>> g_vecQuote;
    
    // Previous trading day: typed in by the operator at start-up, then updated in OnTradeComplete().
    int g_nPreTradingDay;
    // Current trading day as reported by the trade session (set in OnTradeComplete()).
    int g_nTradingDay;
    // Loop flag of ThreadConsumeTick(); Close() sets it to false before joining the thread.
    bool g_bThread = true;
    
    // Hand-over queue: OnDepthMarketData() pushes, ThreadConsumeTick() pops.
    // Constructed with 1000 (presumably the initial node capacity - see Boost.Lockfree docs).
    boost::lockfree::queue<CThostFtdcDepthMarketDataField> g_buffer(1000);
    // Worker thread running ThreadConsumeTick(); started in Open(), joined in Close().
    shared_ptr<thread> g_thdDealData = nullptr;
    // Fallback store used by ThreadConsumeTick() when g_fileMap.IsOK() is false.
    vector<CThostFtdcDepthMarketDataField> g_vecBuffer;
    
    /// Prints a message to the console and appends it, prefixed with the local
    /// date/time, to ./log.data.receive.txt.
    ///
    /// @param p  NUL-terminated message. It is emitted verbatim and is never
    ///           interpreted as a printf format string; a null pointer is ignored.
    ///
    /// The message can contain text that originates from the remote front (see the
    /// *RspMessage callbacks), so it must not be used as the format argument of
    /// printf: a stray '%' would read non-existent varargs. As a consequence the
    /// console now shows exactly the same text as the log file (callers that
    /// pre-escape '%' as "%%" will see "%%" in both places).
    void OnLog(const char* p)
    {
    	if (!p) return;
    
    	printf("%s", p);
    
    	// The stream is opened per call in append mode and closed by its destructor.
    	ofstream ofs_log("./log.data.receive.txt", ios_base::app);
    	ofs_log << future_helper::get_local_datetime().c_str() << " " << p;
    }
    
    #pragma region trade module
    ///trade call back
    bool g_bTradeLoginIn = false;
    void OnTradeComplete()
    {
    	int nTradingDay = atoi(g_trade.GetTradingDay());
    	if (g_nTradingDay != 0 && g_nTradingDay != nTradingDay)
    		g_nPreTradingDay = g_nTradingDay;
    
    	g_nTradingDay = nTradingDay;
    	g_bTradeLoginIn = true;
    	printf("交易日:%d-%d\n", g_nPreTradingDay, g_nTradingDay);
    }
    
    /// Message callback of the trade session: logs "<from>:<msg>", with a leading
    /// '*' for every message type other than CTP_SUCCESS.
    void OnTradeRspMessage(const char* from, const char* msg, future_msg_type msg_type)
    {
    	const char* szMarker = "*";
    	if (msg_type == future_msg_type::CTP_SUCCESS)
    	{
    		szMarker = "";
    	}
    
    	const auto strLine = future_helper::format_string("%s%s:%s\n", szMarker, from, msg);
    	OnLog(strLine.c_str());
    }
    
    /// Logs in to the trade front, waiting up to one minute per attempt and making
    /// at most three attempts.
    ///
    /// After every attempt (successful or not) the trade session is released and the
    /// currently known previous/current trading days are handed to g_fileMap.
    void TradeLogin()
    {
    	int nAttemptsLeft = 3;
    	for (;;)
    	{
    		g_bTradeLoginIn = false;
    		g_trade.SetCallBackFunc(OnTradeComplete, nullptr, OnTradeRspMessage);
    		g_trade.SetAuthenticate(
    			g_config.GetValue("Trade", "product").c_str(),
    			g_config.GetValue("Trade", "id").c_str(),
    			g_config.GetValue("Trade", "code").c_str());
    		g_trade.InitParams(
    			g_config.GetValue("Trade", "ip").c_str(),
    			g_config.GetValue("Trade", "broker").c_str(),
    			g_config.GetValue("Trade", "user").c_str(),
    			g_config.GetValue("Trade", "password").c_str());
    
    		// Poll the flag set by OnTradeComplete() every 100 ms, for at most 60 s.
    		int nStartTick = GetTickCount();
    		while (!g_bTradeLoginIn)
    		{
    			this_thread::sleep_for(chrono::milliseconds(100));
    			if (GetTickCount() - nStartTick > 60 * 1000) break;
    		}
    
    		g_trade.Release();
    		g_fileMap.SetTradingDay(g_nPreTradingDay, g_nTradingDay);
    		if (g_bTradeLoginIn) return;
    
    		--nAttemptsLeft;
    		OnLog("交易 登录超时[1分钟],重新登录\n");
    		if (nAttemptsLeft != 0) continue;
    
    		OnLog("交易 登录次数达到3次,不再尝试重新登录!\n");
    		return;
    	}
    }
    //////////////////////////////////////////////////////////////////////////
    #pragma endregion trade module
    
    /// Market callbacks.
    /// Number of market sessions that have reported completion since the counter
    /// was last reset (MarketLogin(), Open() and Close() reset it to 0).
    int g_nMarketLoginIn = 0;
    
    /// Completion callback handed to every market session in MarketLogin().
    void OnMarketComplete()
    {
    	g_nMarketLoginIn += 1;
    }
    
    /// Message callback of the market sessions: logs "<from>:<msg>"; anything that
    /// is not CTP_SUCCESS gets a leading '*'.
    void OnMarketRspMessage(const char* from, const char* msg, future_msg_type msg_type)
    {
    	const bool bSuccess = (msg_type == future_msg_type::CTP_SUCCESS);
    	OnLog(future_helper::format_string("%s%s:%s\n", bSuccess ? "" : "*", from, msg).c_str());
    }
    
    /// Tick callback of the market sessions: copies the tick into g_buffer so that
    /// ThreadConsumeTick() can process it. A null pointer is ignored.
    void OnDepthMarketData(CThostFtdcDepthMarketDataField* p)
    {
    	if (p != nullptr)
    	{
    		g_buffer.push(*p);
    	}
    }
    
    /// Subscribes to market data for all futures and options instruments known to
    /// the trade session.
    ///
    /// One market session is created per configured "Market"/"ip1".."ip3" entry (the
    /// scan stops at the first empty entry). The function waits up to one minute for
    /// the sessions to report completion through OnMarketComplete(). If at least one
    /// session logged in it returns (listing the ones that did not); otherwise it
    /// retries, at most three attempts in total.
    void MarketLogin()
    {
    	int retry_time = 3;
    	std::vector<CThostFtdcInstrumentField> vecTradingCode;
    	g_trade.GetTradingCode(vecTradingCode, THOST_FTDC_PC_Futures);
    	g_trade.GetTradingCode(vecTradingCode, THOST_FTDC_PC_Options);
    	// size() is a size_t; handing it to "%d" unconverted is undefined where size_t is wider than int.
    	printf("订阅合约:%d\n", static_cast<int>(vecTradingCode.size()));
    	g_fileMap.SetTradingCode(vecTradingCode);
    	while (1)
    	{
    		g_nMarketLoginIn = 0;
    		// Remember where this attempt's sessions start so that a failed attempt
    		// can be undone before the next one.
    		const size_t nSessionsBefore = g_vecQuote.size();
    		for (int i = 1; i <= 3; i++)
    		{
    			string this_ip = g_config.GetValue("Market", future_helper::format_string("ip%d", i).c_str());
    			if (this_ip == "") break;
    			auto pMdSpi = make_shared<CThostFtdcMdSpiImpl>();
    			g_vecQuote.push_back(pMdSpi);
    			pMdSpi->SetTradingCode(vecTradingCode);
    			pMdSpi->SetCallBackFunc(OnDepthMarketData, OnMarketRspMessage, nullptr, OnMarketComplete);
    			pMdSpi->InitParams(this_ip.c_str(), "8888", "88888888", "88888888");
    		}
    
    		// Poll every 100 ms, for at most 60 s, until every session has reported in.
    		int tick = GetTickCount();
    		while (1)
    		{
    			if (g_nMarketLoginIn == static_cast<int>(g_vecQuote.size())) break;
    			this_thread::sleep_for(chrono::milliseconds(100));
    			if (GetTickCount() - tick > 60 * 1000) break;
    		}
    
    		if (g_nMarketLoginIn > 0 && g_nMarketLoginIn != static_cast<int>(g_vecQuote.size()))
    		{
    			int nUnconnectIndex = 1;
    			OnLog("***********************\n");
    			OnLog("*有未能正常登录的行情账号:\n");
    			for (auto iter = g_vecQuote.begin(); iter != g_vecQuote.end(); iter++)
    			{
    				if (!(*iter)->IsConnected())
    				{
    					OnLog(future_helper::format_string("%d、%s\n", nUnconnectIndex, (*iter)->GetFrontIP()).c_str());
    					nUnconnectIndex++;
    				}
    			}
    			OnLog("***********************\n");
    			OnLog("已有行情连接成功,未连接的不尝试重连\n");
    		}
    
    		if (g_nMarketLoginIn > 0) break;
    		OnLog("行情 登录超时[1分钟],重新登录\n");
    		retry_time--;
    		if (retry_time == 0)
    		{
    			OnLog("行情 登录次数达到3次,不再尝试重新登录!\n");
    			break;
    		}
    
    		// Before retrying, drop the sessions created by the failed attempt (released
    		// the same way Close() does). Otherwise every retry would add another set of
    		// sessions to g_vecQuote, which skews both the "all logged in" test above
    		// and the list of sessions reported as not connected.
    		for (size_t i = nSessionsBefore; i < g_vecQuote.size(); ++i)
    		{
    			if (g_vecQuote[i]->IsConnected())
    				g_vecQuote[i]->ReleaseAPI();
    		}
    		g_vecQuote.erase(g_vecQuote.begin() + nSessionsBefore, g_vecQuote.end());
    	}
    }
    
    void ThreadConsumeTick()
    {
    	OnLog("处理线程已启动\n");
    	map<string, CThostFtdcDepthMarketDataField> m_mapShot;
    	while (g_bThread)
    	{
    		if (g_buffer.empty())
    		{
    			this_thread::sleep_for(chrono::milliseconds(5));
    			continue;
    		}
    
    		CThostFtdcDepthMarketDataField data;
    		g_buffer.pop(data);
    		if (data.UpdateTime[2] == ':') {
    			data.UpdateTime[2] = data.UpdateTime[3];
    			data.UpdateTime[3] = data.UpdateTime[4];
    			data.UpdateTime[4] = data.UpdateTime[6];
    			data.UpdateTime[5] = data.UpdateTime[7];
    			data.UpdateTime[6] = 0;
    		}
    		
    		if (atoi(data.UpdateTime) > 180000) {
    			_snprintf_s(data.TradingDay, 9, "%d", g_nPreTradingDay);
    		}
    		else {
    			_snprintf_s(data.TradingDay, 9, "%d", g_nTradingDay);
    		}
    
    		bool bNewTick = false;
    		auto find_shot = m_mapShot.find(data.InstrumentID);
    		if (find_shot == m_mapShot.end())
    		{
    			bNewTick = true;
    			m_mapShot[data.InstrumentID] = data;
    		}
    		else
    		{
    			long long llThis = future_helper::to_longlong(atoi(data.TradingDay), atoi(data.UpdateTime)) * 1000 + data.UpdateMillisec;
    			long long llLast = future_helper::to_longlong(atoi(find_shot->second.TradingDay), atoi(find_shot->second.UpdateTime)) * 1000 + find_shot->second.UpdateMillisec;
    			if (llThis > llLast || find_shot->second.Volume < data.Volume)
    			{//郑商所没有毫秒
    				bNewTick = true;
    				find_shot->second = data;
    			}
    		}
    
    		if (bNewTick && g_fileMap.IsOK()) {
    			g_fileMap.AddDepthData(&data);
    		}
    		else if (bNewTick) {
    			g_vecBuffer.push_back(data);
    		}
    	}
    	OnLog("处理线程已退出\n");
    }
    
    //bReset == true:清空共享内存中的tick数据
    void OutPutTickData(bool bReset)
    {
    	unsigned int* exist_size;
    	CThostFtdcDepthMarketDataField* p = g_fileMap.GetDepthData(&exist_size);
    	if (!p || *exist_size <= 0) {
    		OnLog("共享内存中无任何tick数据\n从内存中导出\n");
    
    		if (g_vecBuffer.size() == 0)
    		{
    			OnLog("内存中无任何tick数据\n");
    			return;
    		}
    
    		future_helper::safe_create_floder((g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000)).c_str());
    		ofstream ofs_tick(g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000) +
    			"\\data_" + future_helper::to_string(g_nTradingDay) + ".txt", ios_base::out);
    		for (auto& field : g_vecBuffer)
    		{
    			char szBuf[1024];
    			_snprintf_s(szBuf, 1024,
    				//交易日,最后修改时间,最后修改毫秒,合约代码,
    				//最新价,上次结算价,昨收盘,昨持仓量,
    				//今开盘,最高价,最低价,数量,成交金额,持仓量,
    				//涨停板价,跌停板价,
    				//申买价一,申买量一,申卖价一,申卖量一,
    				//当日均价
    				"%s,%s,%d,%s,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%d,%.4f\n",
    				field.TradingDay, field.UpdateTime, field.UpdateMillisec, field.InstrumentID,
    				field.LastPrice, field.PreSettlementPrice, field.PreClosePrice, field.PreOpenInterest,
    				field.OpenPrice, field.HighestPrice, field.LowestPrice, field.Volume, field.Turnover, field.OpenInterest,
    				field.UpperLimitPrice, field.LowerLimitPrice,
    				field.BidPrice1, field.BidVolume1, field.AskPrice1, field.AskVolume1,
    				field.AveragePrice);
    			ofs_tick << szBuf;
    		}
    		return;
    	}
    
    	OnLog(future_helper::format_string("输出共享内存中的tick数据 %d 个\n", *exist_size).c_str());
    	future_helper::safe_create_floder((g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000)).c_str());
    	ofstream ofs_tick(g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000) +
    		"\\data_" + future_helper::to_string(g_nTradingDay) + ".txt", ios_base::out);
    	for (int i = 0; i < *exist_size; i++)
    	{
    		auto& field = *(CThostFtdcDepthMarketDataField*)(p + i);
    		char szBuf[1024];
    		_snprintf_s(szBuf, 1024,
    			//交易日,最后修改时间,最后修改毫秒,合约代码,
    			//最新价,上次结算价,昨收盘,昨持仓量,
    			//今开盘,最高价,最低价,数量,成交金额,持仓量,
    			//涨停板价,跌停板价,
    			//申买价一,申买量一,申卖价一,申卖量一,
    			//当日均价
    			"%s,%s,%d,%s,%.3f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%d,%.4f",
    			field.TradingDay, field.UpdateTime, field.UpdateMillisec, field.InstrumentID,
    			future_helper::check_double(field.LastPrice), future_helper::check_double(field.PreSettlementPrice), future_helper::check_double(field.PreClosePrice), future_helper::check_double(field.PreOpenInterest),
    			future_helper::check_double(field.OpenPrice), future_helper::check_double(field.HighestPrice), future_helper::check_double(field.LowestPrice), field.Volume, future_helper::check_double(field.Turnover), future_helper::check_double(field.OpenInterest),
    			future_helper::check_double(field.UpperLimitPrice), future_helper::check_double(field.LowerLimitPrice),
    			future_helper::check_double(field.BidPrice1), field.BidVolume1, future_helper::check_double(field.AskPrice1), field.AskVolume1,
    			future_helper::check_double(field.AveragePrice));
    		ofs_tick << szBuf << endl;
    	}
    	ofs_tick.flush();
    	ofs_tick.close();
    
    	if (bReset)
    		*exist_size = 0;
    }
    
    void Open()
    {
    	printf("%s\n", future_helper::get_local_datetime().c_str());
    	g_nMarketLoginIn = 0;
    	OnLog("区间交易开始!\n");
    	if (!g_thdDealData) {
    		g_bThread = true;
    		g_thdDealData = make_shared<thread>(ThreadConsumeTick);
    	}
    
    	if (!g_fileMap.CreateFileMap(FILE_MAP_KEY))
    	{
    		OnLog("共享内存创建失败!直接存文件...\n");
    	}
    	else
    	{
    		g_fileMap.InitDefaultRange();
    	}
    
    	TradeLogin();
    	unsigned int* exist_count;
    	auto pTick = g_fileMap.GetDepthData(&exist_count);
    	int local_time = atoi(future_helper::get_local_time(false).c_str());
    	if ((!pTick || *exist_count == 0) && (local_time < 151500 || local_time > 180000))
    	{
    		ifstream ifs_tick(g_config.GetValue("Path", "tick_normal") + "\\" + future_helper::to_string(g_nTradingDay / 10000) +
    			"\\data_" + future_helper::to_string(g_nTradingDay) + ".txt", ios_base::in);
    		if (ifs_tick.is_open())
    		{
    			OnLog("本地加载tick到共享内存:");
    			int tick_count = 0;
    			CThostFtdcDepthMarketDataField data;
    			char szLine[1024];
    			while (ifs_tick.getline(szLine, 1024))
    			{
    				future_helper::LineToStruct(szLine, data);
    				g_fileMap.AddDepthData(&data);
    				tick_count++;
    			}
    			OnLog(future_helper::format_string("%d条\n", tick_count).c_str());
    		}
    	}
    	MarketLogin();
    }
    
    /// Ends a receiving session.
    ///
    /// Releases every connected market session and empties g_vecQuote. When at least
    /// one session was released, the consumer thread is stopped and joined as well.
    /// Always resets the login counter and logs the shared-memory usage.
    void Close()
    {
    	printf("%s\n", future_helper::get_local_datetime().c_str());
    
    	int nReleased = 0;
    	for (auto it = g_vecQuote.begin(); it != g_vecQuote.end(); ++it)
    	{
    		if (!(*it)->IsConnected()) continue;
    		++nReleased;
    		(*it)->ReleaseAPI();
    	}
    	g_vecQuote.clear();
    
    	if (nReleased > 0)
    	{
    		OnLog(future_helper::format_string("区间交易结束!关闭行情接收:%d\n", nReleased).c_str());
    		const bool bWorkerRunning = g_thdDealData && g_thdDealData->joinable();
    		if (bWorkerRunning)
    		{
    			g_bThread = false;
    			g_thdDealData->join();
    			g_thdDealData = nullptr;
    		}
    	}
    
    	g_nMarketLoginIn = 0;
    
    	const auto nUsedMb = g_fileMap.GetTotalUsedSpace() / 1024 / 1024;
    	const double dUsedPercent = (double)g_fileMap.GetTotalUsedSpace() * 100 / MAX_PAGE_SIZE;
    	OnLog(future_helper::format_string("共享内存使用%d/%d(mb)=%.2f%%%%\n",
    		nUsedMb, MAX_PAGE_SIZE / 1024 / 1024, dUsedPercent).c_str());
    }
    
    
    /// Program entry point.
    ///
    /// Start-up: loads system.ini, checks that the shared memory can be created,
    /// and asks the operator for the previous trading day.
    ///
    /// Main loop (one pass per second):
    ///   - when a key was pressed, reads one console command line and runs it
    ///     (quit / help / open / close / tradingday / tradingcode / tickdata / sharememory);
    ///   - when the local time crosses 02:35:00 or 15:50:00, closes the receivers and
    ///     exports tick data (night-session end / trading-day end);
    ///   - when the local time crosses 08:20:00 or 20:20:00, restarts the receivers.
    int main()
    {
    	if (!g_config.Open((future_helper::GetWorkingDir() + "\\system.ini").c_str()))
    	{
    		OnLog("system.ini打开失败\n");
    		system("pause");
    	}
    
    	OnLog("执行开启共享内存测试...");
    	if (!g_fileMap.CreateFileMap(FILE_MAP_KEY))
    	{
    		OnLog("失败!");
    		system("pause");
    	}
    	else
    	{
    		OnLog("成功!\n");
    		g_fileMap.Release();
    	}
    
    	OnLog("==========start==========\n");
    
    	printf("初次启动需要输入前交易日,用来更新夜盘的日期...");
    	scanf_s("%d", &g_nPreTradingDay);
    	// scanf_s leaves the rest of the input line (at least the newline) unread;
    	// discard it so that the first getline() below does not return an empty command.
    	for (int ch = getchar(); ch != '\n' && ch != EOF; ch = getchar()) {}
    
    	// Tick count exported at night-session end; -1 means "no night export yet".
    	int night_tick_count = -1;
    	int last_local_time = atoi(future_helper::get_local_time(false).c_str());
    
    	while (1)
    	{
    		if (_kbhit() != 0)
    		{
    			printf("**********不要长时间阻塞此处,这样将导致软件无法正常工作!***********\n");
    			printf("**********记得输入完之后按回车键哦!                      ***********\n");
    			string str;
    			std::getline(std::cin, str);
    			if (str == "quit")
    			{
    				Close();
    				break;
    			}
    			else if (str == "help")
    			{
    				printf("quit:退出\n");
    				printf("help:帮助\n");
    				printf("open:手动开启接收\n");
    				printf("close:手动关闭接收\n");
    				printf("tradingday:交易日\n");
    				printf("tradingcode:可交易合约\n");
    				printf("tickdata:输出接收到的tick数据\n");
    				printf("sharememory:共享内存使用率\n");
    			}
    			else if (str == "open")
    			{
    				Open();
    			}
    			else if (str == "close")
    			{
    				Close();
    			}
    			else if (str == "tradingday")
    			{
    				printf("交易日:%d-%d\n", g_nPreTradingDay, g_nTradingDay);
    			}
    			else if (str == "tradingcode")
    			{
    				printf("期货可交易合约:\n");
    				// Group the futures instruments by product id for display.
    				map<string, vector<CThostFtdcInstrumentField>> mapClasses;
    				vector<CThostFtdcInstrumentField> vec;
    				g_trade.GetTradingCode(vec, THOST_FTDC_PC_Futures);
    				for (auto& item : vec)
    				{
    					mapClasses[item.ProductID].push_back(item);
    				}
    
    				for (auto iter = mapClasses.begin(); iter != mapClasses.end(); iter++)
    				{
    					printf("%s:", iter->first.c_str());
    					for (auto& item : iter->second)
    					{
    						printf("%s ", item.InstrumentID);
    					}
    					printf("\n");
    				}
    			}
    			else if (str == "tickdata")
    			{
    				OutPutTickData(false);
    			}
    			else if (str == "sharememory")
    			{
    				unsigned int* exist_size = nullptr;
    				CThostFtdcDepthMarketDataField* p = g_fileMap.GetDepthData(&exist_size);
    				OnLog(future_helper::format_string("tick数据个数:%d\n共享内存使用%d/%d(mb)=%.2f%%%%\n",
    					(p && exist_size) ? *exist_size : 0,
    					g_fileMap.GetTotalUsedSpace() / 1024 / 1024, MAX_PAGE_SIZE / 1024 / 1024,
    					(double)g_fileMap.GetTotalUsedSpace() * 100 / MAX_PAGE_SIZE).c_str());
    			}
    		}
    
    		int local_time = atoi(future_helper::get_local_time(false).c_str());
    		// A boundary is "crossed" when the previous pass was before it and this one is not.
    		const bool bNightEnd = (local_time >= 23500 && last_local_time < 23500);
    		const bool bDayEnd = (local_time >= 155000 && last_local_time < 155000);
    		if (bNightEnd || bDayEnd)
    		{
    			unsigned int* exist_count = nullptr;
    			auto pTick = g_fileMap.GetDepthData(&exist_count);
    			Close();
    			// Read the counter only after Close() (as the original code did), and
    			// treat a missing shared-memory block as "0 ticks" instead of
    			// dereferencing an unset pointer.
    			const unsigned int tick_count = (pTick && exist_count) ? *exist_count : 0;
    			if (bDayEnd){
    				OnLog("交易日结束了?...");
    				if (night_tick_count == -1 || tick_count > static_cast<unsigned int>(night_tick_count)) {
    					OnLog("是的!\n");
    					OutPutTickData(true);
    					if (night_tick_count != -1) {
    						OnLog("打开TickToKline.exe\n");
    						::ShellExecute(NULL, "open", (future_helper::GetWorkingDir() + "\\TickToKline.exe").c_str(), "1", NULL, SW_SHOW);
    					}
    					
    					night_tick_count = -1;
    				}
    				else
    				{
    					OnLog(future_helper::format_string("没有!(共享内存的数据没有比夜盘多%d-%u)!\n",
    						night_tick_count, tick_count).c_str());
    				}
    			}
    			else
    			{
    				OnLog("夜盘结束了?...");
    				if (night_tick_count == -1 && tick_count > 0) {
    					OnLog("是的!输出一次夜盘数据\n");
    					night_tick_count = static_cast<int>(tick_count);
    					OutPutTickData(false);
    				}
    				else
    				{
    					OnLog("现在不是夜盘结束时间!(没有夜盘数据 或者 已经输出过一次夜盘数据了)\n");
    				}
    			}
    		}
    		if ((local_time >= 82000 && last_local_time < 82000) ||
    			(local_time >= 202000 && last_local_time < 202000))
    		{
    			Close();
    			Open();
    		}
    
    		last_local_time = local_time;
    		this_thread::sleep_for(chrono::seconds(1));
    	}
    
    	return 0;
    }
    

      

     欢迎讨论交流,qq896250188

  • 相关阅读:
    Socket 之 同步以及异步通信
    Socket 之 c#实现Socket网络编程
    Socket 之 API函数介绍
    Socket 之 原理与编程基础
    C# 之 user32函数库
    WinServer 之 访问同网段服务器 或 同一服务器多虚拟机间的访问
    annex-b格式
    FLV文件格式解析
    PHP5中的stdClass
    web服务器【apache/nginx] 关闭目录的浏览权限
  • 原文地址:https://www.cnblogs.com/rmdmn/p/11211472.html
Copyright © 2011-2022 走看看