zoukankan      html  css  js  c++  java
  • pymysql 线程安全pymysqlpool

    # -*-coding: utf-8-*-
    # Author : Christopher Lee
    # License: Apache License
    # File   : test_example.py
    # Date   : 2017-06-18 01-23
    # Version: 0.0.1
    # Description: simple test.
    
    
    import logging
    import string
    import threading
    
    import pandas as pd
    import random
    
    from pymysqlpool import ConnectionPool
    
    config = {
        'pool_name': 'test',
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'chris',
        'database': 'test',
        'pool_resize_boundary': 50,
        'enable_auto_resize': True,
        # 'max_pool_size': 10
    }
    
    logging.basicConfig(format='[%(asctime)s][%(name)s][%(module)s.%(lineno)d][%(levelname)s] %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S',
                        level=logging.DEBUG)
    
    
    def connection_pool():
        # Return a connection pool instance
        pool = ConnectionPool(**config)
        # pool.connect()
        return pool
    
    
    def test_pool_cursor(cursor_obj=None):
        cursor_obj = cursor_obj or connection_pool().cursor()
        with cursor_obj as cursor:
            print('Truncate table user')
            cursor.execute('TRUNCATE user')
    
            print('Insert one record')
            result = cursor.execute('INSERT INTO user (name, age) VALUES (%s, %s)', ('Jerry', 20))
            print(result, cursor.lastrowid)
    
            print('Insert multiple records')
            users = [(name, age) for name in ['Jacky', 'Mary', 'Micheal'] for age in range(10, 15)]
            result = cursor.executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)
            print(result)
    
            print('View items in table user')
            cursor.execute('SELECT * FROM user')
            for user in cursor:
                print(user)
    
            print('Update the name of one user in the table')
            cursor.execute('UPDATE user SET name="Chris", age=29 WHERE id = 16')
            cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 1')
            print(cursor.fetchone())
    
            print('Delete the last record')
            cursor.execute('DELETE FROM user WHERE id = 16')
    
    
    def test_pool_connection():
        with connection_pool().connection(autocommit=True) as conn:
            test_pool_cursor(conn.cursor())
    
    
    def test_with_pandas():
        with connection_pool().connection() as conn:
            df = pd.read_sql('SELECT * FROM user', conn)
            print(df)
    
    
    def delete_users():
        with connection_pool().cursor() as cursor:
            cursor.execute('TRUNCATE user')
    
    
    def add_users(users, conn):
        def execute(c):
            c.cursor().executemany('INSERT INTO user (name, age) VALUES (%s, %s)', users)
            c.commit()
    
        if conn:
            execute(conn)
            return
        with connection_pool().connection() as conn:
            execute(conn)
    
    
    def add_user(user, conn=None):
        def execute(c):
            c.cursor().execute('INSERT INTO user (name, age) VALUES (%s, %s)', user)
            c.commit()
    
        if conn:
            execute(conn)
            return
        with connection_pool().connection() as conn:
            execute(conn)
    
    
    def list_users():
        with connection_pool().cursor() as cursor:
            cursor.execute('SELECT * FROM user ORDER BY id DESC LIMIT 5')
            print('...')
            for x in sorted(cursor, key=lambda d: d['id']):
                print(x)
    
    
    def random_user():
        name = "".join(random.sample(string.ascii_lowercase, random.randint(4, 10))).capitalize()
        age = random.randint(10, 40)
        return name, age
    
    
    def worker(id_, batch_size=1, explicit_conn=True):
        print('[{}] Worker started...'.format(id_))
    
        def do(conn=None):
            for _ in range(batch_size):
                add_user(random_user(), conn)
    
        if not explicit_conn:
            do()
            return
    
        with connection_pool().connection() as c:
            do(c)
    
        print('[{}] Worker finished...'.format(id_))
    
    
    def bulk_worker(id_, batch_size=1, explicit_conn=True):
        print('[{}] Bulk worker started...'.format(id_))
    
        def do(conn=None):
            add_users([random_user() for _ in range(batch_size)], conn)
            time.sleep(3)
    
        if not explicit_conn:
            do()
            return
    
        with connection_pool().connection() as c:
            do(c)
    
        print('[{}] Worker finished...'.format(id_))
    
    
    def test_with_single_thread(batch_number, batch_size, explicit_conn=False, bulk_insert=False):
        delete_users()
        wk = worker if not bulk_insert else bulk_worker
        for i in range(batch_number):
            wk(i, batch_size, explicit_conn)
        list_users()
    
    
    def test_with_multi_threads(batch_number=1, batch_size=1000, explicit_conn=False, bulk_insert=False):
        delete_users()
    
        wk = worker if not bulk_insert else bulk_worker
    
        threads = []
        for i in range(batch_number):
            t = threading.Thread(target=wk, args=(i, batch_size, explicit_conn))
            threads.append(t)
            t.start()
    
        [t.join() for t in threads]
        list_users()
    
    
    if __name__ == '__main__':
        import time
    
        start = time.perf_counter()
        test_pool_cursor()
        test_pool_connection()
    
        test_with_pandas()
        test_with_multi_threads(20, 10, True, bulk_insert=True)
        test_with_single_thread(1, 10, True, bulk_insert=True)
        elapsed = time.perf_counter() - start
        print('Elapsed time is: "{}"'.format(elapsed))
    

      

  • 相关阅读:
    第三天 moyax
    mkfs.ext3 option
    write file to stroage trigger kernel warning
    download fomat install rootfs script
    custom usb-seriel udev relus for compatible usb-seriel devices using kermit
    Wifi Troughput Test using iperf
    learning uboot switch to standby system using button
    learning uboot support web http function in qca4531 cpu
    learngin uboot design parameter recovery mechanism
    learning uboot auto switch to stanbdy system in qca4531 cpu
  • 原文地址:https://www.cnblogs.com/c-x-a/p/9045646.html
Copyright © 2011-2022 走看看