zoukankan      html  css  js  c++  java
  • 基于tornado---异步并发接口

    1、目的

      由于有多个程序和脚本需要对mysql进行读写数据库,每次在脚本中进行数据库的连接、用cursor进行操作过于麻烦,因此希望可以有一个脚本开放接口,只需要传入sql语句,就可以返回结果回来。因此有需要一个可以支持并发量较大的脚本来进行数据库操作。以上就要求我的接口具有异步非阻塞、在结果返回前保持长连接、并发大。因此单纯的多线程和协程没办法满足要求,就需要用到tornado框架。

    2、程序

        1)客户端代码(通过requests 调用接口)

    import reuqests
    POST = requests.post # post请求方式
    def db_query(sql, method='query'):
        db_api = 'http://127.0.0.1:8000/db/api'
        db_base = "dbbase"
        db_ret = POST(db_api, data=json_encode({'method': method, 'sql': sql, 'dbbase': db_base, 'pwd': 'password123'}))
        if db_ret.status_code == 200:
            if json_decode(db_ret.text)['status'] == 'True':
                db_ret_data = json_decode(db_ret.text)['data']
                return {'status': 'ok', 'data': db_ret_data, 'err': ''}
            else:
                return {'status': 'False', 'data': [], 'err': json_decode(db_ret.text)['data']}
        else:
            return {'status': 'False', 'data': [], 'err': 'connect error'}

     2)服务端代码(基于tornado框架)

    # coding=utf8
    import MySQLdb
    import MySQLdb.cursors
    import tornado
    import tornado.ioloop
    import tornado.web
    import tornado.gen
    from tornado.concurrent import run_on_executor
    from concurrent.futures import ThreadPoolExecutor
    
    
    class DB_CONFIG:
        config = {
            'dbbase': ('authentic', 'password'), # 数据库user、password
        }
    
    
    class DB:
        def __init__(self, dbbase):
            user, password = DB_CONFIG.config.get(dbbase, (None, None))
            if user == None or password == None:
                raise Exception('KeyError', dbbase)
            db_host = '192.168.xx.xx'
            db_port = 1234
    
            self.db = MySQLdb.connect(db_host, user, password, dbbase, port=db_port, cursorclass=MySQLdb.cursors.DictCursor)
    
            self.cursor = self.db.cursor()
    
        def query(self, sql):
            self.cursor.execute(sql)
            return self.cursor.fetchall()
    
        def commit(self, sql):
            try:
                self.cursor.execute(sql)
                self.db.commit()
                return {'status': True, 'data': ''}
            except Exception as e:
                self.db.rollback()
                return {'status': False, 'data': e}
    
        def close(self):
            self.db.close()
    
    
    class ServiceHandler(tornado.web.RequestHandler):
        executor = ThreadPoolExecutor(900)  # 必须定义一个executor的属性,然后run_on_executor装饰器才会游泳。
    
        @run_on_executor  # 线程内运行;query函数被run_on_executor包裹(语法糖),将该函数的执行传递给线程池executor的线程执行,优化了处理耗时性任务,以致达到不阻塞主线程的效果。
        def query(self, dbname, method, sql):
            db = DB(dbname)
            ret = ''
            if method == 'query':
                ret = db.query(sql)
            elif method == 'commit':
                ret = db.commit(sql)
            db.close()
            return ret
    
        @tornado.web.asynchronous  # 保持长连接,直到处理后返回
        @tornado.gen.coroutine  # 异步、协程处理;增加并发量
        def post(self):
            data = tornado.escape.json_decode(self.request.body)  # 获取参数,json.loads()解码
            r = {'status': '', 'data': ''}
            if not data.get('pwd', None):
                r['status'], r['data'] = ('False', 'not password')
            elif not data.get('dbbase', None):
                r['status'], r['data'] = ('False', 'not DB select')
            else:
                if data['pwd'] != 'password123)': # 接口的密码认证
                    r = {'status': 'False', 'data': 'password error'}
                elif data['method'] == 'query':
    
                    d = yield self.query(data['dbbase'], 'query', data['sql'])
                    r = {'status': 'True', 'data': d}
                elif data['method'] == 'commit':
    
                    db_r = yield self.query(data['dbbase'], 'commit', data['sql'])
                    if db_r['status']:
                        r = {'status': 'True', 'data': 'commit sucessful'}
                    else:
                        r = {'status': 'False', 'data': db_r['data']}
                else:
                    r = {'status': 'False', 'data': 'method Invaild'}
            self.write(tornado.escape.json_encode(r))  # 写入返回信息写入response
            self.finish()  # 结束服务
    
        def get(self):
            return self.post()
    
    
    if __name__ == "__main__":
        application = tornado.web.Application([
            (r"/db/api", ServiceHandler),  # 路由映射
        ])
        application.listen(8000)  # 监听端口
        tornado.ioloop.IOLoop.instance().start()  # 启动服务

    3、请求举例(post请求)

      1) sql语句

     sql = 'select vid from video where status=1 group by vid ORDER BY num ASC limit 100'

       2)客户端调用db_query函数

    db_ret = db_query(sql, 'query') # 其中query是数据库操作的方法 query为查询/commit 为insert/update/delete等数据库修改操作时使用

      通过调用db_query传入参数sql语句和操作方式,返回结果或错误内容

  • 相关阅读:
    cinder支持nfs快照
    浏览器输入URL到返回页面的全过程
    按需制作最小的本地yum源
    创建可执行bin安装文件
    RPCVersionCapError: Requested message version, 4.17 is incompatible. It needs to be equal in major version and less than or equal in minor version as the specified version cap 4.11.
    惠普IPMI登陆不上
    Linux进程状态——top,ps中看到进程状态D,S,Z的含义
    openstack-neutron基本的网络类型以及分析
    openstack octavia的实现与分析(二)原理,架构与基本流程
    flask上下文流程图
  • 原文地址:https://www.cnblogs.com/kakawith/p/11232756.html
Copyright © 2011-2022 走看看