zoukankan      html  css  js  c++  java
  • 开始做股票数据服务器

    一.数据采集接口

      1.tushare

      2.sina

      3.通联

    二.数据采集模块:

      为了保证采集到上层的数据可信,完整;定义两个类:

        数据包:数据包为一次网络采集的数据片段,数据包封装各种接口,获取数据。数据包能够提供一个唯一的标识,以提供数据集识别;且数据包能够检测自身判断自身是否完成。还可以提供end标志,接收到该数据包表示整个数据集的完成。

        数据集:为一系列的多个数据包,数据包在时间生成上可能不连续,可以持续很长时间,因此,数据集必须能够识别数据包片段,避免数据包的重复,而且数据集能够持久化(关键信息放入文件或者数据库中),可供以后恢复。数据集能够判断整个集合是否传输完成。数据集向上提供数据的生成器,或者保存到数据库。   

      

    class myque(deque):
        def put(self,element):
            self.append(element)
        def get(self):
            return self.pop()
    class WebDataFrameBasic():
        '''
        数据包,一个web数据的片段
        '''
        def __init__(self,query_info={}):
            '''
            '''
            self.query_info=query_info
    
        def start(self,queue=None):
            '''
            开始获取数据,数据存放到queue中
            queue中item的格式:{state:,label:,data:}
            label:unique
            state:end.finish,fail
            :return:
            '''
    
        def unique(self):
            '''
            唯一标志该数据片段的关键字:数据集通过查询该片段保证是否已经获得,是重复片段
            '''
        def _is_finished(self):
            '''
            该片段是否成功传输
            '''
    
        def _is_end(self):
            '''
            是否是在后一个数据片段
            '''
    
    class WebDataSeqBasic():
        '''
        数据集,或者数据流
        '''
        STATE_TYPE={
            "START":0,
            "RUNNING":1,
            "FINISH":2,
            "END":3
        }
        def __init__(self,thread_num=1,info={},id=None):
            '''
            如果id为空,则自动创建一个数据集的id,并保存为一个记录
            如果不为空,则在数据库或者文件中查询看是否有相应的记录,如果有则获取信息,创建实例
            :param name:数据集的名称
            :param id:唯一标志一个数据集
            :return:
            '''
    
            if thread_num>1:#开启多线程模式
    
                self.thread_mode=True
                self.pool=ThreadPoolExecutor(thread_num)
                self.queue=Queue()
    
            else:#单线程模式
    
                self.thread_mode=False
                self.queue=myque()
    
            if id:
                self._load(id)
            else:
                max_id=WebDataSeqBasic._load_max_id()
                self.id=max_id+1
                self._save(self.id)
                self.frame_state={} #该字典中填写数据片段的 STATE_TYPE
                self.frame_try={}#该数据段的尝试次数
                self.end_flags=False #当收到的数据片段标记_is_end为True时候会为真
                self.info=info
    
        def _load(self,id):
            '''
            从外部载入数据集
            :param id:
            :return:
            '''
    
        def _save(self,id):
            '''
            将该数据集保存到外部
            :param id:
            :return:
            '''
    
        @classmethod
        def _load_max_id(cls):
            return 0
    
        def is_finished(self):
            '''
            是否完成
            :return;
            '''
            finish=[WebDataSeqBasic.STATE_TYPE["FINISH"],WebDataSeqBasic.STATE_TYPE["END"]]
            if self.end_flags and all([item in finish for item in self.frame_state.values()]):
                return True
            else:
                return False
    
        def task_gen(self):
            '''
            获取下一个任务的WebDataFrame实例
            :return:
            '''
            raise NotImplementedError
    
        def transport(self):
            '''
            传输
            :return:
            '''
            for webdata in self.task_gen():
                label=webdata.unique()
                self.frame_state[label]=WebDataSeqBasic.STATE_TYPE["RUNNING"]
                if label not in self.frame_try:
                    self.frame_try[label]=1
                else:
                    self.frame_try[label]+=1
                if self.thread_mode:
                    self.pool.submit(webdata.start,self.queue)
                else:
                    webdata.start(self.queue)
            while 1:
                self.collect()
                if self.is_finished():
                    break
        def collect(self):
            '''
            从queue中获取信息,并更新状态
            :return:
            '''
            item=self.queue.pop()
            if item["state"]=="end":#end状态表示这是最后一帧,且完成了该帧
                self.end_flags=True
            if item["state"]in ["finish","end"]:
                self.frame_state[item["label"]]=WebDataSeqBasic.STATE_TYPE["FINISH"]
                self.handle_data(item["data"])
            else:#
                self.frame_state[item["label"]]=WebDataSeqBasic.STATE_TYPE["START"]
    
        def handle_data(self,data):
            raise NotImplementedError
    

    程序分为收集数据模块,格式转换模块(dataframe转为nametuple,json与字典互转等),etl数据存储模块,db(sqlachemy,django),db查询模块,计算模块(主要采集dataframe数据作为输入),以及REST的接口。

       dataframe转为nametuple,方便将记录存储

    from collections import namedtuple
    from numpy import  float64
    def numpy2type(data):
        '''
        将numpy格式的data转为普通的python格式
        :return:
        '''
        import numpy
        if "numpy" in str(type(data)):
            if issubclass(type(data),float):
                return float(data)
            elif issubclass(type(data),int):
                return int(data)
        else:
            return data
    
    def df2tuple_gen(df,table_name,map={}):
        '''
        dataframe转为nametuple的生成器
        map为记录的列名,df的列名的对应关系
        '''
        record=namedtuple(table_name,map.keys())
        assert set(map.values())-{'index'} < set(df.columns)
        rows,columns=df.shape
        for row in range(rows):
            info={}
            for key in map:
                if map[key]=="index":
                    info[key]=numpy2type(df.index[row])
                    continue
                info[key]=numpy2type(df.iloc[row][map[key]])
            yield record(**info)
    

      针对tushare创建一个特别数据处理集

    class TushareBase(WebDataSeqBasic):
        '''
        仅仅只有单个请求的接口
        通过tushare接口来操作数据
        '''
        def __init__(self,data_cls,thread_num=1,info={},id=None):
            super(TushareBase,self).__init__(thread_num,info,id)
            self.data_cls=data_cls
    
        def task_gen(self):
            yield self.data_cls(self.info)
    
        def handle_data(self,data):
            '''
            调用数据库接口,存储数据
            :param data:
            :return:
            '''
            op=BaseOperate()
            for record in data:
                op.createfrtuple(record)
    

      例子,获取股票数据,并保存

    class StockData(WebDataFrameBasic):
        def hanlde_data(self,data_gen):
            '''
            对网络数据进行特别清洗
            :param data_gen:数据生成器
            :return:
            '''
            for record in data_gen:
                date=str(record.timeToMarket)
                yield record._replace(timeToMarket=date)
        def start(self,queue=None):
            df=ts.get_stock_basics()
            from etl.df_trans import df2tuple_gen
            data=self.hanlde_data(df2tuple_gen(df,"Stock",
                              {'code':'index','name':'name','industry':'industry',
                               'area':'area','outstanding':'outstanding','totals':'totals',
                               'timeToMarket':'timeToMarket'}))
            queue.put({"state":"end","data":data,"label":self.unique()})
        def unique(self):
            return "get_stock_basics"
    

    二.使用数据库:postgresql

      ubuntu本身集成了postgresql,所以只需要安装客户端

    $apt-get install postgresql

      创建postgresql的实例:

        -d:实例存储目录  

        -e:采取编码

    $pg_createcluster 9.3 xxx -u postgres -d /val/postgresql/xxx -e utf8     ###xxx为实例名字

      出现错误:The locale requested by the environment is invalid.

      修改环境变量,重启,(实验过放在/etc/profile,或者.profile 里面,无效果)

    $echo "LC_ALL=en_US.UTF-8"  >> /etc/environment

      我创建的实例名称不是postgres,  使用psql登录会出现 Peer authentication failed for user "postgres"

      修改/etc/postgresql/版本号/实例名/pg_hba.conf

    $vi /etc/postgresql/9.3/xxx/pg_hba.conf

      其中peer为使用系统的用户名,密码登录方式

      md5是使用用户名,密码的登录方式

      trust,信任登录方式,无密码

      postgresql常用命令:查看详细情形,http://www.postgres.cn/docs/9.3/

    postgres=# l    ###显示数据库
    
    postgres=# q    ###退出
    
    postgres=# c  数据库名    ###转到另一个数据库
    
    postgres=# dt      ###查看表
    
    postgres=# d      ###查看表
    
    postgres=# di      ###查看表索引

    postgres=# du   ###查看用户

      远程登录postgresql服务器

        a.使用vim 修改pg_hba.conf文件,添加一行:

          host  all  all  0.0.0.0/24  md5

         其中0.0.0.0/24可以更为内网段

        b.再修改postgresql.conf,将注释listen_addresses = '*' 除掉

    三.数据库操作

      1.sqlachemy:如果需要数据支持更好性能,可以考虑添加sqlachemy的ORM层接口

      2.优化数据库:

        a.大量插入一次提交:使用装饰器,对于func函数的操作进行记录,只有达到一定次数后才会进行flush

        def flush_wrapper(func=None):
            '''
            func函数被调用length次数时候,才会进行flush
            :param func:
            :return:
            '''
            def wrap(self,*arg,**kwargs):
                try:
                    func(self,*arg,**kwargs)
                    if self.count==self.length:
                        self.session.flush()
                        self.count=0
                    else:
                        self.count+=1
                except Exception as e:
                    self.session.close()
                    LOG.error(str(e))
                    raise e
            return wrap
    

        b.cache的使用:由于数据重复插入可能性存在,所以需要每次插入都需要查询数据库,为了节省时间,可以一次性将原来数据读入到内存当中。

          实现思想:

            建立cache_keys:仅仅记录数据库记录的关键字字段(该字段必须能够唯一定位一条记录),caches_keys包括所有记录的关键字

            建立cache:cache里面加载了全部或者部分数据库中的相关对象的记录。

            部分cache的策略:

              1.如果外部数据的顺序与数据库中的某种顺序一致的话,可以进行预读取部分数据的策略,比如读取100条,当没有命中时候,则按照顺序将选择框,选择当前记录以及往下的顺序100条。

            更新策略:n/m(新记录,所有记录)      

              如果不使用cache:则进行了m次的查询,以及后面的n次插入,以及多次更新。

              不重复创建:该函数读入外部数据,如果外部数据在cache_keys中找到相应值,就不予更新,继续读取下条记录了;如果没有找到,就创建记录;花费时间:进行了一次所有记录读取+进行了n次insert动作

              不重复创建或者更新:该函数读入外部数据,如果外部数据在cache_keys中找到相应值,则使用当前数据更新数据库。如果没有找到则创建记录;花费时间:进行了m-n次的更新,n次插入;        

        def init_cache(self,key=None,maxlen=None):
            '''
            初始化数据库缓存
            key必须是model的unique非空字段
            :return:
            '''
            self.key=key
            self.cache_keys={}#存放缓存的记录的唯一非空关键字,凭借该key可以找到并唯一找到一条记录,必须对象的全部keys
            if maxlen:
                self.cache=deque(maxlen=maxlen)#长度为maxlen的队列
            else:
                self.cache=deque()#长度无限的队列
            for index ,record in enumerate(self.query.all()):
                self.cache_keys[getattr(record,'code')]=weakref.ref(record)
                if not maxlen or index <maxlen:
                    self.cache.append(record)
    
        def is_hit(self,record):
            '''
            是否在keys中查到值
            :return:
            '''
            key_value=getattr(record,self.key)
            if key_value in self.cache_keys:
                return self.cache_keys[key_value]
            else:#当没有该记录则将该记录添加到缓存当中,并返回False
                self.cache_keys[key_value]=weakref.ref(record)
                self.cache.append(record)
                return False
    

    四.web框架  

      1.django:进行快速开发

  • 相关阅读:
    广域网(ppp协议、HDLC协议)
    0120. Triangle (M)
    0589. N-ary Tree Preorder Traversal (E)
    0377. Combination Sum IV (M)
    1074. Number of Submatrices That Sum to Target (H)
    1209. Remove All Adjacent Duplicates in String II (M)
    0509. Fibonacci Number (E)
    0086. Partition List (M)
    0667. Beautiful Arrangement II (M)
    1302. Deepest Leaves Sum (M)
  • 原文地址:https://www.cnblogs.com/yasmi/p/5238762.html
Copyright © 2011-2022 走看看