# coding=utf-8
"""Small PyMySQL helper that reads its connection settings from config.ini.

python version: 3.7
author: Chen
date: 2019.01.19

env:     section name in config.ini, e.g. "beta" or "alpha"
db_name: option name inside that section whose value is the schema to use,
         e.g. "see_db" or "ec_db"
"""
import pymysql
import os
import configparser


class EnvConfig(object):
    """Connection settings for one environment, loaded from config.ini.

    config.ini is looked up in the directory that contains this module.

    :param env: section of config.ini to read (for example "beta").
    :param db_name: option name in that section holding the schema name.
    :raises configparser.NoSectionError: if section ``env`` is missing.
    :raises configparser.NoOptionError: if a required option is missing.
    :raises ValueError: if the ``port`` option is not an integer.
    """

    def __init__(self, env, db_name):
        config_dir = os.path.split(os.path.realpath(__file__))[0]
        config_path = os.path.join(config_dir, "config.ini")
        config = configparser.ConfigParser()
        # ConfigParser.read() silently skips a missing file; in that case the
        # lookups below fail with NoSectionError.
        config.read(config_path)
        self.ip = config.get(env, 'ip')
        self.user = config.get(env, 'user')
        self.pwd = config.get(env, 'pwd')
        self.db_name = config.get(env, db_name)
        self.port = int(config.get(env, 'port'))


class GetConnectDb(EnvConfig):
    """Open a MySQL connection for ``env`` and run one statement on it.

    Every public query method closes the cursor and the connection when it
    finishes, so an instance serves a single statement; create a new
    instance for the next one.

    Example::

        t_job = GetConnectDb("beta", "see_db")
        rows = t_job.exe_select("select * from t_job")

    :param env: section of config.ini to read (for example "beta").
    :param db_name: option name in that section holding the schema name.
    :raises pymysql.MySQLError: if the connection cannot be established.
    """

    def __init__(self, env, db_name):
        EnvConfig.__init__(self, env, db_name)
        # Keyword arguments are required here: the positional order of
        # pymysql.connect() is not (host, port, user, ...), and recent
        # PyMySQL releases reject positional arguments altogether.
        self.connect = pymysql.connect(
            host=self.ip,
            port=self.port,
            user=self.user,
            password=self.pwd,
            database=self.db_name,
            charset='utf8',
        )
        self.cursor = self.connect.cursor()

    def disconnect_db(self):
        """Close the cursor and the connection, printing any close error.

        The two resources are closed independently so a failure on the
        cursor does not leave the connection open. Calling this more than
        once is harmless: the "already closed" error is printed, not raised.
        """
        for resource in (self.cursor, self.connect):
            try:
                resource.close()
            except (pymysql.MySQLError, OSError) as e:
                print(e)

    def _rollback_quietly(self):
        """Roll back the open transaction, printing (not raising) failures."""
        try:
            self.connect.rollback()
        except Exception as e:
            print(e)

    def _execute_and_commit(self, sql, array=None):
        """Run one write statement, commit it, then disconnect.

        On any error the transaction is rolled back and the error printed.
        """
        try:
            if array is None:
                self.cursor.execute(sql)
            else:
                self.cursor.executemany(sql, array)
            self.connect.commit()
        except Exception as e:
            self._rollback_quietly()
            print(e)
        finally:
            self.disconnect_db()

    def exe_select(self, sql, rows=None):
        """Run a query and return its rows, then disconnect.

        Usage: ``select * from t_job where pipelineId='6444df4555sa55dgd';``

        :param sql: the SELECT statement to execute.
        :param rows: int; when given, only the first ``rows`` rows are
            fetched (``fetchmany``), otherwise all rows (``fetchall``).
        :return: the fetched rows, or None if the query failed (the error
            is printed).
        """
        try:
            self.cursor.execute(sql)
            if rows is None:
                return self.cursor.fetchall()
            return self.cursor.fetchmany(rows)
        except Exception as e:
            print(e)
            return None
        finally:
            self.disconnect_db()

    def exe_insert(self, sql, array=None):
        """Run an INSERT, commit it, then disconnect.

        With ``array=None`` the statement is passed to ``execute``::

            sql = insert into t_job(column1,column2,column3)values(a,b,c)

        With ``array`` given, statement and rows go to ``executemany``::

            sql = insert into t_job(id,user,date)values(%s,%s,%s)
            array = [(1001,"dev",'2018-11-25-11-12-43'),
                     (1002,"test",'2018-11-25-11-12-44')]

        On failure the transaction is rolled back and the error is printed.

        :param sql: the INSERT statement.
        :param array: optional sequence of parameter tuples.
        """
        self._execute_and_commit(sql, array)

    def update_opt(self, sql):
        """Run an UPDATE, commit it, then disconnect.

        On failure the transaction is rolled back and the error is printed.

        :param sql: the UPDATE statement.
        """
        self._execute_and_commit(sql)

    def delete_opt(self, sql):
        """Run a DELETE, commit it, then disconnect.

        Unlike the other write helpers, a failure is re-raised to the caller
        after the transaction has been rolled back.

        :param sql: the DELETE statement.
        :raises pymysql.MySQLError: if executing or committing fails.
        """
        try:
            self.cursor.execute(sql)
            self.connect.commit()
        except Exception:
            self._rollback_quietly()
            raise
        finally:
            self.disconnect_db()