zoukankan      html  css  js  c++  java
  • python生成数据库中所有表的DESC描述

           在数据库设计完成之后,常常需要在 wiki 或其他文档中保存一份数据库中所有表的 desc 描述,尤其是每个字段的含义和用途。手动生成自然是不可取的。因此,我编写了一个简单的 Python 程序,可以自动生成数据库中所有表的 desc 描述,并以可读格式输出。

           

    # -*- coding: utf-8 -*-
    # -------------------------------------------------------------------------------
    # Name:          db_tables_descs.py
    # Purpose:       generate the tables that describe the meanings of fields in db
    #
    # Author:       qin.shuq
    #
    # Created:      2014/11/17
    # Output:       desc.txt
    #               recording the tables that describe the meanings of fields in db
    #-------------------------------------------------------------------------------
    #!/usr/bin/env python
    
    import db
    
    # Header row printed above each table; the order matches the columns of the
    # rows returned by MySQL's "desc <table>" statement.
    globalFieldDescs = (
        'Field', 'Type', 'Null',
        'Key', 'Default', 'Extra',
    )

    # File that main() writes the generated descriptions to.
    globalDescFile = 'desc.txt'

    # Table names that clash with MySQL keywords; descDb() wraps these in
    # backticks before using them in a statement.
    conflictedWithMysqlKeywords = {'group'}

    # Descriptions for field names that mean the same thing in every table.
    fieldDescMapping = dict(
        id='唯一标识',
        is_deleted='是否逻辑删除',
        status='实体状态',
        type='实体类型',
        priority='优先级',
        password='密码',
        ip='ip 地址',
        mac='mac 地址',
        protocol='访问协议',
        user_id='用户唯一标识',
    )
    
    def formatCols(fieldDesc):
        '''Render one six-column row as a fixed-width, left-aligned line.

        fieldDesc must be a tuple of exactly six values; they are padded to
        widths of 16, 24, 5, 8, 8 and 30 characters and separated by one space.
        '''
        columnWidths = (16, 24, 5, 8, 8, 30)
        rowTemplate = ' '.join(['%%-%ds' % width for width in columnWidths])
        return rowTemplate % fieldDesc
    
    def withNewLine(astr):
        '''Return astr with a single newline character appended.'''
        # The escape sequence must stay on one line; a literal line break inside
        # the quotes is an unterminated string.
        return astr + '\n'
    
    
    def commonFieldsProcess(fieldDescList):
        '''Replace the last column of a field row with a readable description.

        The field name (first item) is looked up in fieldDescMapping. A name
        that is not found gets a creation-time or modification-time description
        when it starts with 'gmt_c' or 'gmt_m'; any other name keeps its current
        last column. The list is changed in place and nothing is returned.
        '''
        name = fieldDescList[0]
        description = fieldDescMapping.get(name)
        if description is None:
            prefixMeanings = (('gmt_c', '创建时间'), ('gmt_m', '修改时间'))
            for prefix, meaning in prefixMeanings:
                if name.startswith(prefix):
                    description = meaning
                    break
            else:
                # No known prefix: keep whatever the row already carries.
                description = fieldDescList[-1]
        fieldDescList[-1] = description
    
    def formatF(fieldDescTuple):
        '''Format one field row of a table description as a fixed-width line.

        None values are shown as 'NULL', every other value is converted with
        str(). The last column is then rewritten by commonFieldsProcess() before
        the row is laid out by formatCols().
        '''
        cells = ['NULL' if cell is None else str(cell)
                 for cell in fieldDescTuple]
        commonFieldsProcess(cells)
        return formatCols(tuple(cells))
    
    def format(tableDesc):
        '''Format every field row of a table, one line per field.

        Returns the concatenation of the formatted rows, each terminated by
        withNewLine(); an empty tableDesc gives an empty string.
        '''
        lines = [withNewLine(formatF(row)) for row in tableDesc]
        return ''.join(lines)
    
    def descDb(givenDb):
        '''Build the description text for every table of a database.

        givenDb must provide query(sql) returning the result rows. For each
        table listed by "show tables;" the output holds the table name, the
        header row, one line per field and a blank separator line.

        Returns the whole text as a unicode string.
        '''
        tableNames = [row[0] for row in givenDb.query("show tables;")]
        desc = u''
        for name in tableNames:
            # Names that clash with MySQL keywords must be quoted; the quoted
            # form is also what gets printed.
            if name in conflictedWithMysqlKeywords:
                name = '`' + name + '`'
            fields = givenDb.query("desc " + name)
            desc += withNewLine(name)
            for chunk in (formatCols(globalFieldDescs), format(fields), ''):
                desc += withNewLine(chunk).decode('utf-8')
        return desc
    
    
    def main():
        '''Describe every table of the configured database and save the result.

        The text produced by descDb() is written, UTF-8 encoded, to the file
        named by globalDescFile.
        '''
        # Query first: a database failure must not leave behind an output file
        # that has already been truncated.
        desc = descDb(db.Mydb())

        # The context manager closes the file even when the write fails.
        with open(globalDescFile, 'w') as descFile:
            descFile.write(desc.encode('utf-8'))


    if __name__ == '__main__':
        main()

    db.py

    #!/usr/ali/bin/python
    # coding=utf-8
    
    '''Implements a database api to your db.
    
    Example 1: Query SQL
    
    a. Use execute() method to execute query sql:
    
        db.execute('select * from ip')
    
        # Get only the first two rows
        db.get_rows(2)
        # result like [('10.10.0.1', 'my'), ..]
    
        # Get the next two rows, but each row record is a dict
        db.get_rows(2, is_dict = True)
        # result like [{'address':'10.10.0.1', 'name': 'my'}, ..]
    
    b. Use query() method to execute query sql directly:
    
        # The query() method will get the result rows immediately
        db.query('select * from ip', size = 2, is_dict = True)
    
    c. Use split_query() method to split long query into small ones:
    
       # Assume that the name_list's length is 10000
       # See the docstring of split_query() for more details
       db.split_query('select address from ip', 'name', name_list)
    
    Example 2: Insert SQL
    
    a. Insert a new record into ip table:
    
        db.execute("insert into ip('address','name') values('192.168.0.1','vm-xxx')")
    
        # If auto commit set to false, call commit() method manually
        db.commit()
    
    b. Insert multi-records into ip table:
    
        db.executemany("insert into ip('address','name') values(%s,%s)", [
                ('192.168.0.1', 'vm-xxx'),
                ('192.168.0.2', 'vm-yyy'),
                ('192.168.0.3', 'vm-zzz')])
        db.commit()
    
    Note: db.multi_insert is an alias for executemany method.
    See test_main() method for more examples.
    '''
    
    from database import DB
    
    class Mydb(DB):
        '''Query interface bound to one specific, hard-coded MySQL database.'''

        def __init__(self, read_only = True,
                     auto_commit = False, timeout = 5, auto_connect = False,
                     max_idle_time = 28800):
            '''Open the connection to the configured database.

            read_only is accepted but not used by this constructor. timeout is
            passed on as connect_timeout; auto_commit, auto_connect and
            max_idle_time are handed to DB.__init__() unchanged.
            '''
            # NOTE(review): host and credentials are hard-coded here.
            DB.__init__(
                self,
                # Database parameters
                host = '127.0.0.1',
                user = 'root',
                passwd = '123456',
                db = 'mysql',
                port = 3306,
                charset = 'utf8',
                # Extra connection parameters
                connect_timeout = timeout,
                auto_commit = auto_commit,
                max_idle_time = max_idle_time,
                auto_connect = auto_connect,
            )

    database.py

    #!/usr/ali/bin/python
    # coding=utf-8
    
    '''Implements a simple database interface
    
    Example 0: Create connection:
    
        # Set auto commit to false, default case
        db = DB(auto_commit = False, host = 'x', user = 'x', passwd = 'x', db = 'x')
        # Set auto commit to true
        db = DB(auto_commit = True, host = 'x', user = 'x', passwd = 'x', db = 'x')
        # Set auto connect to true, this will set auto commit to true too
        # This will enable auto connect when the connection is timeout
        db = DB(auto_connect = True, host = 'x', user = 'x', passwd = 'x', db = 'x')
    
    Example 1: Query SQL
    
    a. Use query() method to execute query sql directly:
    
        # The query() method will get the result rows immediately
        db.query('select * from ip', size = 2, is_dict = True)
    
    b. Use split_query() method to split long query into small ones:
    
       # Assume that the name_list's length is 10000
       # See the docstring of split_query() for more details
       db.split_query('select address from ip', 'name', name_list)
    
    Example 2: Insert SQL
    
    a. Insert a new record into ip table:
    
        db.execute("insert into ip('address','name') values('192.168.0.1','vm-xxx')")
    
        # If auto commit set to false, call commit() method manually
        db.commit()
    
    b. Insert multi-records into ip table:
    
        db.executemany("insert into ip('address','name') values(%s,%s)", [
                ('192.168.0.1', 'vm-xxx'),
                ('192.168.0.2', 'vm-yyy'),
                ('192.168.0.3', 'vm-zzz')])
        db.commit()
    
    Note: db.multi_insert is an alias for executemany method.
    '''
    
    # Can be 'Prototype', 'Development', 'Product'
    __status__ = 'Development'
    __author__ = 'tuantuan.lv <tuantuan.lv@alibaba-inc.com>'
    
    import re
    import time
    import MySQLdb
    
    from storage import Storage
    
    OperationalError = MySQLdb.OperationalError
    
    def _format(sql):
        '''Collapse every run of whitespace in sql to one space and trim the ends.

        Whitespace inside quoted SQL literals is collapsed as well.
        '''
        words = sql.split()
        return ' '.join(words)
    
    class DB(object):
        '''A simple database query interface built on a MySQLdb connection.

        The constructor opens one connection and one cursor which are shared by
        all methods. `cursor` is a deprecated public alias of that cursor.
        '''

        def __init__(self, auto_commit = False, auto_connect = False,
                     max_idle_time = 28800, **kwargs):
            '''Initialize the DB object and open the connection.

            args:
                auto_commit: Autocommit mode of the connection; overridden
                    (forced on) when auto_connect is true.
                auto_connect: Re-open the connection when it is missing or has
                    been idle for longer than max_idle_time.
                max_idle_time: Idle time in seconds after which the connection
                    is re-opened (default: 28800). Set it to the value of the
                    mysql option 'wait_timeout'
                    (mysql> show variables like 'wait_timeout';).
                kwargs: Passed to MySQLdb.connect(); 'charset' defaults to
                    'utf8' and 'port' to 3306.
            '''
            # Define the handles before anything can raise, so that close()
            # and therefore __del__ always find them.
            self._db = None                     # MySQLdb connection object
            self._db_cursor = None              # MySQLdb cursor object
            self.cursor = None                  # MySQLdb cursor object, deprecated

            self.max_idle_time = max_idle_time

            kwargs.setdefault('charset', 'utf8')             # set default charset to utf8
            kwargs['port'] = int(kwargs.get('port', '3306')) # set default port to 3306

            self._db_args = kwargs              # MySQL db connection args
            self._last_use_time = time.time()   # Last active time

            self._auto_connect = auto_connect   # Auto connect when timeout
            self._auto_commit = auto_commit     # Auto commit

            # Open a new mysql connection
            self._reconnect()

        def __del__(self):
            self.close()

        def close(self):
            '''Close the cursor and the database connection.

            Does nothing when no connection is open, including on an object
            whose constructor did not finish.
            '''
            connection = getattr(self, '_db', None)
            if connection is None:
                return

            cursor = getattr(self, '_db_cursor', None)
            try:
                # The cursor is still None when connecting failed half-way.
                if cursor is not None:
                    cursor.close()
            finally:
                # Release the connection even if closing the cursor raised.
                self._db = None
                connection.close()

        def _reconnect(self):
            '''Close existing connection and re-open a new one.'''
            self.close()
            self._db = MySQLdb.connect(**self._db_args)

            # Override auto commit setting if auto connect is true
            if self._auto_connect:
                self._db.autocommit(True)
            else:
                self._db.autocommit(self._auto_commit)

            self._db_cursor = self._db.cursor()
            self.cursor = self._db_cursor

        def _ensure_connected(self):
            '''Ensure we connect to mysql.'''
            # Mysql by default closes client connections that are idle for
            # 8 hours, but the client library does not report this fact until
            # you try to perform a query and it fails.  Protect against this
            # case by preemptively closing and reopening the connection
            # if it has been idle for too long (8 hours by default).
            if (self._db is None or
                (time.time() - self._last_use_time > self.max_idle_time)):
                self._reconnect()

            self._last_use_time = time.time()

        def _cursor(self):
            '''Get the cursor, reconnecting first when auto connect is enabled.'''
            if self._auto_connect:
                self._ensure_connected()

            return self._db_cursor

        def execute(self, sql, args = None):
            '''Execute a sql and return the affected row number.

            The statement is whitespace-normalized before it is sent.
            You should call the get_rows method to fetch the rows manually.
            '''
            cursor = self._cursor()
            return cursor.execute(_format(sql), args)

        def execute_lastrowid(self, sql, args = None):
            '''Execute a sql and return the last row id.

            You should call the get_rows method to fetch the rows manually.
            '''
            cursor = self._cursor()
            cursor.execute(_format(sql), args)

            return cursor.lastrowid

        def executemany(self, sql, args):
            '''Execute a multi-row insert.

            You can use this method to do a multi-row insert:

               c.executemany(
                  """INSERT INTO breakfast (name, spam, eggs, sausage, price)
                  VALUES (%s, %s, %s, %s, %s)""",
                  [
                  ("Spam and Sausage Lover's Plate", 5, 1, 8, 7.95 ),
                  ("Not So Much Spam Plate", 3, 2, 0, 3.95 ),
                  ("Don't Want ANY SPAM! Plate", 0, 4, 3, 5.95 )
                  ] )

            See http://mysql-python.sourceforge.net/MySQLdb.html for more help.
            '''
            cursor = self._cursor()
            return cursor.executemany(_format(sql), args)

        # Execute a multi-row insert, the same as executemany()
        multi_insert = executemany

        def get_rows(self, size = None, is_dict = False):
            '''Get the result rows after executing.

            args:
                size: Maximum number of rows to fetch; all remaining rows when
                    None.
                is_dict: Return each row as a Storage keyed by column name
                    instead of a plain row.

            Returns a list of rows.
            '''
            cursor = self._cursor()
            description = cursor.description

            if size is None:
                rows = cursor.fetchall()
            else:
                rows = cursor.fetchmany(size)

            if rows is None:
                rows = []

            if is_dict:
                # description is None when the last statement produced no
                # result set; there are no rows to label in that case.
                dict_keys = [ r[0] for r in description or () ]
                rows = [ Storage(zip(dict_keys, row)) for row in rows ]

            return list(rows)

        def query(self, sql, args = None, size = None, is_dict = False):
            '''Execute a query sql and return the rows immediately.'''
            self.execute(sql, args)
            return self.get_rows(size, is_dict)

        # Alias of query() method
        select = query

        def split_query(self, sql, in_attr, in_list, max_cnt = 3000):
            '''Split one long query into many small ones.

            For example, if you want to select the records whose attrname is in
            one long list (larger than 8000) of possible values. If you decide to
            use 'attr in (...)' syntax, the length will exceed the maximum length
            of one sql allowed. In this case you must split the long query into many
            small ones.

            in_attr is the attribute name of in operator, and in_list is the possible
            value list. max_cnt is the maximum count of values in one small query.

            The values are handed to the driver as query arguments, so they are
            escaped by the driver and may contain quotes. The condition is
            appended with 'and' when sql already contains the word 'where',
            otherwise with 'where'.

            Returns the concatenated rows of all small queries; an empty
            in_list gives an empty list without querying.
            Raises ValueError when max_cnt is smaller than 1.
            '''
            if max_cnt < 1:
                # A non-positive chunk size would never advance through in_list.
                raise ValueError('max_cnt must be a positive integer')

            values = list(in_list)

            # Look at the original statement, matching 'where' as a whole word
            # so that names such as 'somewhere' do not count.
            if re.search(r'\bwhere\b', sql, re.IGNORECASE):
                keyword = 'and'
            else:
                keyword = 'where'

            # Once arguments are passed, the driver %-formats the statement;
            # literal percent signs therefore have to be doubled.
            head = '%s %s %s in' % (sql.replace('%', '%%'), keyword,
                                    in_attr.replace('%', '%%'))

            result = []

            for start in range(0, len(values), max_cnt):
                chunk = tuple(values[start:start + max_cnt])
                placeholders = ', '.join(['%s'] * len(chunk))
                result.extend(self.query('%s (%s)' % (head, placeholders), chunk))

            return result

        def commit(self):
            '''Commits the current transaction.'''
            if self._db is not None:
                self._db.commit()

        def rollback(self):
            '''Rollback the last transaction.'''
            if self._db is not None:
                self._db.rollback()
    
    # vim: set expandtab smarttab shiftwidth=4 tabstop=4:

    storage.py

    #!/usr/ali/bin/python
    # coding=utf-8
    
    '''Wrap an existing dict, or create a new one, and access with dot notation
    
    See test_main() for more example.
    '''
    
    # Can be 'Prototype', 'Development', 'Product'
    __status__ = 'Development'
    __author__ = 'tuantuan.lv <tuantuan.lv@alibaba-inc.com>'
    
    # Taken from http://stackoverflow.com/a/12187277
    class Storage(object):
        '''Wrap an existing dict, or create a new one, and access with dot notation.

        The wrapped dictionary and the create flag are kept in the instance
        __dict__ under the reserved keys '__storage_data' and '__storage_create'.

        Names that start and end with a double underscore are never looked up
        in the data through attribute access; use item access for such keys.

        args:
            d: Existing dict (or iterable of key/value pairs) to wrap; it is
               copied. An empty dict is created by default.
            create: Create an empty, nested storage for a missing key instead
                    of raising a KeyError.
        '''
        def __init__(self, d = None, create = True):
            '''Initialize storage object.'''
            if d is None: # Create empty storage object
                d = {}
            else: # Copy the mapping or the key/value pairs
                d = dict(d)

            # Written through __dict__ because __setattr__ is redirected to
            # the wrapped data.
            self.__dict__['__storage_data'] = d
            self.__dict__['__storage_create'] = create

        def __getitem__(self, name):
            '''Get the key value.

            A missing key raises KeyError unless auto-create is on, in which
            case an empty nested storage is stored under the key and returned.
            Values that are plain dicts are replaced by a Storage copy on first
            access.
            '''
            data = self.__dict__['__storage_data']
            create = self.__dict__['__storage_create']

            try:
                value = data[name]
            except KeyError:
                # Create empty storage value if auto-create set to true
                if not create:
                    raise

                value = {}

            # Wrap plain dicts so that nested keys support dot notation too
            if isinstance(value, dict):
                value = Storage(value, create)
                data[name] = value

            return value

        def __getattr__(self, name):
            '''Get the key value with dot notation.'''
            # Special-method probes (copy, pickle, ...) must see a missing
            # attribute; answering them from the data would create bogus keys
            # and hand back a Storage where a callable is expected.
            if name.startswith('__') and name.endswith('__'):
                raise AttributeError(name)

            return self[name]

        def __setattr__(self, name, value):
            '''Set the storage key to value'''
            self.__dict__['__storage_data'][name] = value

        def __delattr__(self, name):
            '''Delete the storage key.'''
            del self.__dict__['__storage_data'][name]

        def __contains__(self, name):
            '''Check whether the key exists.'''
            return name in self.__dict__['__storage_data']

        def __nonzero__(self):
            '''Check whether the storage is non-empty.'''
            return bool(self.__dict__['__storage_data'])

        # Python 3 spelling of __nonzero__
        __bool__ = __nonzero__

        # Defines common dict api
        __setitem__ = __setattr__
        __delitem__ = __delattr__

        def get(self, name, default = None):
            '''Return the stored value, or default when the key is missing.

            Unlike attribute access this never creates the key.
            '''
            return self.__dict__['__storage_data'].get(name, default)

        # Define dictionary like methods
        def keys(self):
            return self.__dict__['__storage_data'].keys()

        def items(self):
            return self.__dict__['__storage_data'].items()

        def values(self):
            return self.__dict__['__storage_data'].values()

        def setdefault(self, name, default = None):
            return self.__dict__['__storage_data'].setdefault(name, default)

        def pop(self, name, *args):
            return self.__dict__['__storage_data'].pop(name, *args)

        def update(self, d, **kwargs):
            return self.__dict__['__storage_data'].update(d, **kwargs)

        def clear(self):
            self.__dict__['__storage_data'].clear()

        def __len__(self):
            return len(self.__dict__['__storage_data'])

        def __iter__(self):
            return iter(self.__dict__['__storage_data'])

        def __unicode__(self):
            return u'<Storage %s>' % str(self.__dict__['__storage_data'])

        def __str__(self):
            return '<Storage %s>' % str(self.__dict__['__storage_data'])

        def __repr__(self):
            return '<Storage %s>' % repr(self.__dict__['__storage_data'])
    
    def test_main():
        '''Print a short tour of the Storage API.

        Every value is printed with a single-argument print(...) so that the
        output is the same whether print is a statement or a function.
        '''
        # Create an empty storage
        d1 = Storage()
        d1.a.b = 1
        d1.b.c = 2

        # Iterate the items in storage object
        for k, v in d1.items():
            print('%s %s' % (k, v))

        # Create a storage from (key, value) pairs
        d3 = Storage(zip(['a','b','c'], [1,2,3]))
        print('%s %s %s' % (d3.a, d3.b, d3.c))
        print(d3)

        # Create a storage from an existing dict
        d4 = Storage({'a':{'b':1}})
        print(d4.a.b)
        print(d4)

        # Check the attribute
        d5 = Storage()
        print('a' in d5) # False
        print(d5.a)      # create attribute 'a'
        print('a' in d5) # True
        print(d5.get('c'))
        print(d5.get('d', 3))

        d5 = Storage(create = False)
        print('a' in d5) # False
        print(d5.get('a', 5))

        # Without auto-create a missing key raises KeyError; catch it so that
        # the demo runs to the end.
        try:
            print(d5.a)
        except KeyError:
            print('KeyError')

        print('a' in d5) # False, also

    if __name__ == '__main__':
        test_main()
  • 相关阅读:
    python实战===教你用微信每天给女朋友说晚安
    [go]beego获取参数/返回参数
    [go]os.Open方法源码
    [go]从os.Stdin探究文件类源码
    [svc]linux中的文件描述符(file descriptor)和文件
    [net]tcp和udp&socket
    [django]update_or_create使用场景
    [sh]shell语法小结
    [drf]访问文档出现错误'AutoSchema' object has no attribute 'get_link'
    [py]python操作zookeeper
  • 原文地址:https://www.cnblogs.com/lovesqcc/p/4104359.html
Copyright © 2011-2022 走看看