zoukankan      html  css  js  c++  java
  • python之便携式mysql类和tornado mysql长连接

    mymysql.py

    class MyMysql2(object):
        """Small convenience wrapper around a single PyMySQL connection.

        The connection is opened eagerly in ``__init__`` and re-validated
        (ping, then reconnect) before every ``query`` call.  Rows are returned
        as dicts because the connection uses ``pymysql.cursors.DictCursor``.
        """

        def __init__(self,
                     host='',
                     user='',
                     passwd='',
                     db='',
                     port=3306,
                     charset='utf8'):
            """Store the connection settings and try to connect immediately.

            A failed initial connection is not fatal: ``connet`` prints the
            error and ``self.conn`` stays ``None`` until a later ``query``
            manages to reconnect.
            """
            self.host = host
            self.user = user
            self.passwd = passwd
            self.db = db
            self.port = port
            self.charset = charset
            self.conn = None
            self.connet()

        def connet(self):
            """Open a new connection; return True on success, False on failure.

            The error is printed rather than raised so callers can retry.
            """
            try:
                self.conn = pymysql.connect(host=self.host,
                                            user=self.user,
                                            # ``password`` is the canonical name;
                                            # ``passwd`` is only a legacy alias.
                                            password=self.passwd,
                                            port=int(self.port),
                                            database=self.db,
                                            charset=self.charset,
                                            cursorclass=pymysql.cursors.DictCursor)
                return True
            except Exception as e:
                print(e)
                return False

        def _reConn(self, num=2, stime=1):
            """Make sure the connection is alive, reconnecting if necessary.

            Args:
                num: number of retries after the first attempt (``num + 1``
                    attempts in total).
                stime: seconds to sleep between failed attempts.

            Returns:
                ``(True, message)`` when the connection is usable,
                ``(False, message)`` when every attempt failed.
            """
            attempts = num + 1
            for attempt in range(attempts):
                if self.conn is not None:
                    try:
                        self.conn.ping()  # raises when the link is broken
                        return (True, '数据库连接成功')
                    except Exception:
                        # Fall through and try to open a brand-new connection.
                        pass
                if self.connet():
                    return (True, '数据库连接成功')
                # Wait before the next attempt, but not after the last one.
                if attempt < attempts - 1:
                    time.sleep(stime)
            return (False, '数据库连接失败')

        def query(self, sql_list):
            """Run every statement in ``sql_list`` inside one transaction.

            Args:
                sql_list: iterable of SQL strings, executed in order.

            Returns:
                * ``(True, rows, count)`` on success, where ``rows`` and
                  ``count`` belong to the *last* statement (``[]`` and ``0``
                  for an empty ``sql_list``).  The connection is closed
                  afterwards; the next call reconnects via ``_reConn``.
                * ``(False, message)`` when no connection could be obtained.
                * ``(False, [message])`` when a statement failed; the
                  transaction is rolled back so a later commit cannot persist
                  a half-applied batch.
            """
            cur = None
            try:
                ret = self._reConn()
                if not ret[0]:
                    return ret
                cur = self.cur = self.conn.cursor()
                # Defaults so an empty sql_list yields a well-formed result.
                self.result = []
                self.count_nb = 0
                for sql_str in sql_list:
                    self.count_nb = cur.execute(sql_str)
                    self.result = cur.fetchall()
                self.conn.commit()
                cur.close()
                self.conn.close()
                return (True, self.result, self.count_nb)
            except Exception as e:
                self._abort(cur)
                # PyMySQL errors carry (code, message); other exceptions may
                # have fewer args, so fall back to their string form.
                message = e.args[1] if len(e.args) > 1 else str(e)
                return (False, [message])

        def _abort(self, cur):
            """Best-effort rollback and cursor cleanup after a failed query."""
            try:
                if self.conn is not None:
                    self.conn.rollback()
            except Exception:
                # The connection is already unusable; nothing left to undo.
                pass
            try:
                if cur is not None:
                    cur.close()
            except Exception:
                pass

        def close(self):
            """Close the connection if it is still open (safe to call twice)."""
            if self.conn is not None and self.conn.open:
                self.conn.close()

    调用:

    # Example: connect to a server on a non-default port and run one query.
    # ``ret`` is the tuple returned by MyMysql2.query.
    myconn = MyMysql2(
        host='1.1.1.1',
        user='user',
        passwd='password',
        db='database',
        port=3308,
    )
    ret = myconn.query(['select * from user'])
    

      

    tornado之mysql长连接

    import tornado.ioloop
    import tornado.web
    import requests
    import json
    import os
    import time
    from tornado import httpserver
    import tornado.options
    from tornado.options import options , define
    from datetime import datetime
    import pymysql
    from pymysql.cursors import DictCursor as DicCur
    
    
    def myget(url, timeout=10):
        """GET ``url`` and return the decoded JSON body.

        Args:
            url: address to request.
            timeout: seconds to wait for the server before ``requests``
                raises a timeout error.  Without a timeout the call can block
                forever; pass ``None`` to restore that behaviour.
        """
        response = requests.get(url=url, timeout=timeout)
        return response.json()
    
    def mypost(url, data, timeout=10):
        """POST ``data`` (serialised as a JSON string) and return the JSON reply.

        Args:
            url: address to request.
            data: JSON-serialisable object sent as the request body.
            timeout: seconds to wait for the server before ``requests``
                raises a timeout error.  Without a timeout the call can block
                forever; pass ``None`` to restore that behaviour.
        """
        payload = json.dumps(data)
        response = requests.post(url, data=payload, timeout=timeout)
        return response.json()
    
    
    # Base handler shared by the request handlers below
    class BaseHandler(tornado.web.RequestHandler):
        """Base handler providing client-IP lookup, access logging and MySQL helpers.

        The MySQL helpers share one long-lived connection stored on the
        application object (``self.application.mysql_conn``).
        """

        # Proxy headers consulted, in priority order, before falling back to
        # the address of the socket peer.
        _CLIENT_IP_HEADERS = ('X-Real-Ip', 'X-Forwarded-For')
        # How many times get_mysql_conn tries to obtain a live connection,
        # and how long it waits between tries (seconds).
        _MYSQL_CONNECT_ATTEMPTS = 5
        _MYSQL_RETRY_DELAY = 2

        def get_user_ip(self):
            """Return the client address, preferring reverse-proxy headers."""
            headers = self.request.headers
            for name in self._CLIENT_IP_HEADERS:
                if name in headers:
                    return headers[name]
            return self.request.remote_ip

        def _build_access_record(self):
            """Collect the facts about the finished request into a dict.

            Only the first value of each argument is kept, and a ``password``
            argument is masked.
            """
            request = self.request
            method = request.method
            uri = request.uri

            params = {}
            for key, values in request.arguments.items():
                # errors='replace' keeps logging alive for non-UTF-8 input.
                params[key] = values[0].decode(errors='replace').strip()
            if 'password' in params:
                params['password'] = "***"

            # GET requests are recorded without their query string.
            request_url = str(uri).split('?')[0] if method == "GET" else uri
            return {
                'date_time': datetime.now().strftime("%Y-%m-%d_%H:%M:%S"),
                'request_url': request_url,
                'method': method,
                'host': request.host,
                'remote_ip': self.get_user_ip(),
                'version': request.version,
                'data_msg': params,
            }

        def on_finish(self):
            """Append one access-log line to ``access.log`` next to this file."""
            record = self._build_access_record()
            log_str = '%s  %s  %s  %s  %s' % (
                record['date_time'],
                record['method'],
                record['version'],
                record['host'],
                record['request_url'],
            )
            applog_file = os.path.join(os.path.dirname(__file__), 'access.log')
            with open(applog_file, 'a') as f:
                f.write("%s\n" % log_str)

        def get_mysql_conn(self):
            """Return the application's shared MySQL connection, (re)connecting if needed.

            Returns:
                ``(True, connection)`` on success, or ``(False, error_text)``
                once every attempt has failed.

            NOTE(review): ``time.sleep`` blocks the calling thread between
            attempts; in a Tornado handler that stalls the IOLoop.
            """
            app = self.application
            attempts = self._MYSQL_CONNECT_ATTEMPTS
            last_error = ''
            for attempt in range(1, attempts + 1):
                try:
                    if not app.mysql_conn:
                        app.mysql_conn = pymysql.connect(**app.pymysql_config)
                    else:
                        app.mysql_conn.ping()
                    return (True, app.mysql_conn)
                except Exception as e:
                    last_error = str(e)
                    # No point waiting after the final attempt.
                    if attempt < attempts:
                        time.sleep(self._MYSQL_RETRY_DELAY)
                    print("mysql conn retyr: %s" % (attempt + 1))
            return (False, last_error)

        def do_mysql_query(self, query_list):
            """Execute several statements on the shared connection as one transaction.

            Args:
                query_list: statements to run in order, in the form
                    ``[[sql1, args_list1], [sql2, args_list2], ...]``.

            Returns:
                * ``(True, result_last, rows_last)`` when every statement
                  succeeded; both values describe the last statement only.
                * ``(False, error_text, 0)`` when the connection could not be
                  obtained or a statement raised.  The error is that of the
                  first failing statement and the transaction is rolled back.
            """
            ok, conn = self.get_mysql_conn()
            if not ok:
                # ``conn`` holds the error text in this case.
                return (False, conn, 0)
            try:
                # The ``with`` block closes the cursor on every exit path.
                with conn.cursor() as cur:
                    cur.execute("SET NAMES utf8mb4")
                    cur.execute("SET AUTOCOMMIT = 0")
                    for sql_argslist in query_list:
                        cur.execute(sql_argslist[0], sql_argslist[1])
                    conn.commit()
                    rows_last = cur.rowcount
                    result_last = cur.fetchall()
                    if not result_last:
                        result_last = []
                    return (True, result_last, rows_last)
            except Exception as e:
                try:
                    conn.rollback()
                except Exception:
                    # The connection itself is broken; report the original error.
                    pass
                return (False, str(e), 0)
    
    # Application subclass that also holds the MySQL settings and connection
    class MyApplication(tornado.web.Application):
        """Tornado application that also carries the MySQL settings and connection slot."""

        def __init__(self):
            # Keyword arguments later passed to pymysql.connect(**pymysql_config).
            self.pymysql_config = {
                'port': 3306,
                'host': '1.1.1.1',
                'user': 'user',
                'password': 'pass',
                'db': 'user',
                'charset': 'utf8mb4',
                'cursorclass': DicCur,
            }
            # Falsy placeholder until a handler opens the connection.
            self.mysql_conn = False
            # Further options (debug=..., xsrf_cookies=..., **settings) can be
            # passed here as well.
            super().__init__(handlers=handlerSettings)
    
    
    class avatarUpdataHandler(BaseHandler):
        """Demo endpoint: runs one query, prints its result and answers with a fixed string."""

        def get(self, *args, **kwargs):
            # One statement with an empty argument tuple.
            statements = [['select * from mysql;', ()]]
            outcome = self.do_mysql_query(statements)
            print(outcome)
            self.write('hellow word')
    
    
    
    if __name__ == "__main__":
        # Register the option, then parse argv so that ``--port=NNNN`` really
        # overrides the default instead of being silently ignored.
        define("port", default=8899, help="port 8899", type=int)
        tornado.options.parse_command_line()

        # Read as a module-level name by MyApplication.__init__, so it must be
        # defined before the application is created.
        handlerSettings = [
            (r"/avatar", avatarUpdataHandler),
        ]
        app = MyApplication()

        http_server = tornado.httpserver.HTTPServer(app)
        # listen() takes effect immediately in this single process; start() is
        # only needed with bind() and is therefore not called here.
        http_server.listen(options.port)
        tornado.ioloop.IOLoop.current().start()
    

      

  • 相关阅读:
    栅格数据中加入可见水印
    DWT在栅格数据嵌入不可见水印的应用
    栅格数据嵌入不可见水印的流程
    栅格数据嵌入不可见水印的方法总结
    QIM量化
    哈希函数(hash函数)
    IDEA——IDEA使用Tomcat服务器出现乱码问题
    Quartz学习——SSMM(Spring+SpringMVC+Mybatis+Mysql)和Quartz集成详解(四)
    Quartz学习——Spring和Quartz集成详解(三)
    Quartz学习——Quartz简单入门Demo(二)
  • 原文地址:https://www.cnblogs.com/zhangkui/p/11102741.html
Copyright © 2011-2022 走看看