zoukankan      html  css  js  c++  java
  • Python 并发总结,多线程,多进程,异步IO

    1 测量函数运行时间
    import time
    def profile(func):
        """Decorator that prints how long each call to *func* takes.

        Args:
            func: Any callable.

        Returns:
            A wrapper that calls *func* with the same arguments, prints
            ``COST: <seconds>`` and returns whatever *func* returned.
        """
        import functools
        import time

        @functools.wraps(func)  # keep func's name and docstring on the wrapper
        def wrapper(*args, **kwargs):
            start = time.time()
            # Keep the result so it can be handed back to the caller; a
            # decorated recursive function relies on its own return values.
            result = func(*args, **kwargs)
            end = time.time()
            print('COST: {}'.format(end - start))
            return result
        return wrapper
     
    @profile
    def fib(n):
        """Return the n-th Fibonacci number using plain recursion.

        Every n <= 2 yields 1. The recursive calls use the module-level
        name ``fib`` and therefore pass through the decorator as well.
        """
        return 1 if n <= 2 else fib(n - 1) + fib(n - 2)

    # Run the example.
    fib(35)
     

    2 启动多个线程,并等待完成
     
    2.1 使用threading.enumerate()
    import threading
    # Start two worker threads. No references are kept; the threads are
    # found again below through threading.enumerate().
    for i in range(2):
        t = threading.Thread(target=fib, args=(35,))
        t.start()
    # current_thread() replaces the deprecated camelCase alias currentThread().
    main_thread = threading.current_thread()

    # enumerate() lists every thread that is still alive, including the one
    # running this loop; a thread cannot join itself, so skip it.
    for t in threading.enumerate():
        if t is main_thread:
            continue
        t.join()
    2.2 先保存启动的线程
    # Keep a reference to every started thread so that exactly these
    # threads can be joined afterwards.
    threads = []
    for i in range(5):
        # Go through the threading module, which is already imported; a bare
        # ``Thread`` name has not been imported at this point.
        t = threading.Thread(target=foo, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
     

    3 使用信号量,限制同时能有几个线程访问临界区
    from random import random
    from threading import Semaphore
    import time

    # At most three threads may hold the semaphore at the same time.
    sema = Semaphore(3)

    def foo(tid):
        """Hold the shared semaphore for a random time below two seconds.

        Args:
            tid: Identifier of the caller, used only in the printed messages.
        """
        # The with-block acquires sema on entry and releases it on exit.
        with sema:
            print('{} acquire sema'.format(tid))
            wt = random() * 2
            time.sleep(wt)
            print('{} release sema'.format(tid))
     

    4 锁,相当于信号量为1的情况
    from threading import Thread, Lock
    value = 0
    lock = Lock()
    def getlock():
        """Increment the shared counter ``value`` by one while holding ``lock``."""
        # ``value`` is rebound below, so it has to be declared global;
        # ``lock`` is only read and needs no declaration.
        global value
        with lock:
            new = value + 1
            # Presumably here to widen the gap between the read and the
            # write, so a missing lock would show up as lost updates.
            time.sleep(0.001)
            value = new
     

    5 可重入锁RLock
        acquire() 可以不被阻塞的被同一个线程调用多次,release()需要和acquire()调用次数匹配才能释放锁

    6 条件 Condition
    一个线程发出信号,另一个线程等待信号
    常用于生产者-消费者模型
    import time
    import threading
     
    def consumer(cond):
        """Block until *cond* is notified, then report that the resource is ready.

        Args:
            cond: The threading.Condition shared with the producer.
        """
        t = threading.current_thread()
        with cond:
            # NOTE(review): wait() is called without checking any predicate,
            # so a notification sent before this line is reached is missed.
            cond.wait()
            print("{}: Resource is available to consumer".format(t.name))
     
    def producer(cond):
        """Announce the resource and wake every thread waiting on *cond*.

        Args:
            cond: The threading.Condition shared with the consumers.
        """
        t = threading.current_thread()
        with cond:
            print("{}: Making resource available".format(t.name))
            # notify_all() is the current spelling of the deprecated notifyAll().
            cond.notify_all()
     
    # One Condition object is shared by both consumers and the producer.
    condition = threading.Condition()
    c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
    c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
    p = threading.Thread(name='p', target=producer, args=(condition,))
     
    # The consumers are started before the producer.
    # NOTE(review): nothing here guarantees that both consumers are already
    # waiting when the producer notifies -- confirm this ordering is enough.
    c1.start()
    c2.start()
    p.start()
     

    7 事件 Event
    与 Condition 类似:一个线程发出信号,其他线程等待信号;区别是 Event 只维护一个布尔标志(set() 置位、clear() 复位、wait() 等待置位),使用时不需要先获取锁
    import time
    import threading
    from random import randint
     
    TIMEOUT = 2
     
    def consumer(event, l):
        """Pop one integer from *l* whenever *event* is set; loops forever.

        Args:
            event: threading.Event that the producer sets after appending.
            l: List shared with the producer.
        """
        t = threading.current_thread()
        while True:
            # wait() returns the flag state; False means TIMEOUT seconds
            # passed without the event being set.
            event_is_set = event.wait(TIMEOUT)
            if event_is_set:
                try:
                    integer = l.pop()
                    print('{} popped from list by {}'.format(integer, t.name))
                    event.clear()  # reset the event state
                except IndexError:  # tolerate an empty list right after start-up
                    pass
     
    def producer(event, l):
        """Append a random integer to *l* once per second and set *event*; loops forever.

        Args:
            event: threading.Event the consumers wait on.
            l: List shared with the consumers.
        """
        t = threading.current_thread()
        while True:
            integer = randint(10, 100)
            l.append(integer)
            print('{} appended to list by {}'.format(integer, t.name))
            event.set()  # set the event so waiting consumers wake up
            time.sleep(1)
     
    # The event and the list are shared by all three threads.
    event = threading.Event()
    l = []
     
    threads = []
     
    # Two consumer threads are started first ...
    for name in ('consumer1', 'consumer2'):
        t = threading.Thread(name=name, target=consumer, args=(event, l))
        t.start()
        threads.append(t)
     
    # ... followed by a single producer thread.
    p = threading.Thread(name='producer1', target=producer, args=(event, l))
    p.start()
    threads.append(p)
     
     
    # consumer() and producer() both loop without an exit condition, so
    # these joins keep the main thread waiting indefinitely.
    for t in threads:
        t.join()
     

    8 线程队列 
    线程队列有task_done() 和 join()
    标准库里的例子
    往队列内放结束标志,注意do_work阻塞可能无法结束,需要用超时
    import queue
    def worker():
        """Process items from the shared queue until a ``None`` sentinel arrives.

        Each processed item is acknowledged with ``task_done()``; the sentinel
        itself is not acknowledged.
        """
        item = q.get()
        while item is not None:
            do_work(item)
            q.task_done()
            item = q.get()
    # Queue shared by the worker threads.
    q = queue.Queue()
    threads = []
    # Start the workers before any item is queued.
    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    for item in source():
        q.put(item)
    # Wait until every queued item has been acknowledged with task_done().
    q.join()
    # One None per worker: worker() leaves its loop when it receives None.
    for i in range(num_worker_threads):
        q.put(None)
    for t in threads:
        t.join()
     

    9 优先级队列 PriorityQueue
    import threading
    from random import randint
    from queue import PriorityQueue
     
    q = PriorityQueue()
     
    def double(n):
        """Return *n* multiplied by two."""
        doubled = n * 2
        return doubled
     
    def producer():
        """Put six tasks with random priorities between 0 and 100 on the shared queue."""
        for _ in range(6):
            pri = randint(0, 100)
            print('put :{}'.format(pri))
            q.put((pri, double, pri))  # (priority, func, args)
     
    def consumer():
        """Run queued tasks one by one and stop as soon as the queue is empty."""
        while not q.empty():
            pri, task, arg = q.get()
            outcome = task(arg)
            print('[PRI:{}] {} * 2 = {}'.format(pri, arg, outcome))
            q.task_done()
            time.sleep(0.1)
     
    # Start the producer first and wait one second before starting the
    # consumer: consumer() returns as soon as it finds the queue empty.
    t = threading.Thread(target=producer)
    t.start()
    time.sleep(1)
    # The name t is reused; the producer thread is not joined.
    t = threading.Thread(target=consumer)
    t.start()
     

    10 线程池
    当线程执行相同的任务时用线程池
    10.1 multiprocessing.pool 中的线程池
    from multiprocessing.pool import ThreadPool
    pool = ThreadPool(5)
    try:
        # map() blocks until every input is processed and returns the
        # results in input order.
        results = pool.map(lambda x: x**2, range(5))
    finally:
        # Shut the worker threads down once the work is finished.
        pool.close()
        pool.join()
    10.2 multiprocessing.dummy
    from multiprocessing.dummy import Pool
    10.3 concurrent.futures.ThreadPoolExecutor
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import as_completed
    import urllib.request

    URLS = ['http://www.baidu.com', 'http://www.hao123.com']

    def load_url(url, timeout):
        """Download *url* and return the response body.

        Args:
            url: Address to fetch.
            timeout: Timeout passed to ``urllib.request.urlopen``.

        Returns:
            Whatever ``read()`` on the opened connection returns.
        """
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            return conn.read()
     
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Map every future back to the URL it is downloading.
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        # as_completed() yields each future as soon as it has finished.
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                # result() re-raises any exception raised inside load_url.
                data = future.result()
            except Exception as exc:
                print("%r generated an exception: %s" % (url, exc))
            else:
                print("%r page is %d bytes" % (url, len(data)))
     

    11 启动多进程,等待多个进程结束
    import multiprocessing
    # Guard the process creation: with the "spawn" start method each child
    # re-imports this module and must not start further processes itself.
    if __name__ == "__main__":
        jobs = []
        for i in range(2):
            p = multiprocessing.Process(target=fib, args=(12,))
            p.start()
            jobs.append(p)
        # Wait for every started process to finish.
        for p in jobs:
            p.join()
     

    12 进程池
    12.1 multiprocessing.Pool
    from multiprocessing import Pool
    # Guard the pool creation so that worker processes which re-import this
    # module do not create pools of their own.
    if __name__ == "__main__":
        # The with-block releases the two worker processes when map() is done.
        with Pool(2) as pool:
            pool.map(fib, [36] * 2)
     
    12.2 concurrent.futures.ProcessPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    import math
     
    PRIMES = [ 112272535095293, 112582705942171]
     
    def is_prime(n):
        """Return True when *n* is prime, using trial division by odd numbers.

        Args:
            n: Number to test.

        Returns:
            False for anything below 2 and for even numbers other than 2;
            otherwise True exactly when no odd number from 3 up to the
            floored square root of *n* divides it.
        """
        if n == 2:
            return True
        if n < 2 or n % 2 == 0:
            return False
        limit = int(math.floor(math.sqrt(n)))
        return all(n % divisor != 0 for divisor in range(3, limit + 1, 2))
     
    if __name__ == "__main__":
        # Check all numbers in worker processes and report them in input order.
        with ProcessPoolExecutor() as executor:
            verdicts = executor.map(is_prime, PRIMES)
            for number, prime in zip(PRIMES, verdicts):
                print("%d is prime: %s" % (number, prime))
     

     
    13 asyncio
     
    13.1 最基本的示例,单个任务
    import asyncio
     
    async def hello():
        """Print a greeting, suspend for one second, then print a second line."""
        print("Hello world!")
        pause_seconds = 1
        await asyncio.sleep(pause_seconds)
        print("Hello again")
     
    # asyncio.run() creates a new event loop, runs the coroutine until it
    # finishes and closes the loop again (available from Python 3.7).
    asyncio.run(hello())
    13.2 最基本的示例,多个任务
    import asyncio
     
    async def hello():
        """Print a greeting, suspend for one second, then print a second line."""
        print("Hello world!")
        pause_seconds = 1
        await asyncio.sleep(pause_seconds)
        print("Hello again")
     
    async def main():
        """Run two hello() coroutines concurrently and wait for both."""
        # gather() schedules the coroutines on the running loop; handing bare
        # coroutines to asyncio.wait() is rejected by newer Python versions.
        await asyncio.gather(hello(), hello())

    # asyncio.run() creates the event loop, runs main() and closes the loop.
    asyncio.run(main())
     
    13.3 结合httpx 执行多个任务并接收返回结果
    httpx 接口和 requests基本一致
    import asyncio
    import httpx
     
     
    async def get_url():
        """Request http://www.baidu.com asynchronously and return the status code."""
        # The module-level httpx.get() is synchronous and its result cannot be
        # awaited; asynchronous requests go through AsyncClient.
        async with httpx.AsyncClient() as client:
            r = await client.get("http://www.baidu.com")
        return r.status_code
     
     
    async def main():
        """Run ten get_url() calls concurrently and return their results in order."""
        # gather() is awaited inside a running loop and returns the results
        # in the same order as the coroutines were passed in.
        return await asyncio.gather(*(get_url() for _ in range(10)))


    # asyncio.run() creates the event loop, runs main() and closes the loop.
    results = asyncio.run(main())


    for num, result in enumerate(results):
        print(num, result)
     
     
     
  • 相关阅读:
    【leetcode_medium】78. Subsets
    【opencv基础】随机颜色生成
    【leetcode_easy_array】1566. Detect Pattern of Length M Repeated K or More Times
    XSSFSheet对象的格式设置(转)
    Devexpress控件使用技巧
    Visual Studio 2017社区版安装C++开发环境(转)
    DevExpress GridControl添加选择框的两种方法
    DevExpress GridControl使用教程:之 添加 checkbox 复选框(转)
    DevExpress中GridControl中实现checkbox多行选中(转)
    C#开发WinForm窗体程序时,如何在子窗体中关闭窗口时并退出程序?(转)
  • 原文地址:https://www.cnblogs.com/junmoxiao/p/11948993.html
Copyright © 2011-2022 走看看