  • Python: Database Development

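    1. Drivers:

      MySQL communicates over TCP, but once the network connection is established, the data exchanged must follow the MySQL protocol.

      A package that encapsulates the MySQL protocol is a driver.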
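
To make "following the MySQL protocol" a little more concrete, here is a minimal sketch of the packet framing a driver performs: each packet carries a 3-byte little-endian payload length and a 1-byte sequence number in front of the payload. The helper names `frame_packet` and `parse_header` are made up for illustration and are not part of any driver's API.

```python
import struct

# Largest payload a single packet can carry (3-byte length field).
MAX_PACKET_LEN = 2**24 - 1


def frame_packet(payload, seq_id):
    """Prefix payload with a 3-byte little-endian length and a 1-byte sequence id."""
    if len(payload) > MAX_PACKET_LEN:
        raise ValueError("payload too large for a single packet")
    # Pack as a 4-byte unsigned int and keep only the low 3 bytes.
    header = struct.pack("<I", len(payload))[:3] + bytes([seq_id % 256])
    return header + payload


def parse_header(header):
    """Split a 4-byte packet header into (payload_length, sequence_id)."""
    low, high, seq_id = struct.unpack("<HBB", header)
    return low + (high << 16), seq_id


# 0x03 is the COM_QUERY command byte; the SQL text follows it.
packet = frame_packet(b"\x03SELECT 1", 0)
length, seq = parse_header(packet[:4])
```

A driver such as pymysql hides this framing behind `cursor.execute()`, so application code never builds packets by hand.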

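      MySQL drivers:

      • MySQLdb: the best-known library, a wrapper around the MySQL C client; it supports only Python 2
      • The official MySQL Connector
      • pymysql: API-compatible with MySQLdb, written in pure Python, supports Python 3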

    2. Using pymysql

      Install: pip install pymysql

      Create the database and table:

        CREATE DATABASE IF NOT EXISTS school;
        SHOW DATABASES;
        USE school;

        CREATE TABLE `students` (
          id INT(10) NOT NULL AUTO_INCREMENT,
          name VARCHAR(20) NOT NULL,
          age INT(10) DEFAULT NULL,
          PRIMARY KEY (id)
        ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
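
The same schema can also be created from Python. The following is only a sketch: `create_schema` and `DDL_STATEMENTS` are hypothetical names, the function accepts any DB-API style connection, and `IF NOT EXISTS` was added to the table statement (an assumption, not in the SQL above) so that running it twice is harmless. Each statement is sent with its own `execute()` call.

```python
# One statement per entry; each is sent with a separate execute() call.
DDL_STATEMENTS = [
    "CREATE DATABASE IF NOT EXISTS school",
    "USE school",
    """CREATE TABLE IF NOT EXISTS students (
        id INT(10) NOT NULL AUTO_INCREMENT,
        name VARCHAR(20) NOT NULL,
        age INT(10) DEFAULT NULL,
        PRIMARY KEY (id)
    ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4""",
]


def create_schema(conn):
    """Run every DDL statement on an already-open connection, then commit."""
    cursor = conn.cursor()
    try:
        for statement in DDL_STATEMENTS:
            cursor.execute(statement)
    finally:
        # Always release the cursor, even if a statement fails.
        cursor.close()
    conn.commit()
```

With a running server this would be called as `create_schema(pymysql.connect(host=..., user=..., password=...))`, where the credentials are whatever your own server uses.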

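      Connecting (Connect):

        First, a channel for transferring data must be established: the connection.

        pymysql.connect() returns an instance of the Connection class from the connections module; the arguments passed to connect() are simply the arguments of Connection.__init__.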
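
Before reading the class source, a short usage sketch may help. It assumes the `students` table from above; `insert_student` and `open_school_connection` are hypothetical helpers, and every connection value is a placeholder. The keyword arguments passed to `pymysql.connect()` are the same names that appear in the `__init__` signature in the listing.

```python
def insert_student(conn, name, age):
    """Insert one row and commit; roll back if anything fails."""
    sql = "INSERT INTO students (name, age) VALUES (%s, %s)"
    cursor = conn.cursor()
    try:
        # Values are passed separately so the driver escapes them.
        affected = cursor.execute(sql, (name, age))
        conn.commit()
        return affected
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()


def open_school_connection():
    """Open a connection; all values below are placeholders (assumptions)."""
    import pymysql  # imported lazily so insert_student works without it

    return pymysql.connect(host="127.0.0.1", port=3306, user="root",
                           password="change-me", database="school",
                           charset="utf8mb4")
```

A typical call sequence would be `conn = open_school_connection()`, then `insert_student(conn, "tom", 20)`, and finally `conn.close()`.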

        class Connection(object):
            """
            Representation of a socket with a mysql server.

            The proper way to get an instance of this class is to call
            connect().

            Establish a connection to the MySQL database. Accepts several
            arguments:

            :param host: Host where the database server is located
            :param user: Username to log in as
            :param password: Password to use.
            :param database: Database to use, None to not use a particular one.
            :param port: MySQL port to use, default is usually OK. (default: 3306)
            :param bind_address: When the client has multiple network interfaces, specify
                the interface from which to connect to the host. Argument can be
                a hostname or an IP address.
            :param unix_socket: Optionally, you can use a unix socket rather than TCP/IP.
            :param read_timeout: The timeout for reading from the connection in seconds (default: None - no timeout)
            :param write_timeout: The timeout for writing to the connection in seconds (default: None - no timeout)
            :param charset: Charset you want to use.
            :param sql_mode: Default SQL_MODE to use.
            :param read_default_file:
                Specifies my.cnf file to read these parameters from under the [client] section.
            :param conv:
                Conversion dictionary to use instead of the default one.
                This is used to provide custom marshalling and unmarshaling of types.
                See converters.
            :param use_unicode:
                Whether or not to default to unicode strings.
                This option defaults to true for Py3k.
            :param client_flag: Custom flags to send to MySQL. Find potential values in constants.CLIENT.
            :param cursorclass: Custom cursor class to use.
            :param init_command: Initial SQL statement to run when connection is established.
            :param connect_timeout: Timeout before throwing an exception when connecting.
                (default: 10, min: 1, max: 31536000)
            :param ssl:
                A dict of arguments similar to mysql_ssl_set()'s parameters.
                For now the capath and cipher arguments are not supported.
            :param read_default_group: Group to read from in the configuration file.
            :param compress: Not supported
            :param named_pipe: Not supported
            :param autocommit: Autocommit mode. None means use server default. (default: False)
            :param local_infile: Boolean to enable the use of LOAD DATA LOCAL command. (default: False)
            :param max_allowed_packet: Max size of packet sent to server in bytes. (default: 16MB)
                Only used to limit size of "LOAD LOCAL INFILE" data packet smaller than default (16KB).
            :param defer_connect: Don't explicitly connect on construction - wait for connect call.
                (default: False)
            :param auth_plugin_map: A dict of plugin names to a class that processes that plugin.
                The class will take the Connection object as the argument to the constructor.
                The class needs an authenticate method taking an authentication packet as
                an argument.  For the dialog plugin, a prompt(echo, prompt) method can be used
                (if no authenticate method) for returning a string from the user. (experimental)
            :param server_public_key: SHA256 authentication plugin public key value. (default: None)
            :param db: Alias for database. (for compatibility to MySQLdb)
            :param passwd: Alias for password. (for compatibility to MySQLdb)
            :param binary_prefix: Add _binary prefix on bytes and bytearray. (default: False)

            See `Connection <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_ in the
            specification.
            """

            _sock = None
            _auth_plugin_name = ''
            _closed = False
            _secure = False

            def __init__(self, host=None, user=None, password="",
                         database=None, port=0, unix_socket=None,
                         charset='', sql_mode=None,
                         read_default_file=None, conv=None, use_unicode=None,
                         client_flag=0, cursorclass=Cursor, init_command=None,
                         connect_timeout=10, ssl=None, read_default_group=None,
                         compress=None, named_pipe=None,
                         autocommit=False, db=None, passwd=None, local_infile=False,
                         max_allowed_packet=16*1024*1024, defer_connect=False,
                         auth_plugin_map=None, read_timeout=None, write_timeout=None,
                         bind_address=None, binary_prefix=False, program_name=None,
                         server_public_key=None):
                if use_unicode is None and sys.version_info[0] > 2:
                    use_unicode = True

                if db is not None and database is None:
                    database = db
                if passwd is not None and not password:
                    password = passwd

                if compress or named_pipe:
                    raise NotImplementedError("compress and named_pipe arguments are not supported")

                self._local_infile = bool(local_infile)
                if self._local_infile:
                    client_flag |= CLIENT.LOCAL_FILES

                if read_default_group and not read_default_file:
                    if sys.platform.startswith("win"):
                        read_default_file = "c:\\my.ini"
                    else:
                        read_default_file = "/etc/my.cnf"

                if read_default_file:
                    if not read_default_group:
                        read_default_group = "client"

                    cfg = Parser()
                    cfg.read(os.path.expanduser(read_default_file))

                    def _config(key, arg):
                        if arg:
                            return arg
                        try:
                            return cfg.get(read_default_group, key)
                        except Exception:
                            return arg

                    user = _config("user", user)
                    password = _config("password", password)
                    host = _config("host", host)
                    database = _config("database", database)
                    unix_socket = _config("socket", unix_socket)
                    port = int(_config("port", port))
                    bind_address = _config("bind-address", bind_address)
                    charset = _config("default-character-set", charset)
                    if not ssl:
                        ssl = {}
                    if isinstance(ssl, dict):
                        for key in ["ca", "capath", "cert", "key", "cipher"]:
                            value = _config("ssl-" + key, ssl.get(key))
                            if value:
                                ssl[key] = value

                self.ssl = False
                if ssl:
                    if not SSL_ENABLED:
                        raise NotImplementedError("ssl module not found")
                    self.ssl = True
                    client_flag |= CLIENT.SSL
                    self.ctx = self._create_ssl_ctx(ssl)

                self.host = host or "localhost"
                self.port = port or 3306
                self.user = user or DEFAULT_USER
                self.password = password or b""
                if isinstance(self.password, text_type):
                    self.password = self.password.encode('latin1')
                self.db = database
                self.unix_socket = unix_socket
                self.bind_address = bind_address
                if not (0 < connect_timeout <= 31536000):
                    raise ValueError("connect_timeout should be >0 and <=31536000")
                self.connect_timeout = connect_timeout or None
                if read_timeout is not None and read_timeout <= 0:
                    raise ValueError("read_timeout should be > 0")
                self._read_timeout = read_timeout
                if write_timeout is not None and write_timeout <= 0:
                    raise ValueError("write_timeout should be > 0")
                self._write_timeout = write_timeout
                if charset:
                    self.charset = charset
                    self.use_unicode = True
                else:
                    self.charset = DEFAULT_CHARSET
                    self.use_unicode = False

                if use_unicode is not None:
                    self.use_unicode = use_unicode

                self.encoding = charset_by_name(self.charset).encoding

                client_flag |= CLIENT.CAPABILITIES
                if self.db:
                    client_flag |= CLIENT.CONNECT_WITH_DB

                self.client_flag = client_flag

                self.cursorclass = cursorclass

                self._result = None
                self._affected_rows = 0
                self.host_info = "Not connected"

                #: specified autocommit mode. None means use server default.
                self.autocommit_mode = autocommit

                if conv is None:
                    conv = converters.conversions

                # Need for MySQLdb compatibility.
                self.encoders = dict([(k, v) for (k, v) in conv.items() if type(k) is not int])
                self.decoders = dict([(k, v) for (k, v) in conv.items() if type(k) is int])
                self.sql_mode = sql_mode
                self.init_command = init_command
                self.max_allowed_packet = max_allowed_packet
                self._auth_plugin_map = auth_plugin_map or {}
                self._binary_prefix = binary_prefix
                self.server_public_key = server_public_key

                self._connect_attrs = {
                    '_client_name': 'pymysql',
                    '_pid': str(os.getpid()),
                    '_client_version': VERSION_STRING,
                }
                if program_name:
                    self._connect_attrs["program_name"] = program_name
                elif sys.argv:
                    self._connect_attrs["program_name"] = sys.argv[0]

                if defer_connect:
                    self._sock = None
                else:
                    self.connect()

            def _create_ssl_ctx(self, sslp):
                if isinstance(sslp, ssl.SSLContext):
                    return sslp
                ca = sslp.get('ca')
                capath = sslp.get('capath')
                hasnoca = ca is None and capath is None
                ctx = ssl.create_default_context(cafile=ca, capath=capath)
                ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
                ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
                if 'cert' in sslp:
                    ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
                if 'cipher' in sslp:
                    ctx.set_ciphers(sslp['cipher'])
                ctx.options |= ssl.OP_NO_SSLv2
                ctx.options |= ssl.OP_NO_SSLv3
                return ctx

            def close(self):
                """
                Send the quit message and close the socket.

                See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
                in the specification.

                :raise Error: If the connection is already closed.
                """
                if self._closed:
                    raise err.Error("Already closed")
                self._closed = True
                if self._sock is None:
                    return
                send_data = struct.pack('<iB', 1, COMMAND.COM_QUIT)
                try:
                    self._write_bytes(send_data)
                except Exception:
                    pass
                finally:
                    self._force_close()

            @property
            def open(self):
                """Return True if the connection is open"""
                return self._sock is not None

            def _force_close(self):
                """Close connection without QUIT message"""
                if self._sock:
                    try:
                        self._sock.close()
                    except:  # noqa
                        pass
                self._sock = None
                self._rfile = None

            __del__ = _force_close

            def autocommit(self, value):
                self.autocommit_mode = bool(value)
                current = self.get_autocommit()
                if value != current:
                    self._send_autocommit_mode()

            def get_autocommit(self):
                return bool(self.server_status &
                            SERVER_STATUS.SERVER_STATUS_AUTOCOMMIT)

            def _read_ok_packet(self):
                pkt = self._read_packet()
                if not pkt.is_ok_packet():
                    raise err.OperationalError(2014, "Command Out of Sync")
                ok = OKPacketWrapper(pkt)
                self.server_status = ok.server_status
                return ok

            def _send_autocommit_mode(self):
                """Set whether or not to commit after every execute()"""
                self._execute_command(COMMAND.COM_QUERY, "SET AUTOCOMMIT = %s" %
                                      self.escape(self.autocommit_mode))
                self._read_ok_packet()

            def begin(self):
                """Begin transaction."""
                self._execute_command(COMMAND.COM_QUERY, "BEGIN")
                self._read_ok_packet()

            def commit(self):
                """
                Commit changes to stable storage.

                See `Connection.commit() <https://www.python.org/dev/peps/pep-0249/#commit>`_
                in the specification.
                """
                self._execute_command(COMMAND.COM_QUERY, "COMMIT")
                self._read_ok_packet()

            def rollback(self):
                """
                Roll back the current transaction.

                See `Connection.rollback() <https://www.python.org/dev/peps/pep-0249/#rollback>`_
                in the specification.
                """
                self._execute_command(COMMAND.COM_QUERY, "ROLLBACK")
                self._read_ok_packet()

            def show_warnings(self):
                """Send the "SHOW WARNINGS" SQL command."""
                self._execute_command(COMMAND.COM_QUERY, "SHOW WARNINGS")
                result = MySQLResult(self)
                result.read()
                return result.rows

            def select_db(self, db):
                """
                Set current db.

                :param db: The name of the db.
                """
                self._execute_command(COMMAND.COM_INIT_DB, db)
                self._read_ok_packet()

            def escape(self, obj, mapping=None):
                """Escape whatever value you pass to it.

                Non-standard, for internal use; do not use this in your applications.
                """
                if isinstance(obj, str_type):
                    return "'" + self.escape_string(obj) + "'"
                if isinstance(obj, (bytes, bytearray)):
                    ret = self._quote_bytes(obj)
                    if self._binary_prefix:
                        ret = "_binary" + ret
                    return ret
                return converters.escape_item(obj, self.charset, mapping=mapping)

            def literal(self, obj):
                """Alias for escape()

                Non-standard, for internal use; do not use this in your applications.
                """
                return self.escape(obj, self.encoders)

            def escape_string(self, s):
                if (self.server_status &
                        SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES):
                    return s.replace("'", "''")
                return converters.escape_string(s)

            def _quote_bytes(self, s):
                if (self.server_status &
                        SERVER_STATUS.SERVER_STATUS_NO_BACKSLASH_ESCAPES):
                    return "'%s'" % (_fast_surrogateescape(s.replace(b"'", b"''")),)
                return converters.escape_bytes(s)

            def cursor(self, cursor=None):
                """
                Create a new cursor to execute queries with.

                :param cursor: The type of cursor to create; one of :py:class:`Cursor`,
                    :py:class:`SSCursor`, :py:class:`DictCursor`, or :py:class:`SSDictCursor`.
                    None means use Cursor.
                """
                if cursor:
                    return cursor(self)
                return self.cursorclass(self)

            def __enter__(self):
                """Context manager that returns a Cursor"""
                return self.cursor()

            def __exit__(self, exc, value, traceback):
                """On successful exit, commit. On exception, rollback"""
                if exc:
                    self.rollback()
                else:
                    self.commit()

            # The following methods are INTERNAL USE ONLY (called from Cursor)
            def query(self, sql, unbuffered=False):
                # if DEBUG:
                #     print("DEBUG: sending query:", sql)
                if isinstance(sql, text_type) and not (JYTHON or IRONPYTHON):
                    if PY2:
                        sql = sql.encode(self.encoding)
                    else:
                        sql = sql.encode(self.encoding, 'surrogateescape')
                self._execute_command(COMMAND.COM_QUERY, sql)
                self._affected_rows = self._read_query_result(unbuffered=unbuffered)
                return self._affected_rows

            def next_result(self, unbuffered=False):
                self._affected_rows = self._read_query_result(unbuffered=unbuffered)
                return self._affected_rows

            def affected_rows(self):
                return self._affected_rows

            def kill(self, thread_id):
                arg = struct.pack('<I', thread_id)
                self._execute_command(COMMAND.COM_PROCESS_KILL, arg)
                return self._read_ok_packet()

            def ping(self, reconnect=True):
                """
                Check if the server is alive.

                :param reconnect: If the connection is closed, reconnect.
                :raise Error: If the connection is closed and reconnect=False.
                """
                if self._sock is None:
                    if reconnect:
                        self.connect()
                        reconnect = False
                    else:
                        raise err.Error("Already closed")
                try:
                    self._execute_command(COMMAND.COM_PING, "")
                    self._read_ok_packet()
                except Exception:
                    if reconnect:
                        self.connect()
                        self.ping(False)
                    else:
                        raise

            def set_charset(self, charset):
                # Make sure charset is supported.
                encoding = charset_by_name(charset).encoding

                self._execute_command(COMMAND.COM_QUERY, "SET NAMES %s" % self.escape(charset))
                self._read_packet()
                self.charset = charset
                self.encoding = encoding

            def connect(self, sock=None):
                self._closed = False
                try:
                    if sock is None:
                        if self.unix_socket:
                            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                            sock.settimeout(self.connect_timeout)
                            sock.connect(self.unix_socket)
                            self.host_info = "Localhost via UNIX socket"
                            self._secure = True
                            if DEBUG: print('connected using unix_socket')
                        else:
                            kwargs = {}
                            if self.bind_address is not None:
                                kwargs['source_address'] = (self.bind_address, 0)
                            while True:
                                try:
                                    sock = socket.create_connection(
                                        (self.host, self.port), self.connect_timeout,
                                        **kwargs)
                                    break
                                except (OSError, IOError) as e:
                                    if e.errno == errno.EINTR:
                                        continue
                                    raise
                            self.host_info = "socket %s:%d" % (self.host, self.port)
                            if DEBUG: print('connected using socket')
                            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                        sock.settimeout(None)
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    self._sock = sock
                    self._rfile = _makefile(sock, 'rb')
                    self._next_seq_id = 0

                    self._get_server_information()
                    self._request_authentication()

                    if self.sql_mode is not None:
                        c = self.cursor()
                        c.execute("SET sql_mode=%s", (self.sql_mode,))

                    if self.init_command is not None:
                        c = self.cursor()
                        c.execute(self.init_command)
                        c.close()
                        self.commit()

                    if self.autocommit_mode is not None:
                        self.autocommit(self.autocommit_mode)
                except BaseException as e:
                    self._rfile = None
                    if sock is not None:
                        try:
                            sock.close()
                        except:  # noqa
                            pass

                    if isinstance(e, (OSError, IOError, socket.error)):
                        exc = err.OperationalError(
                                2003,
                                "Can't connect to MySQL server on %r (%s)" % (
                                    self.host, e))
                        # Keep original exception and traceback to investigate error.
                        exc.original_exception = e
                        exc.traceback = traceback.format_exc()
                        if DEBUG: print(exc.traceback)
                        raise exc

                    # If e is neither DatabaseError or IOError, It's a bug.
                    # But raising AssertionError hides original error.
                    # So just reraise it.
                    raise

            def write_packet(self, payload):
                """Writes an entire "mysql packet" in its entirety to the network
                adding its length and sequence number.
                """
                # Internal note: when you build packet manually and call _write_bytes()
                # directly, you should set self._next_seq_id properly.
                data = pack_int24(len(payload)) + int2byte(self._next_seq_id) + payload
                if DEBUG: dump_packet(data)
                self._write_bytes(data)
                self._next_seq_id = (self._next_seq_id + 1) % 256

            def _read_packet(self, packet_type=MysqlPacket):
                """Read an entire "mysql packet" in its entirety from the network
                and return a MysqlPacket type that represents the results.

                :raise OperationalError: If the connection to the MySQL server is lost.
                :raise InternalError: If the packet sequence number is wrong.
                """
                buff = b''
                while True:
                    packet_header = self._read_bytes(4)
                    #if DEBUG: dump_packet(packet_header)

                    btrl, btrh, packet_number = struct.unpack('<HBB', packet_header)
                    bytes_to_read = btrl + (btrh << 16)
                    if packet_number != self._next_seq_id:
                        self._force_close()
                        if packet_number == 0:
                            # MariaDB sends error packet with seqno==0 when shutdown
                            raise err.OperationalError(
                                CR.CR_SERVER_LOST,
                                "Lost connection to MySQL server during query")
                        raise err.InternalError(
                            "Packet sequence number wrong - got %d expected %d"
                            % (packet_number, self._next_seq_id))
                    self._next_seq_id = (self._next_seq_id + 1) % 256

                    recv_data = self._read_bytes(bytes_to_read)
                    if DEBUG: dump_packet(recv_data)
                    buff += recv_data
                    # https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
                    if bytes_to_read == 0xffffff:
                        continue
                    if bytes_to_read < MAX_PACKET_LEN:
                        break

                packet = packet_type(buff, self.encoding)
                packet.check_error()
                return packet

            def _read_bytes(self, num_bytes):
                self._sock.settimeout(self._read_timeout)
                while True:
                    try:
                        data = self._rfile.read(num_bytes)
                        break
                    except (IOError, OSError) as e:
                        if e.errno == errno.EINTR:
                            continue
                        self._force_close()
                        raise err.OperationalError(
                            CR.CR_SERVER_LOST,
                            "Lost connection to MySQL server during query (%s)" % (e,))
                if len(data) < num_bytes:
                    self._force_close()
                    raise err.OperationalError(
                        CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
                return data

            def _write_bytes(self, data):
                self._sock.settimeout(self._write_timeout)
                try:
                    self._sock.sendall(data)
                except IOError as e:
                    self._force_close()
                    raise err.OperationalError(
                        CR.CR_SERVER_GONE_ERROR,
                        "MySQL server has gone away (%r)" % (e,))

            def _read_query_result(self, unbuffered=False):
                self._result = None
                if unbuffered:
                    try:
                        result = MySQLResult(self)
                        result.init_unbuffered_query()
                    except:
                        result.unbuffered_active = False
                        result.connection = None
                        raise
                else:
                    result = MySQLResult(self)
                    result.read()
                self._result = result
                if result.server_status is not None:
                    self.server_status = result.server_status
                return result.affected_rows

            def insert_id(self):
                if self._result:
                    return self._result.insert_id
                else:
                    return 0

            def _execute_command(self, command, sql):
                """
                :raise InterfaceError: If the connection is closed.
                :raise ValueError: If no username was specified.
                """
                if not self._sock:
                    raise err.InterfaceError("(0, '')")

                # If the last query was unbuffered, make sure it finishes before
                # sending new commands
                if self._result is not None:
                    if self._result.unbuffered_active:
                        warnings.warn("Previous unbuffered result was left incomplete")
                        self._result._finish_unbuffered_query()
                    while self._result.has_next:
                        self.next_result()
                    self._result = None

                if isinstance(sql, text_type):
                    sql = sql.encode(self.encoding)

                packet_size = min(MAX_PACKET_LEN, len(sql) + 1)  # +1 is for command

                # tiny optimization: build first packet manually instead of
                # calling self.write_packet()
                prelude = struct.pack('<iB', packet_size, command)
                packet = prelude + sql[:packet_size-1]
                self._write_bytes(packet)
                if DEBUG: dump_packet(packet)
                self._next_seq_id = 1

                if packet_size < MAX_PACKET_LEN:
                    return

                sql = sql[packet_size-1:]
                while True:
                    packet_size = min(MAX_PACKET_LEN, len(sql))
                    self.write_packet(sql[:packet_size])
                    sql = sql[packet_size:]
                    if not sql and packet_size < MAX_PACKET_LEN:
                        break

            def _request_authentication(self):
                # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse
                if int(self.server_version.split('.', 1)[0]) >= 5:
                    self.client_flag |= CLIENT.MULTI_RESULTS

                if self.user is None:
                    raise ValueError("Did not specify a username")

                charset_id = charset_by_name(self.charset).id
                if isinstance(self.user, text_type):
                    self.user = self.user.encode(self.encoding)

                data_init = struct.pack('<iIB23s', self.client_flag, MAX_PACKET_LEN, charset_id, b'')
    678         data_init = struct.pack('<iIB23s', self.client_flag, MAX_PACKET_LEN, charset_id, b'')
    679 
    680         if self.ssl and self.server_capabilities & CLIENT.SSL:
    681             self.write_packet(data_init)
    682 
    683             self._sock = self.ctx.wrap_socket(self._sock, server_hostname=self.host)
    684             self._rfile = _makefile(self._sock, 'rb')
    685             self._secure = True
    686 
    687         data = data_init + self.user + b'\0'
    688 
    689         authresp = b''
    690         plugin_name = None
    691 
    692         if self._auth_plugin_name in ('', 'mysql_native_password'):
    693             authresp = _auth.scramble_native_password(self.password, self.salt)
    694         elif self._auth_plugin_name == 'caching_sha2_password':
    695             plugin_name = b'caching_sha2_password'
    696             if self.password:
    697                 if DEBUG:
    698                     print("caching_sha2: trying fast path")
    699                 authresp = _auth.scramble_caching_sha2(self.password, self.salt)
    700             else:
    701                 if DEBUG:
    702                     print("caching_sha2: empty password")
    703         elif self._auth_plugin_name == 'sha256_password':
    704             plugin_name = b'sha256_password'
    705             if self.ssl and self.server_capabilities & CLIENT.SSL:
    706                 authresp = self.password + b'\0'
    707             elif self.password:
    708                 authresp = b'\1'  # request public key
    709             else:
    710                 authresp = b'\0'  # empty password
    711 
    712         if self.server_capabilities & CLIENT.PLUGIN_AUTH_LENENC_CLIENT_DATA:
    713             data += lenenc_int(len(authresp)) + authresp
    714         elif self.server_capabilities & CLIENT.SECURE_CONNECTION:
    715             data += struct.pack('B', len(authresp)) + authresp
    716         else:  # pragma: no cover - not testing against servers without secure auth (>=5.0)
    717             data += authresp + b'\0'
    718 
    719         if self.db and self.server_capabilities & CLIENT.CONNECT_WITH_DB:
    720             if isinstance(self.db, text_type):
    721                 self.db = self.db.encode(self.encoding)
    722             data += self.db + b'\0'
    723 
    724         if self.server_capabilities & CLIENT.PLUGIN_AUTH:
    725             data += (plugin_name or b'') + b'\0'
    726 
    727         if self.server_capabilities & CLIENT.CONNECT_ATTRS:
    728             connect_attrs = b''
    729             for k, v in self._connect_attrs.items():
    730                 k = k.encode('utf8')
    731                 connect_attrs += struct.pack('B', len(k)) + k
    732                 v = v.encode('utf8')
    733                 connect_attrs += struct.pack('B', len(v)) + v
    734             data += struct.pack('B', len(connect_attrs)) + connect_attrs
    735 
    736         self.write_packet(data)
    737         auth_packet = self._read_packet()
    738 
    739         # if authentication method isn't accepted the first byte
    740         # will have the octet 254
    741         if auth_packet.is_auth_switch_request():
    742             if DEBUG: print("received auth switch")
    743             # https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest
    744             auth_packet.read_uint8() # 0xfe packet identifier
    745             plugin_name = auth_packet.read_string()
    746             if self.server_capabilities & CLIENT.PLUGIN_AUTH and plugin_name is not None:
    747                 auth_packet = self._process_auth(plugin_name, auth_packet)
    748             else:
    749                 # send legacy handshake
    750                 data = _auth.scramble_old_password(self.password, self.salt) + b'\0'
    751                 self.write_packet(data)
    752                 auth_packet = self._read_packet()
    753         elif auth_packet.is_extra_auth_data():
    754             if DEBUG:
    755                 print("received extra data")
    756             # https://dev.mysql.com/doc/internals/en/successful-authentication.html
    757             if self._auth_plugin_name == "caching_sha2_password":
    758                 auth_packet = _auth.caching_sha2_password_auth(self, auth_packet)
    759             elif self._auth_plugin_name == "sha256_password":
    760                 auth_packet = _auth.sha256_password_auth(self, auth_packet)
    761             else:
    762                 raise err.OperationalError("Received extra packet for auth method %r", self._auth_plugin_name)
    763 
    764         if DEBUG: print("Succeed to auth")
    765 
    766     def _process_auth(self, plugin_name, auth_packet):
    767         handler = self._get_auth_plugin_handler(plugin_name)
    768         if handler:
    769             try:
    770                 return handler.authenticate(auth_packet)
    771             except AttributeError:
    772                 if plugin_name != b'dialog':
    773                     raise err.OperationalError(2059, "Authentication plugin '%s'"
    774                               " not loaded: - %r missing authenticate method" % (plugin_name, type(handler)))
    775         if plugin_name == b"caching_sha2_password":
    776             return _auth.caching_sha2_password_auth(self, auth_packet)
    777         elif plugin_name == b"sha256_password":
    778             return _auth.sha256_password_auth(self, auth_packet)
    779         elif plugin_name == b"mysql_native_password":
    780             data = _auth.scramble_native_password(self.password, auth_packet.read_all())
    781         elif plugin_name == b"mysql_old_password":
    782             data = _auth.scramble_old_password(self.password, auth_packet.read_all()) + b'\0'
    783         elif plugin_name == b"mysql_clear_password":
    784             # https://dev.mysql.com/doc/internals/en/clear-text-authentication.html
    785             data = self.password + b'\0'
    786         elif plugin_name == b"dialog":
    787             pkt = auth_packet
    788             while True:
    789                 flag = pkt.read_uint8()
    790                 echo = (flag & 0x06) == 0x02
    791                 last = (flag & 0x01) == 0x01
    792                 prompt = pkt.read_all()
    793 
    794                 if prompt == b"Password: ":
    795                     self.write_packet(self.password + b'\0')
    796                 elif handler:
    797                     resp = 'no response - TypeError within plugin.prompt method'
    798                     try:
    799                         resp = handler.prompt(echo, prompt)
    800                         self.write_packet(resp + b'\0')
    801                     except AttributeError:
    802                         raise err.OperationalError(2059, "Authentication plugin '%s'" 
    803                                   " not loaded: - %r missing prompt method" % (plugin_name, handler))
    804                     except TypeError:
    805                         raise err.OperationalError(2061, "Authentication plugin '%s'" 
    806                                   " %r didn't respond with string. Returned '%r' to prompt %r" % (plugin_name, handler, resp, prompt))
    807                 else:
    808                     raise err.OperationalError(2059, "Authentication plugin '%s' (%r) not configured" % (plugin_name, handler))
    809                 pkt = self._read_packet()
    810                 pkt.check_error()
    811                 if pkt.is_ok_packet() or last:
    812                     break
    813             return pkt
    814         else:
    815             raise err.OperationalError(2059, "Authentication plugin '%s' not configured" % plugin_name)
    816 
    817         self.write_packet(data)
    818         pkt = self._read_packet()
    819         pkt.check_error()
    820         return pkt
    821 
    822     def _get_auth_plugin_handler(self, plugin_name):
    823         plugin_class = self._auth_plugin_map.get(plugin_name)
    824         if not plugin_class and isinstance(plugin_name, bytes):
    825             plugin_class = self._auth_plugin_map.get(plugin_name.decode('ascii'))
    826         if plugin_class:
    827             try:
    828                 handler = plugin_class(self)
    829             except TypeError:
    830                 raise err.OperationalError(2059, "Authentication plugin '%s'"
    831                     " not loaded: - %r cannot be constructed with connection object" % (plugin_name, plugin_class))
    832         else:
    833             handler = None
    834         return handler
    835 
    836     # _mysql support
    837     def thread_id(self):
    838         return self.server_thread_id[0]
    839 
    840     def character_set_name(self):
    841         return self.charset
    842 
    843     def get_host_info(self):
    844         return self.host_info
    845 
    846     def get_proto_info(self):
    847         return self.protocol_version
    848 
    849     def _get_server_information(self):
    850         i = 0
    851         packet = self._read_packet()
    852         data = packet.get_all_data()
    853 
    854         self.protocol_version = byte2int(data[i:i+1])
    855         i += 1
    856 
    857         server_end = data.find(b'\0', i)
    858         self.server_version = data[i:server_end].decode('latin1')
    859         i = server_end + 1
    860 
    861         self.server_thread_id = struct.unpack('<I', data[i:i+4])
    862         i += 4
    863 
    864         self.salt = data[i:i+8]
    865         i += 9  # 8 + 1(filler)
    866 
    867         self.server_capabilities = struct.unpack('<H', data[i:i+2])[0]
    868         i += 2
    869 
    870         if len(data) >= i + 6:
    871             lang, stat, cap_h, salt_len = struct.unpack('<BHHB', data[i:i+6])
    872             i += 6
    873             # TODO: deprecate server_language and server_charset.
    874             # mysqlclient-python doesn't provide it.
    875             self.server_language = lang
    876             try:
    877                 self.server_charset = charset_by_id(lang).name
    878             except KeyError:
    879                 # unknown collation
    880                 self.server_charset = None
    881 
    882             self.server_status = stat
    883             if DEBUG: print("server_status: %x" % stat)
    884 
    885             self.server_capabilities |= cap_h << 16
    886             if DEBUG: print("salt_len:", salt_len)
    887             salt_len = max(12, salt_len - 9)
    888 
    889         # reserved
    890         i += 10
    891 
    892         if len(data) >= i + salt_len:
    893             # salt_len includes auth_plugin_data_part_1 and filler
    894             self.salt += data[i:i+salt_len]
    895             i += salt_len
    896 
    897         i+=1
    898         # AUTH PLUGIN NAME may appear here.
    899         if self.server_capabilities & CLIENT.PLUGIN_AUTH and len(data) >= i:
    900             # Due to Bug#59453 the auth-plugin-name is missing the terminating
    901             # NUL-char in versions prior to 5.5.10 and 5.6.2.
    902             # ref: https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake
    903             # didn't use version checks as mariadb is corrected and reports
    904             # earlier than those two.
    905             server_end = data.find(b'\0', i)
    906             if server_end < 0: # pragma: no cover - very specific upstream bug
    907                 # not found  and last field so take it all
    908                 self._auth_plugin_name = data[i:].decode('utf-8')
    909             else:
    910                 self._auth_plugin_name = data[i:server_end].decode('utf-8')
    911 
    912     def get_server_info(self):
    913         return self.server_version
    914 
    915     Warning = err.Warning
    916     Error = err.Error
    917     InterfaceError = err.InterfaceError
    918     DatabaseError = err.DatabaseError
    919     DataError = err.DataError
    920     OperationalError = err.OperationalError
    921     IntegrityError = err.IntegrityError
    922     InternalError = err.InternalError
    923     ProgrammingError = err.ProgrammingError
    924     NotSupportedError = err.NotSupportedError
    Connection class

       

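      Cursor

        To operate on the database you must use a cursor, so first obtain a cursor object.

        Connection.cursor(cursor=None) returns a new cursor.

        A cursor object can be reused until the connection is closed.

        The cursor parameter lets you specify a Cursor class; if it is None, the default Cursor class is used.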

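      Operating on the database

        Database operations go through an instance of the Cursor class. Its execute() method runs a SQL statement and, on success, returns the number of affected rows.

        Autocommit is off by default, so changes must be committed explicitly; this is also what business logic usually needs.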

      Inserting records
        Use an INSERT INTO statement to insert data.

    import pymysql

    conn = None
    try:
        # Keyword arguments work in both old and new PyMySQL releases
        conn = pymysql.connect(host='192.168.112.111', user='root',
                               password='123456', database='test')
        # reconnect=True tries to reconnect once; reconnect=False does not
        conn.ping(False)  # returns None if the connection is alive
        cursor = conn.cursor()
        insert_sql = "INSERT INTO t1 (name,age) VALUES('tom',20)"
        rows = cursor.execute(insert_sql)
        conn.commit()  # autocommit is not enabled in the source:
                       # :param autocommit: Autocommit mode. None means use server default. (default: False)
        print(rows)  # number of affected rows; one row was inserted, so this prints 1
    except Exception as e:
        print(e)
    finally:
        if conn:
            conn.close()

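      Querying data (note: select only the columns and rows you actually need; fetching more than that costs server resources, bandwidth, and local buffer space):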

    import pymysql

    conn = None
    try:
        # Create a Connection instance
        conn = pymysql.connect(host='192.168.112.111', user='root',
                               password='123456', database='test')
        # Connection.cursor(cursor=None) creates a cursor
        cursor = conn.cursor()

        select_sql = "SELECT * FROM t1"
        count = cursor.execute(select_sql)
        print(count)

        rows = cursor.fetchone()
        print(rows)
        # rowcount: number of rows in the result; rownumber: how many rows have been fetched so far
        print(cursor.rowcount, cursor.rownumber)

        rows = cursor.fetchone()
        print(rows)
        print(cursor.rowcount, cursor.rownumber)

        rows = cursor.fetchmany(2)
        print(rows)
        print(cursor.rowcount, cursor.rownumber)

        rows = cursor.fetchall()
        print(rows)
        print(cursor.rowcount, cursor.rownumber)

    except Exception as e:
        print(e)
    finally:
        if conn:
            conn.close()

      Result:

    D:\python3.7\python.exe E:/code_pycharm/test_in_class/tt14.py
    7
    (1, 'e', None)
    7 1
    (3, 'tom', 20)
    7 2
    ((4, 'jack', 30), (5, 'lilei', 30))
    7 4
    ((6, 'jerry', 20), (30, 'tom1', 30), (31, 'rot', 30))
    7 7

    Process finished with exit code 0
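The rowcount/rownumber pairs in the output above follow from the default cursor holding the whole result set on the client and advancing a position counter as rows are fetched. The sketch below is a simplified stand-in, not PyMySQL's actual class: `ToyCursor` and its sample rows are hypothetical, and only the position bookkeeping is modeled.

```python
class ToyCursor:
    """Hypothetical stand-in for a buffered cursor; models only row bookkeeping."""

    def __init__(self, rows):
        self._rows = tuple(rows)      # the whole result set is held client-side
        self.rowcount = len(self._rows)
        self.rownumber = 0            # index of the next row to hand out

    def fetchone(self):
        if self.rownumber >= len(self._rows):
            return None
        row = self._rows[self.rownumber]
        self.rownumber += 1
        return row

    def fetchmany(self, size=1):
        end = self.rownumber + size
        rows = self._rows[self.rownumber:end]
        self.rownumber = min(end, len(self._rows))
        return rows

    def fetchall(self):
        rows = self._rows[self.rownumber:]
        self.rownumber = len(self._rows)
        return rows


cur = ToyCursor([(i, 'name{}'.format(i)) for i in range(1, 8)])  # 7 made-up rows
cur.fetchone()
cur.fetchone()
print(cur.rowcount, cur.rownumber)   # 7 2
cur.fetchmany(2)
print(cur.rowcount, cur.rownumber)   # 7 4
cur.fetchall()
print(cur.rowcount, cur.rownumber)   # 7 7
```

rowcount stays fixed at the size of the result, while rownumber only moves forward, which is why a second fetchall() on the same result returns nothing.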

      Returning the column names as well:

    import pymysql

    conn = None
    cursor = None

    try:
        # Create a Connection instance
        conn = pymysql.connect(host='192.168.112.111', user='root',
                               password='123456', database='test')
        print(conn)
        # Connection.cursor(cursor=None) creates a cursor; the default is cursor=None.
        # Here an enhanced cursor class is passed in (in the source it is built from a mixin class)
        cursor = conn.cursor(pymysql.cursors.DictCursor)

        select_sql = "SELECT * FROM t1"
        count = cursor.execute(select_sql)
        print(count)

        print(cursor.fetchall())

        conn.commit()

    except Exception as e:
        print(e)
    finally:
        if conn:
            conn.close()

      Result:

    <pymysql.connections.Connection object at 0x000000000297CB70>
    7
    [{'id': 1, 'name': 'e', 'age': None}, {'id': 3, 'name': 'tom', 'age': 20}, {'id': 4, 'name': 'jack', 'age': 30}, {'id': 5, 'name': 'lilei', 'age': 30}, {'id': 6, 'name': 'jerry', 'age': 20}, {'id': 30, 'name': 'tom1', 'age': 30}, {'id': 31, 'name': 'rot', 'age': 30}]

      

      Transaction management:

      The Connection class has three methods:

      1. begin: start a transaction
      2. commit: commit the transaction
      3. rollback: roll back the transaction

    import pymysql

    conn = None
    cursor = None

    try:
        # Create a Connection instance
        conn = pymysql.connect(host='192.168.112.111', user='root',
                               password='123456', database='test')
        cursor = conn.cursor()

        # Batch insert, method 1:
        for i in range(10):
            sql = "INSERT INTO t1 (name, age) values('to',12)"
            rows = cursor.execute(sql)

        # Batch insert, method 2:
        sql = "INSERT INTO t1 (name, age) values(%s,%s)"
        rows = cursor.executemany(sql, [('t{}'.format(i), 30 + i) for i in range(5)])

        conn.commit()  # whenever you commit, also roll back in the exception handler

    except Exception as e:
        print(e)
        if conn:
            conn.rollback()
    finally:
        # Close the cursor before the connection it belongs to
        if cursor:
            cursor.close()
        if conn:
            conn.close()

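      In fact, for statements it cannot batch, method 2 is just method 1 in a loop. The fallback path in executemany is:

    self.rowcount = sum(self.execute(query, arg) for arg in args)

      For INSERT or REPLACE ... VALUES statements like the one above, executemany instead combines the rows into multi-row statements, which saves round trips.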

    Summary: the usual execution flow is:

    • Establish a connection
    • Get a cursor
    • Execute SQL
    • Commit the transaction
    • Release resources
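The flow above can be sketched end to end without a MySQL server. This is a stand-in built on assumptions: it uses Python's built-in sqlite3 module with an in-memory database instead of PyMySQL (so the placeholder is `?`, not `%s`), and the `t1` table and the `insert_rows` helper are made up for illustration. The connect, cursor, execute, commit-or-rollback, close sequence is the same DB-API pattern.

```python
import sqlite3


def insert_rows(conn, rows):
    """Insert rows in one transaction; roll back everything if any row fails."""
    cursor = conn.cursor()                       # get a cursor
    try:
        for name, age in rows:                   # execute SQL
            cursor.execute("INSERT INTO t1 (name, age) VALUES (?, ?)", (name, age))
        conn.commit()                            # commit the transaction
        return True
    except sqlite3.Error as e:
        print("rolled back:", e)
        conn.rollback()                          # undo the partial work
        return False
    finally:
        cursor.close()                           # release the cursor


conn = sqlite3.connect(":memory:")               # establish the connection
conn.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER)")

ok = insert_rows(conn, [("tom", 20), ("jerry", 30)])
bad = insert_rows(conn, [("jack", 25), (None, 40)])   # NULL name violates NOT NULL
count = conn.execute("SELECT COUNT(*) FROM t1").fetchone()[0]
print(ok, bad, count)
conn.close()                                     # release the connection
```

The second batch fails on its second row, and the rollback also discards the first row of that batch, so only the two rows from the successful batch remain.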

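    SQL injection attacks:

      The SQL statement that finds the user whose id is 6 looks like this:

      SELECT * FROM students WHERE id= 6

      You might expect that only queries of this shape ever run. In practice, a statement such as SELECT * FROM `t1` WHERE age=30 or 1; can be produced through string concatenation, and it returns every row in the table.

      Suppose we need to look up the user for a given id. The code is:

    userid = 5
    sql = "SELECT * FROM students WHERE id={}".format(userid)

      userid can vary; for example, it may be taken from a client request and concatenated directly into the query string.

      But what if userid = '5 or 1=1'?

      The query then returns all rows.

      (Concatenation, including quote characters and the like, is being used to build something like SELECT * FROM `t1` WHERE age=30 or 1.)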

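      What a SQL injection attack is:

        The attacker guesses that the back end builds its queries by string concatenation, then crafts the parameters sent to the server so that the concatenated SQL statement returns whatever the attacker wants.

      Never assume that data sent by a client is well-formed or safe!

      How to prevent it:

        1. Use parameterized queries, which effectively prevent injection attacks and can also improve query efficiency.

        2. Do not return detailed exception messages to the client.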
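To make the attack and the fix concrete, here is a small self-contained sketch. It uses the built-in sqlite3 module as a stand-in for MySQL (an assumption made so that it runs anywhere), so the placeholder is `?` rather than PyMySQL's `%s`, and the table contents are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE students (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
conn.executemany("INSERT INTO students VALUES (?, ?, ?)",
                 [(5, "tom", 20), (6, "jerry", 30), (7, "jack", 30)])

userid = "5 or 1=1"   # attacker-controlled input

# Vulnerable: the input is spliced into the SQL text and parsed as SQL.
unsafe_sql = "SELECT * FROM students WHERE id={}".format(userid)
leaked = conn.execute(unsafe_sql).fetchall()

# Safe: the input travels as a bound value and is only ever compared with id.
safe = conn.execute("SELECT * FROM students WHERE id=?", (userid,)).fetchall()

print(len(leaked))  # every row in the table
print(len(safe))    # no id equals the string '5 or 1=1' in SQLite
conn.close()
```

The exact result of the safe query depends on the database's type coercion rules (SQLite matches nothing here), but in either case the input is treated as a single value and cannot widen the WHERE clause.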

      Cursor.execute(query, args=None):

        args must be a tuple, list, or dict. If the query string uses %(name)s placeholders, args must be a dict.

       Test:

    import pymysql
    from pymysql.cursors import DictCursor

    conn = pymysql.connect(host='192.168.112.111', user='root',
                           password='123456', database='test')
    cursor = conn.cursor(DictCursor)

    userid = '5 or 1=1'
    sql = 'SELECT * FROM t1 WHERE id=%s'
    cursor.execute(sql, (userid,))  # parameterized query
    print(cursor.fetchall())

    sql = 'SELECT * FROM t1 WHERE `name` like %(name)s and age > %(age)s'
    cursor.execute(sql, {'name': 'tom%', 'age': 25})  # parameterized query

    if cursor:
        cursor.close()
    if conn:
        conn.close()

       Result: not all rows are returned, only the row that was asked for.

      [{'age': 20, 'id': 5, 'name': 'tom0'}]
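Why does the query above return only one row? PyMySQL substitutes parameters on the client: each value is escaped and quoted as a literal before it is formatted into the query, so the input cannot change the structure of the statement. The sketch below is a deliberately simplified model of that idea. `toy_literal` and `toy_mogrify` are hypothetical helpers, and real escaping handles many more types and character-set details.

```python
def toy_literal(value):
    """Render a Python value as a SQL literal (simplified, illustration only)."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    # Escape backslashes and single quotes, then wrap the text in quotes.
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'" + text + "'"


def toy_mogrify(query, args):
    """Format args into query the way a client-side driver might."""
    if isinstance(args, dict):
        return query % {k: toy_literal(v) for k, v in args.items()}
    return query % tuple(toy_literal(v) for v in args)


print(toy_mogrify("SELECT * FROM t1 WHERE id=%s", ("5 or 1=1",)))
# SELECT * FROM t1 WHERE id='5 or 1=1'
print(toy_mogrify("SELECT * FROM t1 WHERE `name` like %(name)s and age > %(age)s",
                  {"name": "tom%", "age": 25}))
# SELECT * FROM t1 WHERE `name` like 'tom%' and age > 25
```

The whole input arrives as one quoted string. MySQL then compares that string with the numeric id column by converting it to a number, which gives 5, so only that row matches.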

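       Why parameterized queries can improve efficiency:

         The reason is SQL statement caching.

        Database servers generally compile and cache SQL statements. Only the statement text is compiled, so SQL keywords that appear inside a parameter are never executed.

        Compilation involves lexical analysis, semantic analysis, building an AST (abstract syntax tree, drawn as an inverted tree), optimization, and generating an execution plan, all of which consume resources.

        The server first checks whether the same statement is already cached. If the cache entry is still valid, the statement does not need to be compiled again, which lowers compilation cost and memory use.

        You can think of the SQL statement string as a key. With string concatenation, nearly every statement sent to the server is different, so each one has to be compiled and cached.

        For large numbers of queries, prefer parameterized queries.

        During development, always use parameterized queries.

        Note: this is a cache of the statement text, not of the query results. The caching benefit applies when the statement template and its values reach the server separately, as with server-side prepared statements; PyMySQL formats the values into the statement on the client, so there the main gain is safety.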
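The idea of the SQL string acting as a cache key can be illustrated with a toy model. Nothing here reflects how a real server implements its cache: `ToyPlanCache` and its `compilations` counter are hypothetical, and they only show why a fixed statement template is compiled once while concatenated statements are compiled once per distinct string.

```python
class ToyPlanCache:
    """Hypothetical statement cache keyed by the exact SQL text."""

    def __init__(self):
        self._plans = {}
        self.compilations = 0

    def prepare(self, sql):
        if sql not in self._plans:
            self.compilations += 1            # stands in for parse/optimize/plan
            self._plans[sql] = "plan#{}".format(self.compilations)
        return self._plans[sql]


concatenated = ToyPlanCache()
for userid in range(100):
    concatenated.prepare("SELECT * FROM t1 WHERE id={}".format(userid))

parameterized = ToyPlanCache()
for userid in range(100):
    parameterized.prepare("SELECT * FROM t1 WHERE id=%s")   # value sent separately

print(concatenated.compilations, parameterized.compilations)  # 100 1
```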

     Context manager support:

       Source of the Connection class:

    class Connection(object):  # the class that pymysql.connect() returns
        def __enter__(self):
            """Context manager that returns a Cursor"""
            return self.cursor()

        def __exit__(self, exc, value, traceback):
            """On successful exit, commit. On exception, rollback"""
            if exc:
                self.rollback()
            else:
                self.commit()

       Source of the Cursor class:

    class Cursor(object):
        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            del exc_info
            self.close()

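       As the source shows (for the PyMySQL version quoted here), entering the Connection context calls cursor() on the connection and returns the new cursor. On exit it rolls back if an exception occurred and otherwise commits; it does not close the connection.

      Entering the Cursor context returns the cursor instance itself, and exiting closes the cursor.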
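The effect of the two `__enter__`/`__exit__` pairs quoted above can be checked without a server. The classes below are hypothetical stand-ins that copy only the context-manager logic from the listings and record which calls were made; other PyMySQL versions may behave differently.

```python
class FakeCursor:
    def __init__(self, log):
        self.log = log
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        self.closed = True
        self.log.append("cursor.close")


class FakeConnection:
    def __init__(self):
        self.log = []
        self.closed = False

    def cursor(self):
        return FakeCursor(self.log)

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")

    def __enter__(self):
        return self.cursor()          # entering a connection yields a cursor

    def __exit__(self, exc, value, traceback):
        if exc:
            self.rollback()
        else:
            self.commit()             # note: no close() here


conn = FakeConnection()
with conn as cursor:
    pass
print(conn.log, cursor.closed, conn.closed)   # commit only; nothing is closed

try:
    with conn as cursor:
        with cursor:
            raise ValueError("boom")
except ValueError:
    pass
print(conn.log)   # the inner with closes the cursor, the outer one rolls back
```

Neither block closes the connection, so the caller still has to call close() on it when finished.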

      Example 1:

    import pymysql

    conn = pymysql.connect(host='192.168.112.111', user='root',
                           password='123456', database='test')

    try:
        with conn.cursor() as cursor:
            sql = "select * from t1"
            cursor.execute(sql)
        conn.commit()
        # Only the commit is needed; the cursor was already closed on leaving the with block

    except Exception as e:
        print(e)
        conn.rollback()
    finally:
        conn.close()

      Example 2:

    import pymysql

    conn = pymysql.connect(host='192.168.112.111', user='root',
                           password='123456', database='test')


    with conn as cursor:
        sql = "select * from t1"
        cursor.execute(sql)

    # The cursor created in __enter__ is not closed either, so it can still be used
    sql = "select * from t2"
    cursor.execute(sql)

    # Leaving the conn context does not close conn
    cursor.close()
    conn.close()

       Example 3:

    import pymysql

    conn = pymysql.connect(host='192.168.112.111', user='root',
                           password='123456', database='test')


    with conn as cursor:
        with cursor:
            sql = "select * from t1"
            cursor.execute(sql)


    # Leaving the conn context does not close conn, but the cursor has been closed
    conn.close()

      Summary: a connection does not need to be created and destroyed over and over; multiple cursors should share one conn.

    print(conn.ping(True))
  • Original article: https://www.cnblogs.com/JerryZao/p/9937441.html