zoukankan      html  css  js  c++  java
  • python连接Greenplum数据库

    配置greenplum客户端认证

    配置pg_hba.conf 

    cd /home/gpadmin/gpdbdata/master/gpseg-1
    vim pg_hba.conf 
    增加
    host    all         gpadmin          10.1.201.55/32    trust
    
    [gpadmin@ gpseg-1]$ export PGDATA=/home/gpadmin/gpdbdata/master/gpseg-1
    [gpadmin@ gpseg-1]$ pg_ctl reload -D $PGDATA
    server signaled

    使用Psycopg2访问数据库

    Psycopg2 是 Python 语言下最常用的连接PostgreSQL数据库连接库,Psycopg2 的底层是由 C 语言封装 PostgreSQL 的标准库  libpq 实现的,

    运行速度非常快,Psycopg2支持大型多线程应用的大量并发Insert和Update操作,Psycopg2完全兼容 DB API 2.0 

    安装Psycopg2 

    pip install psycopg2

    Psycopg2使用参考文档

    http://initd.org/psycopg/docs/index.html

    Psycopg2 连接PostgreSQL数据库接口

     Psycopg2提供的操作数据库的两个重要类是ConnectionCursor,和获取数据库连接的快捷函数connect()

    psycopg2.connect(host="localhost", port="5432", dbname="testdb", user="gpadmin", password="123456")

    常用关键词参数说明如下:

    • host:主机名或 IP 地址
    • port:连接PostgreSQL数据库使用的端口
    • dbname:连接的数据库,默认为与用户名同名的数据库
    • user:连接数据库的用户
    • password:连接数据库用户的密码

    Connection类方法说明

    Connection类用于获取到PostgreSQL数据库的连接,以下介绍Connection类常用的方法,详细内容阅读 Psycopg2 Connection 类

    • Connection()构造函数

      用于构造一个到PostgreSQL数据库的连接,常用的是使用connect()快捷函数构造数据库连接

    • connection.cursor()

      用于从当前的数据库连接中获取一个Cursor对象(游标),用于执行SQL语句。

    • cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

      这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用

    • connection.close()

      关闭当前的数据库连接

    • connection.commit()

      此方法提交当前事务。如果不调用这个方法,无论做了什么修改,数据不能保存到数据库中。

    • connection.rollback()

      此方法会回滚任何更改数据库数据

    Cursor类方法说明

    Cursor类用于执行SQL语句,并返回执行结果,以下介绍Cursor类常用的方法,详细内容阅读 Psycopg2 Cursor 类

    • cursor.execute(query)

      用于执行SQL语句 query

    • cursor.mogrify(query)

      会返回生成的sql脚本, 用以查看生成的sql是否正确

    • cursor.fetchall()

      获取SQL执行结果中的所有记录,返回值是一个元组的列表,每一条记录是一个元组

    • cursor.fetchmany(([size=cursor.arraysize]))

      获取SQL执行结果中指定条数的记录,记录数由size指定,当不指定size值时,默认为arraysize属性的值,arraysize属性的默认值是1;返回值是一个元组的列表,每一条记录是一个元组

    • cursor.fetchone()

      获取执行结果中的一条记录

    • cursor.close()

      关闭当前连接的游标

    • curosr.callproc(procname[, parameters])

      这个程序执行的存储数据库程序给定的名称。该程序预计为每一个参数,参数的顺序必须包含一个条目

    • cursor.rowcount

      只读属性,它返回数据库中的行的总数已修改,插入或删除最后 execute*()

    psycopg2.pool模块说明

    提供了一些纯Python类直接在客户端应用程序实现简单的连接池。

    class psycopg2.pool.AbstractConnectionPool(minconn, maxconn, *args, **kwargs)

    基类实现通用的基于密钥池代码。

          自动创建新的minconn连接。池将支持的最大maxconn连接。* args,* * kwargs传递到connect()函数。

          以下预计将由子类实现方法:

    getconn(key=None)

       得到一个空连接,并将其分配给key如果不是没有。

    putconn(conn, key=None, close=False)

         put away 一个连接。

      如果关闭是真的,则放弃池中的连接。

     closeall()

          关闭池处理的所有连接。

          请注意所有的连接都关闭,包括最终在应用程序中使用的连接。

     下面的类,子类可以使用abstractconnectionpool。

    class psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwargs)

          不能在不同线程中共享的连接池。

          请注意,这个池类仅用于单线程应用程序。

    class psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwargs)

          一个连接池与线程模块一起工作。

          注意这个池类可以安全地应用在多线程应用程序中。

    Psycopg2中可用的异常错误类

    异常错误类的继承关系如下:

    StandardError
    |__Warning
    |__Error
       |__InterfaceError
       |__DatabaseError
          |__DataError
          |__OperationalError
             |__ psycopg2.extensions.QueryCanceledError
             |__ psycopg2.extensions.TransactionRollbackError
          |__IntegrityError
          |__InternalError
          |__ProgrammingError
          |__NotSupportedError
    异常描述
    psycopg2.Warning 当有严重警告时触发,例如插入数据是被截断等等。
    psycopg2.Error 警告以外所有其他错误类。
    psycopg2.InterfaceError 当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发。
    psycopg2.DatabaseError 和数据库有关的错误发生时触发。
    psycopg2.DataError 当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等。
    psycopg2.OperationalError 指非用户控制的,而是操作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等操作数据库是发生的错误。
    psycopg2.IntegrityError 完整性相关的错误,例如外键检查失败等。
    psycopg2.InternalError 数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等。
    psycopg2.ProgrammingError 程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等。
    psycopg2.NotSupportedError 不支持错误,指使用了数据库不支持的函数或API等。例如在连接对象上 使用.rollback()函数,然而数据库并不支持事务或者事务已关闭。

    Psycopg2使用举例

    简单的增加,查询记录

    import psycopg2
    import psycopg2.extras
    import time
    
    '''
        连接数据库
        returns:db
    '''
    def gp_connect():
        try:
            db = psycopg2.connect(dbname="testdb",
                                  user="gpadmin",
                                  password="gpadmin",
                                  host="10.1.208.42",
                                  port="5432")
            # connect()也可以使用一个大的字符串参数,
            # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
            return db
        except psycopg2.DatabaseError as e:
            print("could not connect to Greenplum server",e)
    
    
    if __name__ == '__main__':
        conn = gp_connect()
        print(conn)
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
        ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
        conn.commit()
        # 提交到数据库中
        print(ret)
        ret = cur.execute("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);",(300, "abc'def"))
    
        conn.commit()
        # 提交到数据库中
        print(cur.rowcount)  # 1
        # 返回数据库中的行的总数已修改,插入或删除最后 execute*().
    
        ret_sql = cur.mogrify("select * from pg_tables where tablename = %s;", ('gp_test',))
        # 返回生成的sql脚本, 用以查看生成的sql是否正确.
        # sql脚本必须以;结尾, 不可以省略.其次, 不管sql中有几个参数, 都需要用 % s代替, 只有 % s, 不管值是字符还是数字, 一律 % s.
        # 最后, 第二个参数中, 一定要传入元组, 哪怕只有一个元素, 像我刚才的例子一样, ('gp_test')这样是不行的.
        print(ret_sql.decode('utf-8'))  # select * from pg_tables where tablename = E'gp_test';
    
        cur.execute("select * from gp_test where num = %s;", (300,))
        pg_obj = cur.fetchone()
        print(pg_obj) # {'id': 1, 'num': 300, 'data': "abc'def"}
    
        conn.close() # 关闭连接

    批量插入,查询

        conn = gp_connect()
        print(conn)
        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        # # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
        # ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
        # conn.commit()
        # # 提交到数据库中
        # print(ret)
        gp_list = []
        for i in range(200):
            gp_list.append((i,'abc%s'%i))
        # print(gp_list)
        # 批量提交数据
        ret = cur.executemany("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);", gp_list)
        conn.commit()
        # 提交到数据库中
        print(cur.query)  # 查看上一条执行的脚本
        print(cur.rowcount)  # 200
        # 返回数据库中的行的总数已修改,插入或删除最后 execute*().
        cur.execute("select  count(*) num from gp_test")
        pg_obj = cur.fetchone()
        print(pg_obj)  # {'num': 200}
    
        conn.close()  # 关闭连接
    

    使用连接池,执行高性能的批量插入与查询

    import psycopg2
    import psycopg2.extras
    import psycopg2.pool
    from datetime import datetime
    
    '''
        连接数据库
        使用数据库连接池
        returns:db
    '''
    def gp_connect():
        try:
            simple_conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5,dbname="testdb",
                                  user="gpadmin",
                                  password="gpadmin",
                                  host="10.1.208.42",
                                  port="5432")
            # connect()也可以使用一个大的字符串参数,
            # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
            # 从数据库连接池获取连接
            conn = simple_conn_pool.getconn()
            return conn
        except psycopg2.DatabaseError as e:
            print("could not connect to Greenplum server",e)
    
    
    if __name__ == '__main__':
        conn = gp_connect()
        print(conn)
        cur = conn.cursor()
        # 批量查询大小
        batch_size = 1000
        gp_list = []
        for i in range(2000, 100000):
            gp_list.append((i,'abc%s'%i))
        # print(gp_list)
    
        # 开始时间
        start_time = datetime.now()
        # 批量提交数据execute_values性能大于executemany
        psycopg2.extras.execute_values(cur, "INSERT INTO public.gp_test (num, data) VALUES %s", gp_list)
        conn.commit()
        # 提交到数据库中
        cur.execute("select  *  from gp_test order by id")
        count = 0
    
        while True:
            count = count + 1
            # 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
            data = cur.fetchmany(batch_size)
            # 数据为空的时候中断循环
            if not data:
                break
            else:
                print(data[-1])  # 得到最后一条(通过元祖方式返回)
            print('获取%s到%s数据成功' % ((count - 1) * batch_size, count * batch_size))
        print('insert到fetchmany获取全量数据所用时间:', (datetime.now() - start_time).seconds) # 16s
    conn.close()  # 关闭连接

    执行高性能的批量更新与查询

    import psycopg2
    import psycopg2.extras
    import psycopg2.pool
    from datetime import datetime
    
    '''
        连接数据库
        使用数据库连接池
        returns:db
    '''
    def gp_connect():
        ……略
    
    if __name__ == '__main__':
        conn = gp_connect()
        print(conn)
        cur = conn.cursor()
        # 批量查询大小
        batch_size = 1000
        gp_uplist = [] # 更新列表
        for i in range(2000, 10000):
            gp_uplist.append((i,'def%s'%i))
        print(gp_uplist)
    
        # 开始时间
        start_time = datetime.now()
        # 批量提交数据execute_values性能大于executemany
    
        sql = "UPDATE public.gp_test SET data = TEST.data  " 
              "FROM (VALUES %s) AS TEST(num, data) " 
              "WHERE public.gp_test.num = TEST.num"
        # 批量更新语句模版 UPDATE TABLE SET TABLE.COL = XX.col
        # FROM (VALUES %s) AS XX(id_col,col)
        # WHERE TABLE.id_col = XX.id_col 
        # XX为别名
        psycopg2.extras.execute_values(cur, sql, gp_uplist, page_size=100)
        print(cur.query)
        conn.commit()
        # 提交到数据库中
        cur.execute("select  *  from gp_test order by id")
        count = 0
    
        while True:
            count = count + 1
            # 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
            data = cur.fetchmany(batch_size)
            # 数据为空的时候中断循环
            if not data:
                break
            else:
                print(data[-1])  # 得到最后一条(通过元祖方式返回)
            print('获取%s到%s数据成功' % ((count - 1) * batch_size, count * batch_size))
        print('update到fetchmany获取全量数据所用时间:', (datetime.now() - start_time).seconds) # 16s
    conn.close()  # 关闭连接

     使用服务端游标

    当执行一个数据库查询时,Pscopg cursor通常将查询到的所有数据返回给客户端,如果返回的数据过大,则将占用客户端大量的内存。因此,psycopg提供了一种成为server side curosr机制,每次返回可控制数量的数据。

    Server side cursor是使用PostgreSQL的DECLARE命令创建,并经过MOVE、FETCH和CLOSE命令处理的。

    Psycopg通过命名的cursors装饰server side cursor的,而命名cursor是通过对cursor()方法指定name参数而创建的。

    server side cursor允许用户在数据集中使用scroll()移动游标,并通过fetchone()和fetchmany()方法获取数据。

    • scrollable:控制游标是否可以向后移动
    • itersize:控制每次可以获取多少条数据,默认是2000
    #逐条处理
    with psycopg2.connect(database_connection_string) as conn:
        with conn.cursor(name='name_of_cursor') as cursor:
    
            cursor.itersize = 20000
    
            query = "SELECT * FROM ..."
            cursor.execute(query)
    
            for row in cursor:
             # process row
    
    #2 一次处理多条
    while True:
        rows = cursor.fetchmany(100)
        if len(rows) > 0:
            for row in rows:
                # process row
        else:
            break
  • 相关阅读:
    阿里云服务器 FTP配置图文教程和添加两个FTP站点
    SAP 用事务码SQVI 做简单报表 .
    公司间联动权限解决方案
    table合并单元格colspan和rowspan .
    js更改input标签的读写属性
    小div在大div中垂直居中,以及div在页面垂直居中
    UI控件(UITextField)
    node基本理念(事件、多线程、进程)
    MyBatis(跨表查询)
    MyBatis(增删改查)
  • 原文地址:https://www.cnblogs.com/xiao-apple36/p/10362367.html
Copyright © 2011-2022 走看看