  • Modifying the torndb library to depend on pymysql so that it works on Python 3: a simpler class for working with a database.

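    1. MySQLdb and pymysql are Python's two basic MySQL packages. MySQLdb is troublesome to install because it needs a C++-related build environment, and it cannot be installed on Python 3.

    On Python 3 people generally install pymysql. It is very close to a drop-in replacement for MySQLdb: once you have learned one library's API, the other is used in the same way, with no new API to learn.

    Both are fairly low-level, though. A real weakness is that the public query interface is awkward: you run the statement with cursor.execute, then have to go back to the cursor a second time to fetch the results. So most people end up writing their own wrapper around them.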

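    2. torndb is exactly this kind of wrapper library. The caller does not need to care about database cursors; every public method returns its result directly, which is what a wrapper around a database driver usually aims for. With torndb you no longer have to write that wrapper painstakingly yourself, much as having the easy-to-use requests means that in roughly 70% of cases there is no need to wrap urllib. There are exceptions: I did wrap the Session class of requests to make it easier to use, so that code outside the class stays very short and gets its result just by calling the wrapped methods.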
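    The idea of hiding the cursor can be sketched as follows. This is only an illustration, not torndb's code: it uses the standard-library sqlite3 module as a stand-in DB-API driver so that it runs without a MySQL server, and the query helper and the articles table are hypothetical names. Note that sqlite3 uses ? placeholders, whereas pymysql and MySQLdb use %s.

```python
import sqlite3


def query(conn, sql, *params):
    """Run a statement and return all rows as dicts keyed by column name.

    Hypothetical helper: the caller never touches the cursor.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        column_names = [d[0] for d in cursor.description]
        return [dict(zip(column_names, row)) for row in cursor]
    finally:
        cursor.close()


# In-memory database used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO articles (title) VALUES (?)", ("hello",))

# Raw DB-API: execute first, then go back to the cursor to fetch.
cur = conn.cursor()
cur.execute("SELECT id, title FROM articles")
raw_rows = cur.fetchall()
cur.close()

# Wrapped: a single call returns the result.
rows = query(conn, "SELECT id, title FROM articles WHERE id = ?", 1)
print(raw_rows)
print(rows)
```

    The wrapped call also closes the cursor in a finally block, so callers cannot forget to do it.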

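    3. The pain point is that torndb depends on MySQLdb, which is hard to install, and it uses a few Python 2 constructs that were removed in Python 3. The last official update was in 2014, so the maintainers presumably do not plan to update it for Python 3 compatibility.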
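    One of the removed names is itertools.izip, which the official torndb uses to pair column names with row values. The snippet below is a small sketch of the Python 3 replacement, using made-up sample data: the built-in zip is already lazy and behaves like izip, while itertools.zip_longest is a different function that pads the shorter input instead of stopping.

```python
import itertools

# Made-up sample data standing in for cursor.description names and one row.
column_names = ["id", "title"]
row = (1, "hello")

# Python 2: itertools.izip(column_names, row)
# Python 3: the built-in zip is the direct replacement.
as_dict = dict(zip(column_names, row))

# With inputs of different lengths the two functions diverge:
# zip stops at the shorter input, zip_longest pads with None.
truncated = list(zip(["id", "title", "extra"], row))
padded = list(itertools.zip_longest(["id", "title", "extra"], row))

print(as_dict)
print(truncated)
print(padded)
```

    When the column list and the row have the same length, both functions give the same pairs, so the difference only matters for mismatched inputs.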

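    I changed it to depend on pymysql, made it compatible with Python 3, and published it on PyPI. You can install the package with pip install torndb_for_python3, or copy the code below yourself.

    torndb_for_python3 is fully compatible with the torndb package and has an identical API, so to learn how to use torndb_for_python3 you can simply search online for how to use torndb.

    In the code below, lines carrying a # TODO comment are lines that were changed from the official code.

    The code of torndb_for_python3.py: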

    #!/usr/bin/env python
    #
    # Copyright 2009 Facebook
    #
    # Licensed under the Apache License, Version 2.0 (the "License"); you may
    # not use this file except in compliance with the License. You may obtain
    # a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    # License for the specific language governing permissions and limitations
    # under the License.
    
    """A lightweight wrapper around MySQLdb.
    
    Originally part of the Tornado framework.  The tornado.database module
    is slated for removal in Tornado 3.0, and it is now available separately
    as torndb.
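    The official torndb has to be paired with the MySQLdb package; the main change
    here is to pair it with pymysql instead. Every line with a TODO comment was
    changed from the official version, for compatibility with pymysql's
    parameters and with Python 3.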
    
    
    """
    
    from __future__ import absolute_import, division, with_statement
    
    import copy
    import itertools
    import logging
    import os
    import time
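    import pymysql  # TODO: pymysql must be installed
    
    # TODO: monkey-patch so that importing MySQLdb resolves to pymysql. This avoids
    # replacing MySQLdb with pymysql throughout the code; the main classes of the
    # two packages are duck-type compatible.
    pymysql.install_as_MySQLdb()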
    
    try:
        import MySQLdb.constants
        import MySQLdb.converters
        import MySQLdb.cursors
    except ImportError:
        # If MySQLdb isn't available this module won't actually be useable,
        # but we want it to at least be importable on readthedocs.org,
        # which has limitations on third-party modules.
        if 'READTHEDOCS' in os.environ:
            MySQLdb = None
        else:
            raise
    
    version = "0.3"
    version_info = (0, 3, 0, 0)
    
    
    class Connection(object):
        """A lightweight wrapper around MySQLdb DB-API connections.
    
        The main value we provide is wrapping rows in a dict/object so that
        columns can be accessed by name. Typical usage::
    
            db = torndb.Connection("localhost", "mydatabase")
            for article in db.query("SELECT * FROM articles"):
                print(article.title)
    
        Cursors are hidden by the implementation, but other than that, the methods
        are very similar to the DB-API.
    
        We explicitly set the timezone to UTC and assume the character encoding to
        UTF-8 (can be changed) on all connections to avoid time zone and encoding errors.
    
        The sql_mode parameter is set by default to "traditional", which "gives an error instead of a warning"
        (http://dev.mysql.com/doc/refman/5.0/en/server-sql-mode.html). However, it can be set to
        any other mode including blank (None) thereby explicitly clearing the SQL mode.
        """
    
        def __init__(self, host, database, user=None, password=None,
                     max_idle_time=7 * 3600, connect_timeout=30,  # TODO: changed from 0 to 30, because pymysql requires a value greater than 0
                     time_zone="+0:00", charset="utf8", sql_mode="TRADITIONAL"):
            self.host = host
            self.database = database
            self.max_idle_time = float(max_idle_time)
    
            args = dict(conv=CONVERSIONS, use_unicode=True, charset=charset,
                        db=database, init_command=('SET time_zone = "%s"' % time_zone),
                        connect_timeout=connect_timeout, sql_mode=sql_mode)
            if user is not None:
                args["user"] = user
            if password is not None:
                args["passwd"] = password
    
            # We accept a path to a MySQL socket file or a host(:port) string
            if "/" in host:
                args["unix_socket"] = host
            else:
                self.socket = None
                pair = host.split(":")
                if len(pair) == 2:
                    args["host"] = pair[0]
                    args["port"] = int(pair[1])
                else:
                    args["host"] = host
                    args["port"] = 3306
    
            self._db = None
            self._db_args = args
            self._last_use_time = time.time()
            try:
                self.reconnect()
            except Exception:
                logging.error("Cannot connect to MySQL on %s", self.host,
                              exc_info=True)
    
        def __del__(self):
            self.close()
    
        def close(self):
            """Closes this database connection."""
            if getattr(self, "_db", None) is not None:
                self._db.close()
                self._db = None
    
        def reconnect(self):
            """Closes the existing database connection and re-opens it."""
            self.close()
            self._db = MySQLdb.connect(**self._db_args)
            self._db.autocommit(True)
    
        def iter(self, query, *parameters, **kwparameters):
            """Returns an iterator for the given query and parameters."""
            self._ensure_connected()
            cursor = MySQLdb.cursors.SSCursor(self._db)
            try:
                self._execute(cursor, query, parameters, kwparameters)
                column_names = [d[0] for d in cursor.description]
                for row in cursor:
                    yield Row(zip(column_names, row))
            finally:
                cursor.close()
    
        def query(self, query, *parameters, **kwparameters):
            """Returns a row list for the given query and parameters."""
            cursor = self._cursor()
            try:
                self._execute(cursor, query, parameters, kwparameters)
                column_names = [d[0] for d in cursor.description]
                # TODO: changed this line. The official code was:
                #   return [Row(itertools.izip(column_names, row)) for row in cursor]
                # itertools.izip is gone in Python 3; the built-in zip replaces it.
                return [Row(zip(column_names, row)) for row in cursor]
            finally:
                cursor.close()
    
        def get(self, query, *parameters, **kwparameters):
            """Returns the (singular) row returned by the given query.
    
            If the query has no results, returns None.  If it has
            more than one result, raises an exception.
            """
            rows = self.query(query, *parameters, **kwparameters)
            if not rows:
                return None
            elif len(rows) > 1:
                raise Exception("Multiple rows returned for Database.get() query")
            else:
                return rows[0]
    
        # rowcount is a more reasonable default return value than lastrowid,
        # but for historical compatibility execute() must return lastrowid.
        def execute(self, query, *parameters, **kwparameters):
            """Executes the given query, returning the lastrowid from the query."""
            return self.execute_lastrowid(query, *parameters, **kwparameters)
    
        def execute_lastrowid(self, query, *parameters, **kwparameters):
            """Executes the given query, returning the lastrowid from the query."""
            cursor = self._cursor()
            try:
                self._execute(cursor, query, parameters, kwparameters)
                return cursor.lastrowid
            finally:
                cursor.close()
    
        def execute_rowcount(self, query, *parameters, **kwparameters):
            """Executes the given query, returning the rowcount from the query."""
            cursor = self._cursor()
            try:
                self._execute(cursor, query, parameters, kwparameters)
                return cursor.rowcount
            finally:
                cursor.close()
    
        def executemany(self, query, parameters):
            """Executes the given query against all the given param sequences.
    
            We return the lastrowid from the query.
            """
            return self.executemany_lastrowid(query, parameters)
    
        def executemany_lastrowid(self, query, parameters):
            """Executes the given query against all the given param sequences.
    
            We return the lastrowid from the query.
            """
            cursor = self._cursor()
            try:
                cursor.executemany(query, parameters)
                return cursor.lastrowid
            finally:
                cursor.close()
    
        def executemany_rowcount(self, query, parameters):
            """Executes the given query against all the given param sequences.
    
            We return the rowcount from the query.
            """
            cursor = self._cursor()
            try:
                cursor.executemany(query, parameters)
                return cursor.rowcount
            finally:
                cursor.close()
    
        update = execute_rowcount
        updatemany = executemany_rowcount
    
        insert = execute_lastrowid
        insertmany = executemany_lastrowid
    
        def _ensure_connected(self):
            # Mysql by default closes client connections that are idle for
            # 8 hours, but the client library does not report this fact until
            # you try to perform a query and it fails.  Protect against this
            # case by preemptively closing and reopening the connection
            # if it has been idle for too long (7 hours by default).
            if (self._db is None or
                    (time.time() - self._last_use_time > self.max_idle_time)):
                self.reconnect()
            self._last_use_time = time.time()
    
        def _cursor(self):
            self._ensure_connected()
            return self._db.cursor()
    
        def _execute(self, cursor, query, parameters, kwparameters):
            try:
                return cursor.execute(query, kwparameters or parameters)
            except OperationalError:
                logging.error("Error connecting to MySQL on %s", self.host)
                self.close()
                raise
    
        def __str__(self):
            return f'{type(self)} -> [database {self.database} on host {self.host}]'  # TODO: added
    
    
    class Row(dict):
        """A dict that allows for object-like property access syntax."""
    
        def __getattr__(self, name):
            try:
                return self[name]
            except KeyError:
                raise AttributeError(name)
    
    
    if MySQLdb is not None:
        # Fix the access conversions to properly recognize unicode/binary
        FIELD_TYPE = MySQLdb.constants.FIELD_TYPE
        FLAG = MySQLdb.constants.FLAG
        CONVERSIONS = copy.copy(MySQLdb.converters.conversions)
    
        field_types = [FIELD_TYPE.BLOB, FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING]
        if 'VARCHAR' in vars(FIELD_TYPE):
            field_types.append(FIELD_TYPE.VARCHAR)
    
        for field_type in field_types:
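            # TODO: changed this line. The official code was:
            #   CONVERSIONS[field_type] = [(FLAG.BINARY, str)] + CONVERSIONS[field_type]
            # pymysql maps each field type to a single callable, not to a
            # MySQLdb-style list of (flag, converter) pairs, so that line fails.
            # The earlier replacement used list.append(), which returns None, so it
            # stored None here. Make that explicit; pymysql then applies no extra
            # decoder to these string types.
            CONVERSIONS[field_type] = None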
    
        # Alias some common MySQL exceptions
        IntegrityError = MySQLdb.IntegrityError
        OperationalError = MySQLdb.OperationalError

    Usage, for example:

    # pip install torndb_for_python3
    import torndb_for_python3

    mysql_conn = torndb_for_python3.Connection(host='localhost', database='test', user='root', password='123456', charset='utf8')
    print(mysql_conn.query('SELECT * FROM test.tablexx'))
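    Each row returned by query is a Row, a dict subclass that also allows attribute access. The sketch below copies the Row class from the listing so that it runs on its own without a database; the article values are made up for illustration.

```python
class Row(dict):
    """A dict that allows for object-like property access syntax."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


# Build a row the same way the wrapper does: column names zipped with values.
article = Row(zip(["id", "title"], (1, "hello")))

title_by_attr = article.title       # attribute-style access
title_by_key = article["title"]     # ordinary dict access still works
# A missing column raises AttributeError, so getattr's default applies.
missing = getattr(article, "author", None)

print(title_by_attr, title_by_key, missing)
```

    Because Row is still a dict, it can be passed to anything that expects a mapping, for example json.dumps.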
  • Original article: https://www.cnblogs.com/ydf0509/p/9538130.html