1、创建数据库(strategy)、表(trade_date 交易日)
create database strategy default character set utf8 collate utf8_general_ci;
CREATE TABLE `trade_date` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`day` varchar(8) NOT NULL ,
`year` varchar(4) NOT NULL,
`month` varchar(6) NOT NULL,
`desc` varchar(200),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
int:2147483647
long:9223372036854775807
2、接口功能(python实现):
a:计算从起始日期到当天的所有交易日【周六日除外】(数据库无关---起始日期为传入参数)
所用到的url资料:
url:http://blog.chinaunix.net/uid-26425155-id-3022902.html
http://outofmemory.cn/code-snippet/1841/python-time-date-module-usage-summary
http://2057.iteye.com/blog/1958624
http://www.cnblogs.com/rollenholt/archive/2012/04/10/2441542.html
http://blog.chinaunix.net/uid-26425155-id-3022902.html
http://2057.iteye.com/blog/1958624
所需要的最有用资料:http://www.cnblogs.com/BeginMan/archive/2013/04/08/3007403.html
b:trade_date数据库中数据补全功能
如果数据库为空:则插入从指定日期到当天的交易日数据
如果数据不为空,则做增量操作。(包括第二天新增第二天当天的交易日数据,或服务器停止几天后,向数据库中插入这段时间内的交易日数据)
url:
python中的mysql数据库操作:http://www.cnblogs.com/fnng/p/3565912.html
python中的配置文件读取:http://blog.chinaunix.net/uid-24585858-id-4820471.html
python中全局变量:先定义变量A,在某方法中调用:global A 之后对A进行赋值。 别的方法中使用到A时都将是赋值之后的值。
python中的数据库操作事务处理:try...except...finally except时:rollback finally时:cur.close() conn.close()
python中的定时任务:分别为每天的新增操作和该方法执行时的定时操作。
每天定时任务执行:http://bbs.csdn.net/topics/390950625
思路:使用线程实现。run方法中。
执行操作 #系统启动当时执行操作
计算第二天设置的任务执行时间距离当前系统时间秒数。time.sleep(秒数)
执行操作 #系统启动第二天执行任务
while(True):
time,sleep(24*60*60) #等待一天
执行操作。
python中初始化时进行的操作:写入线程的init()方法中,在创建线程时执行操作。
python中MySQLdb中的线程池机制:DBUtils.Pooleddb
http://www.cnblogs.com/Xjng/p/3437694.html
报错:'module' object is not callable
原因:模块中的方法没有导入。 在eclipse中需要将DBUtils.PooledDB.PooledDB()方法导入,才可以执行PooledDB()
eclipse中将DBUtils.PooledDB.PooledDB 强制引入
在模块中引入语句:from DBUtils.PooledDB import PooledDB
安装MySQLdb后,命令行验证安装成功,但eclipse报错:unresolved 。解决:http://blog.csdn.net/ao_xue1234/article/details/8191974
需考虑的事情:
该表是否已经存在:可在创建表的语句执行之前通过show tables得到库中所有的表,判断是否已存在,若存在,才会执行创建。
向库中插入数据时,起始交易日需配置文件指定。
连接数据库的参数:需配置文件指定
向数据库插入数据时,mysql表主键的赋值。【因为新建表时设置主键为自动增长。所以,可以写sql语句时,指定insert into test(name1,name2,name3) values(val1,val2,val3) 这样就不需要为主键赋值了。但是如果字段很多时,就不适合。】
如果操作失败时的回滚,事务处理。
验证输入日期是否有效。try:...except:... 在try中对字符串进行向date转换,若抛异常,return False 表示验证不通过。
3、python引入模块操作:
python多个模块引入同一个模块时,并不会每次引入都重新载入这个模块。url:http://blog.chinaunix.net/uid-20684384-id-1895613.html
4、成品:
模块代码:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#coding:utf-8 from datetime import * import ConfigParser import MySQLdb import threading import time from DBUtils.PooledDB import PooledDB """ 股票交易日模块, 该模块默认会启动向数据库插入操作的定时任务线程。在该线程中初始化数据库连接池。 """ #全局变量 POOL=None START_DAY=None HOUR=None MINUTE=None SECOND=None TABLE_NAME=None #本模块中用到的sql语句 为全局变量 在init_sql()中被初始化。 重点操作:此时TABLE_NAME已经被初始化 show_tables_sql="show tables" CREATE_TABLE_SQL=None INSERT_SQL=None SELECT_ALL_SQL=None SELECT_MAX_DAY_SQL=None SELECT_DAYS_BETWEEN_DAY_SQL=None SELECT_DAYS_BETWEEN_YEAR_SQL=None SELECT_DAYS_BETWEEN_MONTH_SQL=None def init(): "初始化,执行读取配置文件和为sql赋值的操作" __read_conf() __init_sql() def __read_conf(): "初始化数据库链接、初始股票交易日期、任务执行时间" global POOL,TABLE_NAME,START_DAY,HOUR,MINUTE,SECOND cf=ConfigParser.ConfigParser() cf.read("../conf/cfg.properties") db_host=cf.get("db", "host") db_port=cf.get("db","port") db_user=cf.get("db","user") db_passwd=cf.get("db","passwd") db_db=cf.get("db","db") max_connection=cf.get("db","max_connection") TABLE_NAME=cf.get("db","table_name") #创建数据库连接池 POOL= PooledDB(MySQLdb,int(max_connection),host=db_host,port=int(db_port),user=db_user,passwd=db_passwd,db=db_db,charset="utf8") #给最初的交易日期赋值 START_DAY=cf.get("time","start_trade_date") #为执行定时任务的时间赋值 HOUR=int(cf.get("task","hour")) MINUTE=int(cf.get("task","minute")) SECOND=int(cf.get("task","SECOND")) def __init_sql(): "初始化sql语句" global CREATE_TABLE_SQL,INSERT_SQL,SELECT_ALL_SQL,SELECT_MAX_DAY_SQL,SELECT_DAYS_BETWEEN_DAY_SQL,SELECT_DAYS_BETWEEN_YEAR_SQL,SELECT_DAYS_BETWEEN_MONTH_SQL CREATE_TABLE_SQL="CREATE TABLE {tableName} (" " `id` int(8) NOT NULL AUTO_INCREMENT," " `day` varchar(8) NOT NULL ," "`year` varchar(4) NOT NULL," "`month` varchar(6) NOT NULL," "`desc` varchar(200)," "PRIMARY KEY (`id`)" ") ENGINE=InnoDB DEFAULT CHARSET=utf8;".format(tableName=TABLE_NAME) INSERT_SQL="insert into {tableName}(day,year,month) values(%s,%s,%s)".format(tableName=TABLE_NAME) SELECT_ALL_SQL="select * from {tableName}".format(tableName=TABLE_NAME) SELECT_MAX_DAY_SQL="select max(day) from {tableName}".format(tableName=TABLE_NAME) SELECT_DAYS_BETWEEN_DAY_SQL="select day from {tableName} where day>=%s and day<=%s order by day".format(tableName=TABLE_NAME) SELECT_DAYS_BETWEEN_YEAR_SQL="select day from {tableName} where year>=%s and year<=%s order by day".format(tableName=TABLE_NAME) SELECT_DAYS_BETWEEN_MONTH_SQL="select day from {tableName} where month>=%s and month<=%s order by day".format(tableName=TABLE_NAME) def create_table(): "先查询数据库中所有表,判断当前要创建的表是否存在,若不存在,则创建。" try: conn=POOL.connection() cur=conn.cursor() tables=cur.fetchmany(cur.execute(show_tables_sql)) if (TABLE_NAME,) not in tables: cur.execute(CREATE_TABLE_SQL) finally: cur.close() conn.close() def insert_tradeDate(): """ 向交易日数据表中插入数据。 如果数据库为空,则向数据库插入从起始日到当天的所有交易日数据 如果数据库不为空,则向数据库插入到当天为止数据库中缺失的所有交易日数据 返回:插入的记录数。 """ try: start_day=START_DAY conn=POOL.connection() cur=conn.cursor() count=cur.execute(SELECT_ALL_SQL) if count!=0: #数据库中存在交易日数据 max_day_db=cur.fetchmany(cur.execute(SELECT_MAX_DAY_SQL))[0][0] #得到数据库中交易日表中最近一条数据的day start_day_date=datetime.strptime(max_day_db,"%Y%m%d").date()+timedelta(days=1) start_day=start_day_date.strftime("%Y%m%d") #向交易日数据表中插入的起始交易日 dates=get_tradeDates(start_day) #默认截止日期为当天 if len(dates)>=1: insert_dates=[(x,x[:4],x[:6]) for x in dates if isinstance(x,str) and len(x)==8] #执行insert时的插入参数列表 cur.executemany(INSERT_SQL,insert_dates) #批量执行插入操作 conn.commit() return len(dates) else: return 0 except: conn.rollback() return 0 finally: cur.close() conn.close() def insert_test(): """ 日常测试。。。。。。 """ try: start_day=START_DAY conn=POOL.connection() cur=conn.cursor() count=cur.execute(SELECT_ALL_SQL) if count!=0: #数据库中存在交易日数据 max_day_db=cur.fetchmany(cur.execute(SELECT_MAX_DAY_SQL))[0][0] #得到中交易日表中最近一条数据的day start_day_date=datetime.strptime(max_day_db,"%Y%m%d").date()+timedelta(days=1) start_day=start_day_date.strftime("%Y%m%d") #向交易日数据表中插入的起始交易日 start_day_date=datetime.strptime(start_day,"%Y%m%d").date() end_date=start_day_date+timedelta(days=10) end_day=end_date.strftime("%Y%m%d") #测试时,每次插入起始时间之后10天内的交易日 dates=get_tradeDates(start_day,end_day) if len(dates)>1: insert_dates=[(x,x[:4],x[:6]) for x in dates if isinstance(x,str) and len(x)==8] #执行insert时的插入参数列表 cur.executemany(INSERT_SQL,insert_dates) #批量执行插入操作 conn.commit() return len(dates) else: return 0 except: conn.rollback() return 0 finally: cur.close() conn.close() def select_between_days(start_day,end_day): """ 从数据库中查询day字段范围在[start_day,end_day]间的所有交易日字段,返回交易日列表 start_day:格式为:"20150701",类型为:str end_day:格式为:"20150702",类型为:str """ if not __is_validate_day(start_day, end_day): return [] return __select_days_between(SELECT_DAYS_BETWEEN_DAY_SQL, start_day, end_day) def select_between_years(start_year,end_year): """ 从数据库中查询year字段范围在[start_year,end_year]间的所有交易日字段,返回交易日列表 start_year:格式为:"2015",类型为:str end_year:格式为:"2015",类型为:str """ if not __is_validate_year(start_year, end_year): return [] return __select_days_between(SELECT_DAYS_BETWEEN_YEAR_SQL, start_year, end_year) def select_between_months(start_month,end_month): """ 从数据库中查询month字段范围在[start_month,end_month]间的所有交易日字段,返回交易日列表 start_month:格式为:"201501",类型为:str end_month:格式为:"201502",类型为:str """ if not __is_validate_month(start_month, end_month): return [] return __select_days_between(SELECT_DAYS_BETWEEN_MONTH_SQL, start_month, end_month) def get_tradeDates(start_date,end_date=date.today().strftime("%Y%m%d")): """ 计算指定[start_date,end_date]区间内的所有交易日。 start_date:格式为:"19900101",类型为str end_date:格式为:"19920101",类型为str。 默认为当天 """ try: result=[] if not __is_validate_day(start_date, end_date): return result start=datetime.strptime(start_date,"%Y%m%d").date() end=datetime.strptime(end_date,"%Y%m%d").date() tmp=start while tmp<=end: if tmp.isoweekday()!=6 and tmp.isoweekday()!=7: result.append(tmp.strftime("%Y%m%d")) tmp=tmp+timedelta(days=1) return result except: return result def __select_days_between(sql,start,end): "根据传入的sql和start,end参数执行查询,并返回满足条件的交易日day列表" try: conn=POOL.connection() cur=conn.cursor() dates=cur.fetchmany(cur.execute(sql,(start,end))) return [x[0] for x in dates] except: return [] finally: cur.close() conn.close() def __is_validate_day(start,end): "验证start与end 日期是否为有效 yyyymmdd 格式的字符串" try: datetime.strptime(start,"%Y%m%d") datetime.strptime(end,"%Y%m%d") return True except: return False def __is_validate_year(start,end): "验证start与end 年份是否为有效 yyyy 格式的字符串" try: datetime.strptime(start,"%Y") datetime.strptime(end,"%Y") return True except: return False def __is_validate_month(start,end): "验证start与end 年份是否为有效 yyyymm 格式的字符串" try: datetime.strptime(start,"%Y%m") datetime.strptime(end,"%Y%m") return True except: return False class TimedInsertTask(threading.Thread): """ 向数据库插入交易日期的每天定时任务线程。 线程初始化init时:读取配置文件,初始化模块所用sql,初始化数据库连接池。 动态判断并进行创建表操作 线程刚运行时,执行数据库插入操作。 休眠到第二天的指定时间,执行插入操作。之后每隔一天执行插入。 达到每天定时执行任务的效果。 """ def __init__(self): init() create_table() threading.Thread.__init__(self) def run(self): insert_tradeDate() seconds=get_first_sleep_seconds(HOUR, MINUTE, SECOND) time.sleep(seconds) insert_tradeDate() while True: time.sleep(24*60*60) insert_tradeDate() def get_first_sleep_seconds(hour,minute,second): "获得系统当前时间到第二天指定任务执行时间所需等待的秒数。" now_date=datetime.now() dest_date=now_date.replace(hour=hour,minute=minute,second=second)+timedelta(days=1) delta=dest_date-now_date result=delta.total_seconds() return result "默认启动向数据库插入交易日数据的线程。提供了该模块操作数据库所需的数据库连接池" TimedInsertTask().start() if __name__ == "__main__": "测试代码---" for x in range(100): print "---------------------【x=",x,"】---------------------" print "数据库中查询2013年和2014年间的交易日为:%d 天" %len(select_between_years("2013", "2014")) print "数据库中查询20130101到20140203交易日有 %d 天" %len(select_between_days("20130101", "20140203")) print "数据库中查询201401 到201505月交易日个数为: %d 天" %len(select_between_months("201401", "201505")) time.sleep(10)
配置文件:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
[db] host=127.0.0.1 port=3306 user=root passwd=123456 db=strategy max_connection=20 table_name=test5 [time] #format:yyyymmdd start_trade_date=20130101 [task] #hour:[0-23] minute:[0:59] second:[0:59] cycle:xxx/s eg:60 hour=12 minute=02 second=20
别的模块测试操作:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#coding=utf-8 import trade_date.TradeDate as td if __name__ == "__main__": print td.select_between_days("20150701", "20150806")
代码层级: