  • [Module]: Concurrent

    The concurrent module

    Review:

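      Python is an interpreted language, and its interpreter has to be both safe and efficient. Multithreaded programming has well-known pitfalls: the interpreter must keep different threads from modifying its internal shared data at the same time, while still giving user threads as much of the available computing resources as possible. Python (more precisely, CPython) protects that data with the global interpreter lock (GIL):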

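      Execution of Python code is controlled by the Python virtual machine. Python first compiles the source (.py file) into bytecode (inside the virtual machine, bytecode is represented by PyCodeObject objects; a .pyc file is the on-disk form of the bytecode) and hands it to the bytecode virtual machine, which then executes the bytecode instructions one by one to run the program. By design, only one thread can execute in the virtual machine at a time: the interpreter can host many threads, but at any given moment only one of them is running. Access to the virtual machine is controlled by the GIL, and it is this lock that guarantees only one thread runs at a time.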
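To make the compile-then-interpret pipeline concrete, here is a minimal sketch using the standard library's dis module. The function add is only an illustration, and the exact opcode names vary between Python versions, so no specific listing is shown.

```python
import dis
import types


def add(a, b):
    """A tiny function whose bytecode we can inspect."""
    return a + b


# Every function carries a code object (PyCodeObject at the C level).
code = add.__code__

# Names of the instructions the virtual machine executes, in order.
opnames = [instr.opname for instr in dis.get_instructions(add)]

if __name__ == "__main__":
    print(type(code))         # the code object type
    print(len(code.co_code))  # size of the raw bytecode in bytes
    print(opnames)            # opcode names differ between versions
```

Calling dis.dis(add) instead prints a human-readable listing of the same instructions.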

    How threads take turns executing:

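    • Acquire the GIL (global interpreter lock).
    • Switch to a thread to run.
    • Run until either:
    •     a. a specified number of bytecode instructions have executed (the rule in older interpreters; Python 3.2 and later use a time-based switch interval instead), or
    •     b. the thread voluntarily yields control (for example by calling time.sleep(0)).
    • Put the thread to sleep.
    • Release the GIL.
    • Repeat the steps above.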
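How long a thread may keep running before it is asked to hand over the GIL can be inspected at runtime. The sketch below assumes Python 3.2 or later, where the limit is a time interval rather than an instruction count.

```python
import sys

# Time, in seconds, a thread may hold the GIL before the interpreter
# asks it to release the lock so that another thread can run.
interval = sys.getswitchinterval()

if __name__ == "__main__":
    print(interval)  # 0.005 unless changed with sys.setswitchinterval()
```

sys.setswitchinterval() changes the value, although the default is rarely worth tuning.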
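      The GIL therefore prevents Python from making full use of multi-core CPUs. For I/O-oriented programs (which call into the operating system's built-in C code), the GIL is released before the I/O call so that other threads can run while this thread waits for I/O. A thread that does little I/O, on the other hand, holds the processor and the GIL for its entire time slice. This is why I/O-bound Python programs benefit far more from multithreading than CPU-bound ones.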
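    In short, for CPU-bound work do not rely on Python threads; use multiple processes for concurrency instead. Each process has its own interpreter and its own GIL, so the problem disappears and all CPU cores can be used.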
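As a rough illustration of why threads still pay off for I/O-bound work, the sketch below uses time.sleep as a stand-in for a blocking I/O call (an assumption; real I/O would be a socket or file operation). The helper names fake_io and run_in_threads are hypothetical.

```python
import threading
import time


def fake_io(delay):
    """Stand-in for a blocking I/O call; time.sleep releases the GIL."""
    time.sleep(delay)


def run_in_threads(count, delay):
    """Start `count` threads that each wait `delay` seconds; return wall time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


elapsed = run_in_threads(count=5, delay=0.2)

if __name__ == "__main__":
    # The waits overlap, so this is close to 0.2 s rather than 5 * 0.2 s.
    print(f"{elapsed:.2f}s")
```

Replacing fake_io with a pure-Python computation would remove the overlap, because only one thread can hold the GIL at a time.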

     

    A quick review of threading:

    import threading
    import time
    
    def run(n):
        semaphore.acquire()
        time.sleep(2)
        print("run the thread: %s" % n)
        semaphore.release()
    
    if __name__ == '__main__':
        start_time = time.time()
        thread_list = []
        semaphore = threading.BoundedSemaphore(5)  # semaphore: at most 5 threads run at the same time
        for i in range(20):
            t = threading.Thread(target=run, args=(i,))
            t.start()
            thread_list.append(t)
        for t in thread_list:
            t.join()
    
        used_time = time.time() - start_time
        print('elapsed', used_time)
    
    # elapsed 8.04102110862732
    

      

    Concurrency with ThreadPoolExecutor:

    1. submit

    import time
    from concurrent import futures
    
    
    def run(n):
        time.sleep(2)
        print("run the thread: %s" % n)
    
    if __name__ == '__main__':
        start = time.time()
        with futures.ThreadPoolExecutor(5) as executor:
            for i in range(20):
                executor.submit(run,i)     
    
        print(time.time()-start)
    
    # 8.006775379180908
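submit returns a Future, which the example above discards. Here is a minimal sketch of collecting return values through Future.result(); the square function is a hypothetical stand-in for real work.

```python
from concurrent import futures


def square(n):
    """Hypothetical task that returns a value instead of printing."""
    return n * n


with futures.ThreadPoolExecutor(max_workers=5) as executor:
    # submit() schedules the call and immediately returns a Future.
    pending = [executor.submit(square, i) for i in range(5)]
    # result() blocks until that particular call has finished.
    results = [f.result() for f in pending]

if __name__ == "__main__":
    print(results)  # [0, 1, 4, 9, 16]
```

If the task raised an exception, result() would re-raise it in the calling thread, which is usually where it is easiest to handle.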
    

    2. map

    import time
    from concurrent import futures
    
    
    def run(n):
        time.sleep(2)
        print("run the thread: %s" % n)
    
    if __name__ == '__main__':
        start = time.time()
        with futures.ThreadPoolExecutor(5) as executor:
            executor.map(run,range(20))
    
        print(time.time()-start)
    
    # 8.006775379180908 
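executor.map also hands back the return values, in the order of the inputs rather than in completion order. The sketch below assumes a hypothetical task, slow_identity, in which later inputs finish sooner.

```python
import time
from concurrent import futures


def slow_identity(n):
    """Hypothetical task: larger inputs sleep for a shorter time."""
    time.sleep(0.05 * (3 - n))
    return n


with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, even though the task for
    # n=2 finishes before the task for n=0.
    ordered = list(executor.map(slow_identity, range(3)))

if __name__ == "__main__":
    print(ordered)  # [0, 1, 2]
```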

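    The combination of executor.submit and futures.as_completed is more flexible than executor.map: submit can schedule different callables with different arguments, whereas executor.map can only run the same callable over different arguments. In addition, the set of futures passed to futures.as_completed may come from more than one Executor instance, for example some created by a ThreadPoolExecutor and others by a ProcessPoolExecutor.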
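A minimal sketch of that combination follows, with two different callables submitted to one pool. The functions slow_double and fast_negate and the labels dictionary are illustrative assumptions, not part of the original examples.

```python
import time
from concurrent import futures


def slow_double(n):
    """Hypothetical slow task."""
    time.sleep(0.2)
    return ("double", n * 2)


def fast_negate(n):
    """Hypothetical fast task with a different signature of work."""
    return ("negate", -n)


with futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Map each Future back to a label so its result can be identified.
    labels = {
        executor.submit(slow_double, 21): "slow_double(21)",
        executor.submit(fast_negate, 7): "fast_negate(7)",
    }
    # as_completed() yields futures as they finish, not in submission order.
    finished = [(labels[f], f.result()) for f in futures.as_completed(labels)]

if __name__ == "__main__":
    for label, result in finished:
        print(label, result)
```

Here fast_negate will normally be reported first even though it was submitted second.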

    Concurrency with ProcessPoolExecutor:

    1. submit

    import time
    from concurrent import futures
    
    
    def run(n):
        time.sleep(2)
        print("run the process: %s" % n)
    
    
    if __name__ == '__main__':
        start = time.time()
        with futures.ProcessPoolExecutor(5) as executor:
            for i in range(20):
                executor.submit(run, i)
    
        print(time.time() - start)
    
    # 8.365714311599731
    

    2. map

    import time
    from concurrent import futures
    
    
    def run(n):
        time.sleep(2)
        print("run the process: %s" % n)
    
    
    if __name__ == '__main__':
        start = time.time()
        with futures.ProcessPoolExecutor(5) as executor:
            executor.map(run, range(20))
    
        print(time.time() - start)
    
    # 8.317736864089966
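The examples above only sleep, so they do not show what processes add over threads. The sketch below runs a CPU-bound task in a process pool instead; count_primes, the limits, and the worker count are illustrative assumptions.

```python
from concurrent import futures


def count_primes(limit):
    """CPU-bound task: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


def main():
    limits = [20_000, 20_000, 20_000, 20_000]
    # Each task runs in a separate process with its own interpreter and GIL,
    # so the four computations can occupy four cores at once.
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(count_primes, limits))


if __name__ == "__main__":
    print(main())
```

The `if __name__ == "__main__"` guard matters here: on platforms that start workers by re-importing the main module, unguarded pool creation would run again in every child.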
    

    An API load-testing script

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    import time
    import logging
    import requests
    import threading
    from multiprocessing import Manager
    from concurrent import futures
    
    
    download_url = 'http://192.168.188.105:8888'
    workers = 250
    cpu_count = 4
    
    session = requests.Session()
    
    def handle(cost,mutex,contain):
        with mutex:
            min_cost = contain['min_cost']
            max_cost = contain['max_cost']
            hit_count = contain['hit_count']
            average_cost = contain['average_cost']
            if min_cost == 0:
                contain['min_cost'] = cost
            if min_cost > cost:
                contain['min_cost'] = cost
            if max_cost < cost:
                contain['max_cost'] = cost
            average_cost = (average_cost*hit_count + cost) / (hit_count + 1)
            hit_count +=1
            contain['average_cost'] = average_cost
            contain['hit_count'] = hit_count
        logging.info(contain)
    
    def download_one(mutex,contain):
        while True:
            try:
                stime = time.time()
                request = requests.Request(method='GET', url=download_url,)
                prep = session.prepare_request(request)
                response = session.send(prep, timeout=50)
                etime = time.time()
                print(response.status_code)
                logging.info('process[%s] thread[%s] status[%s] cost[%s]',os.getpid(),threading.current_thread().ident,
                             response.status_code,etime-stime)
                handle(float(etime-stime),mutex,contain)
                # time.sleep(1)
            except Exception as e:
                logging.error(e)
                print(e)
    
    def new_thread_pool(mutex,contain):
        with futures.ThreadPoolExecutor(workers) as executor:
            for i in range(workers):
                executor.submit(download_one,mutex,contain)
    
    def subprocess():
        manager = Manager()
        mutex = manager.Lock()
        contain = manager.dict({'average_cost': 0, 'min_cost': 0, 'max_cost': 0, 'hit_count': 0})
    
        with futures.ProcessPoolExecutor(cpu_count) as executor:
            for i in range(cpu_count):
                executor.submit(new_thread_pool,mutex,contain)
    
    if __name__ == '__main__':
        logging.basicConfig(filename="client.log", level=logging.INFO,
                            format="%(asctime)s  [%(filename)s:%(lineno)d] %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")
        subprocess()
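handle() updates the running statistics in place while holding the lock. The same arithmetic can be checked in isolation with a pure function. This is only a sketch: update_stats is a hypothetical helper, not part of the script above, and it uses hit_count == 0 rather than min_cost == 0 to detect the first sample.

```python
def update_stats(stats, cost):
    """Return a new stats dict that includes one more measured cost."""
    count = stats["hit_count"]
    return {
        "min_cost": cost if count == 0 else min(stats["min_cost"], cost),
        "max_cost": max(stats["max_cost"], cost),
        # Incremental mean: fold the new sample into the old average.
        "average_cost": (stats["average_cost"] * count + cost) / (count + 1),
        "hit_count": count + 1,
    }


stats = {"average_cost": 0, "min_cost": 0, "max_cost": 0, "hit_count": 0}
for cost in (0.5, 0.25, 0.75):
    stats = update_stats(stats, cost)

if __name__ == "__main__":
    print(stats)
```

After the three samples the average is (0.5 + 0.25 + 0.75) / 3 = 0.5, the minimum is 0.25 and the maximum is 0.75.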
    

      

      

  • Original article: https://www.cnblogs.com/lianzhilei/p/6506636.html