mymysql.py
class MyMysql2(object):
    """Small convenience wrapper around a single PyMySQL connection.

    The connection is opened eagerly in ``__init__`` and re-validated (or
    re-opened) before every ``query`` call. Rows are returned as dicts
    because the connection uses ``DictCursor``.
    """

    def __init__(self, host='', user='', passwd='', db='', port=3306, charset='utf8'):
        """Store the connection settings and attempt the first connection.

        A failed first connection is not fatal: ``self.conn`` stays ``None``
        and ``query`` retries the connection later.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        self.connet()

    def connet(self):
        """Open a new connection and store it in ``self.conn``.

        Returns:
            True when the connection was established, False otherwise (the
            error is printed, not raised).
        """
        try:
            # Imported lazily so that a missing driver is reported as a
            # failed connection instead of an import-time crash.
            import pymysql
            import pymysql.cursors

            self.conn = pymysql.connect(
                host=self.host,
                user=self.user,
                password=self.passwd,  # 'passwd' is a deprecated alias
                port=int(self.port),
                database=self.db,
                charset=self.charset,
                cursorclass=pymysql.cursors.DictCursor,
            )
            return True
        except Exception as e:
            print(e)
            return False

    def _reConn(self, num=2, stime=1):
        """Make sure ``self.conn`` is usable, reconnecting when necessary.

        Args:
            num: number of retries after the first attempt (``num + 1``
                attempts in total).
            stime: seconds to sleep between two attempts.

        Returns:
            ``(True, message)`` when the connection is alive, otherwise
            ``(False, message)``.
        """
        import time

        attempts = num + 1
        for attempt in range(attempts):
            if self.conn is not None:
                try:
                    self.conn.ping()  # verifies the connection is still alive
                    return (True, '数据库连接成功')
                except Exception:
                    # Ping failed: fall through and open a new connection.
                    pass
            if self.connet():
                return (True, '数据库连接成功')
            # Only wait when another attempt will actually follow.
            if attempt < attempts - 1:
                time.sleep(stime)
        return (False, '数据库连接失败')

    def query(self, sql_list):
        """Execute every statement in ``sql_list`` in order and commit.

        The connection is closed once the call finishes (whether it
        succeeded or not); the next call re-opens it.

        Args:
            sql_list: iterable of SQL strings.

        Returns:
            ``(True, rows, count)`` holding the rows and the ``execute``
            return value of the LAST statement (``[]`` and ``0`` when
            ``sql_list`` is empty);
            ``(False, message)`` when no connection could be obtained;
            ``(False, [message])`` when a statement failed.
        """
        ret = self._reConn()
        if not ret[0]:
            return ret

        # Local defaults so an empty sql_list neither crashes nor returns
        # stale data left over from a previous call.
        result = []
        count_nb = 0
        try:
            self.cur = self.conn.cursor()
            try:
                for sql_str in sql_list:
                    count_nb = self.cur.execute(sql_str)
                    result = self.cur.fetchall()
                self.conn.commit()
            finally:
                self.cur.close()
        except Exception as e:
            # MySQL errors carry (errno, message); other exceptions may have
            # fewer args, so fall back to their string form.
            message = e.args[1] if len(e.args) > 1 else str(e)
            return (False, [message])
        finally:
            self.close()

        self.result = result
        self.count_nb = count_nb
        return (True, result, count_nb)

    def close(self):
        """Close the connection if it is currently open; safe to call repeatedly."""
        conn = self.conn
        if conn is not None and conn.open:
            conn.close()
调用:
# Example usage: connect to the server on port 3308 and run one SELECT.
# ``ret`` is the tuple returned by ``MyMysql2.query``.
myconn = MyMysql2(
    host='1.1.1.1',
    user='user',
    passwd='password',
    db='database',
    port=3308,
)
ret = myconn.query(['select * from user'])
tornado之mysql长连接
import json
import os
import time
from datetime import datetime

import pymysql
import requests
import tornado.ioloop
import tornado.options
import tornado.web
from pymysql.cursors import DictCursor as DicCur
from tornado import httpserver
from tornado.options import options, define


def myget(url, timeout=None):
    """GET ``url`` and return the decoded JSON body.

    ``timeout`` (seconds) is passed to ``requests``; the default ``None``
    waits indefinitely. NOTE: the call is blocking.
    """
    ret = requests.get(url=url, timeout=timeout)
    return ret.json()


def mypost(url, data, timeout=None):
    """POST ``data`` serialized as JSON to ``url`` and return the decoded JSON reply.

    ``timeout`` (seconds) is passed to ``requests``; the default ``None``
    waits indefinitely. NOTE: the call is blocking.
    """
    data = json.dumps(data)
    ret = requests.post(url, data=data, timeout=timeout)
    return ret.json()


# Base handler
class BaseHandler(tornado.web.RequestHandler):
    """Common request handler: access logging plus a shared MySQL connection."""

    def get_user_ip(self):
        """Return the client IP, preferring the reverse-proxy headers.

        Looks at ``X-Real-Ip`` first, then ``X-Forwarded-For``, and finally
        falls back to the socket peer address.
        """
        headers = self.request.headers
        for name in ('X-Real-Ip', 'X-Forwarded-For'):
            if name in headers:
                return headers[name]
        return self.request.remote_ip

    # Access log
    def on_finish(self):
        """Append one line per finished request to ``access.log`` next to this file.

        Line format: ``<date_time> <method> <version> <host> <request_url>``.
        For GET requests the query string is stripped from the URL. Request
        arguments are never written, so they are not decoded here.
        """
        request = self.request
        uri = request.uri
        request_url = str(uri).split('?')[0] if request.method == "GET" else uri
        time_ = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
        log_str = '%s %s %s %s %s' % (
            time_, request.method, request.version, request.host, request_url)
        applog_file = os.path.join(os.path.dirname(__file__), 'access.log')
        with open(applog_file, 'a', encoding='utf-8') as f:
            # One entry per line; entries contain spaces themselves, so a
            # space separator would make the log unparseable.
            f.write("%s\n" % log_str)

    def get_mysql_conn(self, retries=5, delay=2):
        """Return the application-wide MySQL connection, (re)connecting if needed.

        Args:
            retries: maximum number of attempts (at least one is made).
            delay: seconds to wait between two attempts.

        Returns:
            ``(True, connection)`` on success, or ``(False, error_message)``
            once every attempt has failed.

        NOTE: ``time.sleep`` blocks, so while retrying this handler does not
        return control to the caller.
        """
        app = self.application
        attempts = max(1, retries)
        for attempt in range(1, attempts + 1):
            try:
                if not app.mysql_conn:
                    app.mysql_conn = pymysql.connect(**app.pymysql_config)
                else:
                    app.mysql_conn.ping()
                return (True, app.mysql_conn)
            except Exception as e:
                if attempt == attempts:
                    return (False, str(e))
                print("mysql conn retyr: %s" % (attempt + 1))
                time.sleep(delay)

    def do_mysql_query(self, query_list):
        '''
        Execute several SQL statements in one transaction.

        query_list is expected to look like:
            [[sql1, args_list1], [sql2, args_list2]]

        Returns one of:
        (1) (True, result_last, rows_last)
            result_last and rows_last only describe the LAST statement
            (when no statement failed).
        (2) (False, str(e), 0)
            The error of the first failing statement. If any statement
            fails the transaction is rolled back, so none of the
            statements takes effect.
        '''
        ok, conn_or_error = self.get_mysql_conn()
        if not ok:
            return (False, conn_or_error, 0)
        conn = conn_or_error
        try:
            # The context manager closes the cursor on both success and error.
            with conn.cursor() as cur:
                cur.execute("SET NAMES utf8mb4")
                cur.execute("SET AUTOCOMMIT = 0")
                for sql_argslist in query_list:
                    cur.execute(sql_argslist[0], sql_argslist[1])
                conn.commit()
                rows_last = cur.rowcount
                result_last = cur.fetchall() or []
            return (True, result_last, rows_last)
        except Exception as e:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # Rollback can fail as well (for example on a lost connection);
                # report it but keep returning the original error.
                print("mysql rollback failed: %s" % rollback_error)
            return (False, str(e), 0)


class avatarUpdataHandler(BaseHandler):
    """Demo handler: runs one query, prints the result and answers with a fixed text."""

    def get(self, *args, **kwargs):
        ret = self.do_mysql_query([['select * from mysql;', ()]])
        print(ret)
        self.write('hellow word')


# Default routing table. Defined at module level so MyApplication also works
# when this module is imported rather than run as a script.
handlerSettings = [
    (r"/avatar", avatarUpdataHandler),
]


# Application subclass that carries the shared MySQL settings/connection
class MyApplication(tornado.web.Application):
    """Tornado application holding the PyMySQL settings and one shared connection."""

    def __init__(self, handlers=None):
        """Create the application.

        Args:
            handlers: optional routing table; defaults to ``handlerSettings``.
        """
        # NOTE(review): connection settings (including the password) are
        # hard-coded; consider loading them from configuration.
        pymysql_config = {
            'port': 3306,
            'host': '1.1.1.1',
            'user': 'user',
            'password': 'pass',
            'database': 'user',  # 'db' is a deprecated alias
            'charset': 'utf8mb4',
            'cursorclass': DicCur,
        }
        self.pymysql_config = pymysql_config
        # Created lazily by BaseHandler.get_mysql_conn on first use.
        self.mysql_conn = None
        # Further settings (debug=..., xsrf_cookies=..., **settings) can be
        # passed here.
        tornado.web.Application.__init__(
            self,
            handlers=handlerSettings if handlers is None else handlers,
        )


if __name__ == "__main__":
    define("port", default=8899, help="port 8899", type=int)
    # Without parsing, a --port given on the command line is ignored.
    tornado.options.parse_command_line()
    app = MyApplication()
    http_server = httpserver.HTTPServer(app)
    # listen() takes effect immediately; start() is only needed together
    # with bind() for multi-process mode.
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()