Python3 Series: Parallel Programming

    Processes and Threads

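    A process is a running instance of a program and the basic unit of execution that the operating system manages dynamically. A process can contain multiple threads, and all threads of the same process share that process's resources, such as its memory space and open files. Each thread is an execution unit inside a process that can be scheduled and run independently. Because threads share many characteristics with processes, they are also called lightweight processes. Threads always run inside a process, and a process needs at least one thread (its main thread) in order to run.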
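    The following is a minimal sketch of the shared-resource point above. The helper names record and run_threads are hypothetical and do not come from the original. Three threads of one process append to the same list object, and every entry records the same process ID.

```python
import os
import threading

# Illustrative sketch; record and run_threads are hypothetical helper names


def record(shared, tag):
    # Every thread sees the same list object because threads share the
    # memory of their process; list.append is thread-safe in CPython
    shared.append((tag, os.getpid()))


def run_threads(count=3):
    shared = []  # one object in this process's memory
    workers = [threading.Thread(target=record, args=(shared, i))
               for i in range(count)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return sorted(shared)


if __name__ == "__main__":
    for tag, pid in run_threads():
        print("thread", tag, "ran in process", pid)
```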

    Warm-up

    A simple Python 3 example that creates a thread

    from threading import Thread
    from time import sleep
    
    
    class CookBook(Thread):
        def __init__(self):
            super().__init__()
            self.message = "Hello Parallel Python CookBook!!\n"
    
        def print_message(self):
            print(self.message)
    
        def run(self):
            print("Thread Starting\n")
            x = 0
            while x < 10:
                self.print_message()
                sleep(2)
                x += 1
            print("Thread Ended!\n")
    
    
    print("Process Started")
    hello_python = CookBook()
    
    hello_python.start()
    # Wait for the thread to finish before reporting that the process has ended
    hello_python.join()
    print("Process Ended")
    

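    Note: never leave a thread running silently in the background. Once it has finished its work, release its resources promptly, for example by joining it.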

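    Thread-based parallelism

    Multithreaded programs usually communicate between threads through a shared memory space, so managing that memory is the central concern of multithreaded programming. Python manages threads through the threading module of the standard library, which provides the following components:

    • Thread objects
    • Lock objects
    • RLock objects
    • Semaphore objects
    • Condition objects
    • Event objects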

    Defining a thread

    Basic syntax

    Example code:

    import threading
    
    
    def function(i):
        print("function called by thread: {0}".format(i))
        return
    
    
    threads = []
    for i in range(5):
        t = threading.Thread(target=function, args=(i,))
        threads.append(t)
        t.start()
    
    # Wait for every thread to finish
    for t in threads:
        t.join()
    

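    Note that a thread does not start running when it is created. You must call start() to launch it. join() blocks the calling thread until the thread it is called on has finished. (PS: setting t.daemon = True before start(), or passing daemon=True to the constructor, makes it a daemon (background) thread. The interpreter does not wait for daemon threads when it exits, so they cannot keep the program alive. The older t.setDaemon(True) does the same thing but has been deprecated since Python 3.10.)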
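    Here is a small sketch of the daemon flag, using the daemon=... constructor argument available since Python 3.3. The function names heartbeat and demo are illustrative. It shows that join() still blocks on a daemon thread. The flag only means that the interpreter will not wait for the thread at exit.

```python
import threading

# Illustrative sketch; heartbeat and demo are hypothetical names


def heartbeat(stop_event):
    # Event.wait(timeout) returns True once the event is set, ending the loop
    while not stop_event.wait(0.01):
        pass


def demo():
    stop = threading.Event()
    t = threading.Thread(target=heartbeat, args=(stop,), daemon=True)
    t.start()
    alive_before = t.is_alive()
    stop.set()          # ask the background loop to finish
    t.join(timeout=1)   # join() works on daemon threads too
    return t.daemon, alive_before, t.is_alive()


if __name__ == "__main__":
    print(demo())
```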

    Identifying threads

    Example code:

    import threading
    import time
    
    
    def first_function():
        print("{0} is starting".format(threading.current_thread().name))
        time.sleep(2)
        print("{0} is Exiting".format(threading.current_thread().name))
    
    
    def second_function():
        print("{0} is starting".format(threading.current_thread().name))
        time.sleep(2)
        print("{0} is Exiting".format(threading.current_thread().name))
    
    
    def third_function():
        print("{0} is starting".format(threading.current_thread().name))
        time.sleep(2)
        print("{0} is Exiting".format(threading.current_thread().name))
    
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=first_function, name="first")
        t2 = threading.Thread(target=second_function, name="second")
        t3 = threading.Thread(target=third_function, name="third")
    
        t1.start()
        t2.start()
        t3.start()
        t1.join()
        t2.join()
        t3.join()
    

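    Set a thread's name with the name argument of threading.Thread(), and read the current thread's name with threading.current_thread().name. The older camel-case threading.currentThread().getName() has been deprecated since Python 3.10. If you give no name, a default of the form Thread-N is used. Since Python 3.10 the target function's name is also appended, for example Thread-1 (first_function).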
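    The sketch below covers both naming cases; the report helper is hypothetical. It checks only the Thread- prefix of the default name, because Python 3.10 and later also append the target function's name, in the form Thread-N (report).

```python
import threading

# Illustrative sketch; report and demo are hypothetical names


def report(names):
    # current_thread() returns the Thread object that is running this call
    names.append(threading.current_thread().name)


def demo():
    names = []
    named = threading.Thread(target=report, args=(names,), name="first")
    unnamed = threading.Thread(target=report, args=(names,))
    for t in (named, unnamed):
        t.start()
        t.join()  # run one at a time so the list order is predictable
    return names


if __name__ == "__main__":
    print(threading.current_thread().name)  # name of the main thread
    print(demo())
```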

    Defining a custom thread object

    Example code:

    import threading
    import time
    
    exitFlag = 0
    
    
    class myThread(threading.Thread):
        def __init__(self, threadID, name, counter):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.counter = counter
    
        def run(self):
            print("Starting:{0}".format(self.name))
            print_time(self.name, self.counter, 5)
            print("Exiting:{0}".format(self.name))
    
    
    def print_time(threadName, delay, counter):
        while counter:
            if exitFlag:
                # Python 3 has no "thread" module; returning ends this thread's work early
                return
            time.sleep(delay)
            print("{0} {1}".format(threadName, time.ctime(time.time())))
            counter -= 1
    
    
    t1 = myThread(1, "Thread-1", 1)
    t2 = myThread(2, "Thread-2", 1)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print("Exiting Main Thread.")
    

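    To define a custom thread object, create a subclass of threading.Thread, call the parent constructor in its own __init__(), and override the run() method. start() then executes run() in the new thread.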
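    The following is a minimal subclass that follows these steps; SquareThread and its result attribute are hypothetical names. Storing the outcome on the object is one common way to get a value back, because run() returns nothing to the code that called start().

```python
import threading


class SquareThread(threading.Thread):
    """Hypothetical example: compute n * n in run() and keep the result."""

    def __init__(self, n):
        super().__init__()  # always initialise the base Thread first
        self.n = n
        self.result = None

    def run(self):
        # run() is what start() executes in the new thread
        self.result = self.n * self.n


def demo(values):
    workers = [SquareThread(v) for v in values]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # after join() the result attribute is safe to read
    return [w.result for w in workers]


if __name__ == "__main__":
    print(demo([1, 2, 3]))
```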

    Thread synchronization

    Lock

    Example code:

    import threading
    
    shared_resource_with_lock = 0
    shared_resource_with_no_lock = 0
    COUNT = 100000
    shared_resource_lock = threading.Lock()
    
    
    def increment_with_lock():
        global shared_resource_with_lock
        for i in range(COUNT):
            shared_resource_lock.acquire()
            shared_resource_with_lock += 1
            shared_resource_lock.release()
    
    
    def decrement_with_lock():
        global shared_resource_with_lock
        for i in range(COUNT):
            shared_resource_lock.acquire()
            shared_resource_with_lock -= 1
            shared_resource_lock.release()
    
    
    def increment_without_lock():
        global shared_resource_with_no_lock
        for i in range(COUNT):
            shared_resource_with_no_lock += 1
    
    
    def decrement_without_lock():
        global shared_resource_with_no_lock
        for i in range(COUNT):
            shared_resource_with_no_lock -= 1
    
    
    if __name__ == "__main__":
        t1 = threading.Thread(target=increment_with_lock)
        t2 = threading.Thread(target=decrement_with_lock)
        t3 = threading.Thread(target=increment_without_lock)
        t4 = threading.Thread(target=decrement_without_lock)
        t1.start()
        t2.start()
        t3.start()
        t4.start()
        t1.join()
        t2.join()
        t3.join()
        t4.join()
        print("the value of shared variable with lock management is :{0}".format(
            shared_resource_with_lock))
        print("the value of shared variable with race condition is :{0}".format(
            shared_resource_with_no_lock))
    

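    threading.Lock() returns a lock object, which you use mainly through two methods, acquire() and release(). Code between the two calls runs while the lock is held, so only one thread at a time can execute it. Calling release() on a lock that is not currently held raises RuntimeError. Without the lock, the final value of shared_resource_with_no_lock may differ from 0, because the += and -= operations of the two threads can interleave. The exact result varies from run to run.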
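    A short sketch that checks these rules directly with Lock.locked(); the demo function is illustrative only.

```python
import threading

# Illustrative sketch; demo is a hypothetical name


def demo():
    lock = threading.Lock()
    lock.acquire()
    held = lock.locked()      # True between acquire() and release()
    lock.release()
    try:
        lock.release()        # releasing a lock that is not held
        error_raised = False
    except RuntimeError:
        error_raised = True
    return held, lock.locked(), error_raised


if __name__ == "__main__":
    print(demo())
```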

    RLock

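    RLock, also called a re-entrant (recursive) lock, is created with threading.RLock(). It differs from Lock in two ways. First, only the thread that acquired it may release it. Second, the thread that holds it can acquire it again without blocking, as long as every acquire() is matched by a release().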
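    The sketch below shows the re-entrancy difference, using a hypothetical helper try_reacquire. The thread that already holds the lock asks for it again with acquire(blocking=False). A plain Lock refuses, while an RLock grants it. This is why the Box class below can call execute() from add() and remove(), which already hold Box.lock. With a plain Lock, that nested acquire() would block forever.

```python
import threading

# Illustrative sketch; try_reacquire is a hypothetical helper


def try_reacquire(lock):
    lock.acquire()
    try:
        # Ask for the lock again from the same thread without blocking
        again = lock.acquire(blocking=False)
        if again:
            lock.release()  # balance the nested acquire()
        return again
    finally:
        lock.release()


if __name__ == "__main__":
    print("Lock:", try_reacquire(threading.Lock()))
    print("RLock:", try_reacquire(threading.RLock()))
```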

    Example code:

    import threading
    import time
    
    
    class Box(object):
        lock = threading.RLock()
    
        def __init__(self):
            self.total_items = 0
    
        def execute(self, n):
            Box.lock.acquire()
            self.total_items += n
            Box.lock.release()
    
        def add(self):
            Box.lock.acquire()
            self.execute(1)
            Box.lock.release()
    
        def remove(self):
            Box.lock.acquire()
            self.execute(-1)
            Box.lock.release()
    
    
    def adder(box, items):
        while items > 0:
            print("adding 1 item in the box")
            box.add()
            time.sleep(1)
            items -= 1
    
    
    def remover(box, items):
        while items > 0:
            print("removing 1 item in the box")
            box.remove()
            time.sleep(1)
            items -= 1
    
    
    if __name__ == "__main__":
        items = 5
        print("putting {0} items in the box".format(items))
        box = Box()
        t1 = threading.Thread(target=adder, args=(box, items))
        t2 = threading.Thread(target=remover, args=(box, items))
    
        t1.start()
        t2.start()
    
        t1.join()
        t2.join()
        print("{0} items still remain in the box".format(box.total_items))
    

    Semaphores

    Example code:

    import threading
    import time
    import random
    
    semaphore = threading.Semaphore(0)
    
    
    def consumer():
        print("Consumer is waiting.")
        semaphore.acquire()
        print("Consumer notify:consumed item numbers {0}".format(item))
    
    
    def producer():
        global item
        time.sleep(10)
        item = random.randint(0, 10000)
        print("producer notify:produced item number {0}".format(item))
        semaphore.release()
    
    
    if __name__ == "__main__":
        for i in range(0, 5):
            t1 = threading.Thread(target=producer)
            t2 = threading.Thread(target=consumer)
            t1.start()
            t2.start()
            t1.join()
            t2.join()
    
        print("program terminated.")
    

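    The semaphore is initialized to 0. Of the two concurrent threads, the consumer calls semaphore.acquire(), which blocks it until the producer calls semaphore.release(). This simulates the producer-consumer pattern. In general, if the semaphore's counter is 0, acquire() blocks until another thread calls release(). If the counter is greater than 0, acquire() decrements it by 1 and returns immediately, granting access to the resource.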
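    The example above starts the counter at 0 in order to pass a signal. A semaphore that starts at a positive value instead caps how many threads may hold it at once. The sketch below lets six threads compete for two slots and records the highest concurrency it observes; run_limited and its parameters are hypothetical.

```python
import threading
import time


def run_limited(workers=6, limit=2):
    """Hypothetical helper: start `workers` threads that share `limit` slots."""
    sem = threading.Semaphore(limit)   # counter starts at `limit`
    guard = threading.Lock()           # protects the counters below
    state = {"active": 0, "peak": 0}

    def task():
        with sem:                      # acquire() decrements, release() increments
            with guard:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.05)           # simulate work while holding a slot
            with guard:
                state["active"] -= 1

    threads = [threading.Thread(target=task) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


if __name__ == "__main__":
    print("highest number of concurrent holders:", run_limited())
```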

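    Thread synchronization with conditions

    The producer-consumer problem is still the best example for explaining the condition mechanism. Here the producer keeps adding items to the buffer as long as it is not full, and the consumer keeps taking items out (and then discarding them) as long as it is not empty. When the buffer is no longer empty, the producer notifies the consumer. When the buffer is no longer full, the consumer notifies the producer.

    Example code: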

    from threading import Thread, Condition
    import time
    
    items = []
    condition = Condition()
    
    
    class consumer(Thread):
        def __init__(self):
            Thread.__init__(self)
    
        def consume(self):
            global condition
            global items
            condition.acquire()
            # Re-check in a loop: the condition may no longer hold when wait() returns
            while len(items) == 0:
                print("Consumer notify: no item to consume")
                condition.wait()
            items.pop()
            print("Consumer notify: consumed 1 item")
            print("Consumer notify: item to consume are:{0}".format(len(items)))
    
            condition.notify()
            condition.release()
    
        def run(self):
            for i in range(0, 20):
                time.sleep(2)
                self.consume()
    
    
    class producer(Thread):
        def __init__(self):
            Thread.__init__(self)
    
        def produce(self):
            global condition
            global items
            condition.acquire()
            # Re-check in a loop: the condition may no longer hold when wait() returns
            while len(items) == 10:
                print("Producer notify: items produced are: {0}".format(len(items)))
                print("Producer notify: stop the production!!")
                condition.wait()
            items.append(1)
            print("Producer notify: total items produced: {0}".format(len(items)))
            condition.notify()
            condition.release()
    
        def run(self):
            for i in range(0, 20):
                time.sleep(1)
                self.produce()
    
    
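    if __name__ == "__main__":
        # Use new names so that the producer and consumer classes are not shadowed
        producer_thread = producer()
        consumer_thread = consumer()
        producer_thread.start()
        consumer_thread.start()
        producer_thread.join()
        consumer_thread.join()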
    

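    condition.acquire() acquires the lock underlying the condition. condition.wait() releases that lock and blocks the current thread until another thread calls condition.notify(). It then re-acquires the lock before returning. The thread that sends the notification must also call condition.release() promptly so that the waiting thread can proceed.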
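    The following is a compact variant of the same pattern, written as a sketch with the with statement and Condition.wait(). It uses Condition.wait_for(), which re-checks a predicate each time the thread wakes up, because the condition that prompted notify() may no longer hold by the time wait() returns. The demo function and its one-item-at-a-time protocol are illustrative.

```python
import threading


def demo(n_items=5):
    """Hypothetical sketch: one producer and one consumer share a list."""
    items = []
    consumed = []
    cond = threading.Condition()

    def consumer():
        for _ in range(n_items):
            with cond:
                # wait_for() releases the lock while waiting and re-checks
                # the predicate each time the thread wakes up
                cond.wait_for(lambda: len(items) > 0)
                consumed.append(items.pop(0))

    def producer():
        for i in range(n_items):
            with cond:
                items.append(i)
                cond.notify()  # wake one waiting consumer, if any

    c = threading.Thread(target=consumer)
    p = threading.Thread(target=producer)
    c.start()
    p.start()
    p.join()
    c.join()
    return consumed


if __name__ == "__main__":
    print(demo())
```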

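    Thread synchronization with events

    An event is an object used for communication between threads. Some threads wait for the signal, while others send it.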
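    Before the producer/consumer example below, here is a minimal sketch of the Event API itself; the demo function is hypothetical. set() raises the internal flag and wakes every waiter, and is_set() reports the flag. clear() lowers it again, and wait(timeout) returns False if the flag is still not set when the timeout expires.

```python
import threading


def demo():
    """Hypothetical sketch of the basic Event operations."""
    ready = threading.Event()  # the internal flag starts as False
    results = []

    def waiter():
        # Blocks until the flag is set or 2 seconds pass; returns the flag
        results.append(ready.wait(timeout=2))

    t = threading.Thread(target=waiter)
    t.start()
    ready.set()                # raise the flag and wake every waiter
    t.join()
    flag_after_set = ready.is_set()
    ready.clear()              # lower the flag again
    timed_out = ready.wait(timeout=0.01)  # nobody sets it, so this is False
    return results[0], flag_after_set, timed_out


if __name__ == "__main__":
    print(demo())
```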

    Example code:

    import time
    from threading import Thread, Event
    import random
    
    items = []
    event = Event()
    
    
    class consumer(Thread):
        def __init__(self, items, event):
            # Daemon thread: this endless consumer will not keep the program alive
            Thread.__init__(self, daemon=True)
            self.items = items
            self.event = event
    
        def run(self):
            while True:
                time.sleep(2)
                self.event.wait()
                item = self.items.pop()
                print('Consumer notify:{0} popped from list by {1}'.format(
                    item, self.name))
    
    
    class producer(Thread):
        def __init__(self, items, event):
            Thread.__init__(self)
            self.items = items
            self.event = event
    
        def run(self):
            for i in range(100):
                time.sleep(2)
                item = random.randint(0, 256)
                self.items.append(item)
                print('Producer notify: item N° %d appended to list by %s' %
                      (item, self.name))
                print('Producer notify: event set by %s' % self.name)
                self.event.set()
                print('Producer notify: event cleared by %s' % self.name)
                self.event.clear()
    
    
    if __name__ == "__main__":
        t1 = producer(items, event)
        t2 = consumer(items, event)
        t1.start()
        t2.start()
        # Only the producer is joined; the consumer loops forever as a daemon
        t1.join()
    

    Simplifying code with the with statement

    import threading
    import logging
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s')
    
    
    def threading_with(statement):
        with statement:
            logging.debug("%s acquired via with" % statement)
    
    
    def Threading_not_with(statement):
        statement.acquire()
        try:
            logging.debug("%s acquired directly " % statement)
        finally:
            statement.release()
    
    
    if __name__ == "__main__":
        lock = threading.Lock()
        rlock = threading.RLock()
        condition = threading.Condition()
        mutex = threading.Semaphore(1)
        threading_synchronization_list = [lock, rlock, condition, mutex]
    
        for statement in threading_synchronization_list:
            t1 = threading.Thread(target=threading_with, args=(statement,))
            t2 = threading.Thread(target=Threading_not_with, args=(statement,))
            t1.start()
            t2.start()
            t1.join()
            t2.join()
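    The with form used in threading_with() is equivalent to the acquire()/try/finally/release() pattern in Threading_not_with(). The lock is acquired on entry and released on exit, even when the body raises. The small sketch below (demo is a hypothetical name) confirms that the lock is free again after an exception.

```python
import threading

# Illustrative sketch; demo is a hypothetical name


def demo():
    lock = threading.Lock()
    try:
        with lock:                    # acquire() on entry
            raise ValueError("boom")  # simulated failure inside the block
    except ValueError:
        pass
    # The context manager called release() despite the exception
    return lock.locked()


if __name__ == "__main__":
    print("still locked:", demo())
```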
    

    Thread communication with queue

    The four most commonly used Queue methods are:

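    • put(): adds an item to the queue
    • get(): removes an item from the queue and returns it
    • task_done(): must be called once for each item retrieved with get(), after that item has been processed
    • join(): blocks until all items have been processed

    from threading import Thread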
    from queue import Queue
    import time
    import random
    
    
    class producer(Thread):
        def __init__(self, queue):
            Thread.__init__(self)
            self.queue = queue
    
        def run(self):
            for i in range(10):
                item = random.randint(0, 256)
                self.queue.put(item)
                print("Producer notify: item N° %d appended to queue by %s" %
                      (item, self.name))
                time.sleep(1)
    
    
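    class consumer(Thread):
        def __init__(self, queue):
            # Daemon thread: this endless consumer will not keep the program alive
            Thread.__init__(self, daemon=True)
            self.queue = queue
    
        def run(self):
            while True:
                item = self.queue.get()
                print('Consumer notify : %d popped from queue by %s' %
                      (item, self.name))
                self.queue.task_done()
    
    
    if __name__ == "__main__":
        queue = Queue()
        t1 = producer(queue)
        t2 = consumer(queue)
        t3 = consumer(queue)
        t4 = consumer(queue)
        t1.start()
        t2.start()
        t3.start()
        t4.start()
        # Wait until the producer has put all of its items
        t1.join()
        # Block until task_done() has been called for every item; joining the
        # endless consumer threads here would block forever
        queue.join()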
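    The consumers above loop forever with while True, so they never return on their own. One common way to let every consumer finish cleanly is to put one sentinel value per consumer on the queue after the real work. The sketch below assumes that None never appears as a real item; SENTINEL, worker and demo are hypothetical names.

```python
import threading
from queue import Queue

SENTINEL = None  # hypothetical "no more work" marker; assumes None is never real data


def worker(q, out):
    while True:
        item = q.get()
        try:
            if item is SENTINEL:
                return            # this consumer is done
            out.append(item * 2)  # list.append is thread-safe in CPython
        finally:
            q.task_done()         # exactly one task_done() per get()


def demo(n_workers=3, n_items=10):
    q = Queue()
    out = []
    threads = [threading.Thread(target=worker, args=(q, out))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for i in range(n_items):
        q.put(i)
    for _ in threads:
        q.put(SENTINEL)           # one sentinel per consumer
    q.join()                      # every put() has been matched by task_done()
    for t in threads:
        t.join()                  # consumers have returned, so this cannot hang
    return sorted(out)


if __name__ == "__main__":
    print(demo())
```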
    

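    Process-based parallelism

    multiprocessing is a module in the Python standard library that runs work in separate processes. Each process has its own interpreter and memory space, so CPU-bound code is not held back by the GIL. Because processes do not share memory by default, the module provides ways to exchange data between them, including queues, pipes and shared-memory objects such as Value and Array.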
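    The following is a minimal sketch of both styles. It assumes the code is saved and run as a script: on platforms that use the spawn start method, worker processes re-import the main module, hence the __name__ guard. Pool.map() spreads square() over worker processes. Value('i', 0) is an integer in shared memory that four processes increment under its built-in lock. The names square, add_one and demo are hypothetical.

```python
import multiprocessing as mp

# Illustrative sketch; square, add_one and demo are hypothetical names


def square(x):
    # Runs in a separate worker process
    return x * x


def add_one(counter):
    # The Value lives in shared memory; get_lock() guards the read-modify-write
    with counter.get_lock():
        counter.value += 1


def demo():
    with mp.Pool(processes=2) as pool:
        squares = pool.map(square, range(5))  # results keep input order
    counter = mp.Value("i", 0)                # shared C int
    procs = [mp.Process(target=add_one, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return squares, counter.value


if __name__ == "__main__":
    # The guard is required where workers are started with "spawn"
    print(demo())
```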

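    Asynchronous programming

    Using the concurrent.futures module

    This module provides thread pools and process pools for managing parallel tasks, handling non-deterministic execution flows, and synchronizing processes and threads. It consists of the following parts:

    • concurrent.futures.Executor: an abstract base class that provides methods for executing calls asynchronously.
    • submit(function, argument): schedules the callable function to be executed with argument as its arguments, and returns a Future.
    • map(function, argument): executes function asynchronously on each element of argument; the results are returned in input order.
    • shutdown(wait=True): signals the executor to free all of its resources.
    • concurrent.futures.Future: encapsulates the asynchronous execution of a callable. Future instances are created when a task (a function together with its arguments) is submitted to an executor with submit().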
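    The sketch below contrasts two of the calls listed above using a thread pool. slow_echo is a hypothetical task whose delay shrinks as its input grows. map() yields results in input order, while as_completed() over the futures returned by submit() yields them as they finish. Leaving the with block calls shutdown(wait=True).

```python
import concurrent.futures
import time

# Illustrative sketch; slow_echo and demo are hypothetical names


def slow_echo(x):
    # Larger inputs sleep less, so they tend to finish first
    time.sleep(0.05 * (4 - x))
    return x


def demo(values=(1, 2, 3)):
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        mapped = list(executor.map(slow_echo, values))  # input order
        futures = [executor.submit(slow_echo, v) for v in values]
        completed = [f.result()
                     for f in concurrent.futures.as_completed(futures)]
    return mapped, completed


if __name__ == "__main__":
    print(demo())
```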

    Example code:

    import concurrent.futures
    import time
    
    number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    
    def evaluate_item(x):
        result_item = count(x)
        return result_item
    
    
    def count(number):
        for i in range(0, 1000000):
            i = i + 1
        return i * number
    
    
    if __name__ == "__main__":
        # Sequential execution
        start_time = time.time()
        for item in number_list:
            print(evaluate_item(item))
        print("Sequential execution in " + str(time.time() - start_time), "seconds")
        # Thread pool execution
        start_time_1 = time.time()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(evaluate_item, item)
                       for item in number_list]
            for future in concurrent.futures.as_completed(futures):
                print(future.result())
        print("Thread pool execution in " +
              str(time.time() - start_time_1), "seconds")
        # Process pool execution
        start_time_2 = time.time()
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(evaluate_item, item)
                       for item in number_list]
            for future in concurrent.futures.as_completed(futures):
                print(future.result())
        print("Process pool execution in " +
              str(time.time() - start_time_2), "seconds")
    

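    Managing the event loop with Asyncio

    Python's asyncio module provides facilities for managing events, coroutines, tasks and threads, as well as primitives for writing concurrent code. Its main components and concepts are:

    • Event loop: asyncio runs everything through an event loop. Each thread has at most one running event loop, which schedules and executes tasks and callbacks.
    • Coroutines: a generalization of subroutines. A coroutine can be suspended during execution while it waits for external processing (for example I/O) to complete, and later resumed from the point where it was suspended.
    • Futures: asyncio defines a Future object which, as in the concurrent.futures module, represents a computation that has not completed yet.
    • Tasks: a subclass of asyncio.Future that wraps and manages a coroutine so that it runs concurrently with others.

    Asyncio provides the following methods for managing the event loop:

    • loop = asyncio.get_event_loop(): gets the event loop for the current context.
    • loop.call_later(time_delay, callback, argument): runs callback after time_delay seconds.
    • loop.call_soon(callback, argument): runs callback as soon as possible. Once call_soon() has returned and control is back in the event loop, callback is called on the next loop iteration.
    • loop.time(): returns the current time of the event loop's internal clock as a float.
    • asyncio.set_event_loop(): sets the event loop for the current context.
    • asyncio.new_event_loop(): creates and returns a new event loop according to the current policy.
    • loop.run_forever(): runs until stop() is called.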
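    Here is a small sketch of call_soon(), call_later() and loop.time(), using the newer entry points asyncio.run() and asyncio.get_running_loop() (Python 3.7+). The demo coroutine and its labels are illustrative. Code that runs synchronously in the coroutine comes first. The call_soon() callback runs on the next loop iteration, and the call_later() callback runs roughly 0.05 s later.

```python
import asyncio


async def demo():
    """Hypothetical sketch: record the order in which scheduled callbacks run."""
    loop = asyncio.get_running_loop()
    order = []
    done = loop.create_future()

    def finish():
        order.append("call_later")
        done.set_result(loop.time())

    start = loop.time()
    loop.call_later(0.05, finish)              # about 0.05 s from now
    loop.call_soon(order.append, "call_soon")  # next loop iteration
    order.append("synchronous")                # runs right away, in this coroutine
    end = await done                           # suspend until finish() runs
    return order, end - start


if __name__ == "__main__":
    order, elapsed = asyncio.run(demo())
    print(order, round(elapsed, 3))
```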

    Example code:

    import asyncio
    
    
    def function_1(end_time, loop):
        print("function_1 called")
        if loop.time() + 1.0 < end_time:
            loop.call_later(1, function_2, end_time, loop)
        else:
            loop.stop()
    
    
    def function_2(end_time, loop):
        print("function_2 called")
        if loop.time() + 1.0 < end_time:
            loop.call_later(1, function_3, end_time, loop)
        else:
            loop.stop()
    
    
    def function_3(end_time, loop):
        print("function_3 called")
        if loop.time() + 1.0 < end_time:
            loop.call_later(1, function_1, end_time, loop)
        else:
            loop.stop()
    
    
    # function_4 reschedules itself but is not used in this example
    def function_4(end_time, loop):
        print("function_4 called")
        if loop.time() + 1.0 < end_time:
            loop.call_later(1, function_4, end_time, loop)
        else:
            loop.stop()
    
    
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # current loop is deprecated in recent Python versions
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    # Stop after roughly 9 seconds of loop time
    end_loop = loop.time() + 9.0
    loop.call_soon(function_1, end_loop, loop)
    loop.run_forever()
    loop.close()
    

    Managing coroutines with Asyncio

    Example code:

    # @asyncio.coroutine and "yield from" were removed in Python 3.11;
    # async def and await are the modern equivalent
    import asyncio
    from random import randint
    
    
    async def StartState():
        print("Start State called\n")
        input_val = randint(0, 1)
        await asyncio.sleep(1)  # non-blocking pause, unlike time.sleep()
        if input_val == 0:
            result = await State2(input_val)
        else:
            result = await State1(input_val)
        print("Resume of the Transition:\nStart State calling " + result)
    
    
    async def State1(transition_value):
        outputVal = "State 1 with transition value = %s\n" % transition_value
        input_val = randint(0, 1)
        await asyncio.sleep(1)
        print("...Evaluating...")
        if input_val == 0:
            result = await State3(input_val)
        else:
            result = await State2(input_val)
        result = "State 1 calling " + result
        return outputVal + str(result)
    
    
    async def State2(transition_value):
        outputVal = "State 2 with transition value = %s\n" % transition_value
        input_val = randint(0, 1)
        await asyncio.sleep(1)
        print("...Evaluating...")
        if input_val == 0:
            result = await State1(input_val)
        else:
            result = await State3(input_val)
        result = "State 2 calling " + result
        return outputVal + str(result)
    
    
    async def State3(transition_value):
        outputVal = "State 3 with transition value = %s\n" % transition_value
        input_val = randint(0, 1)
        await asyncio.sleep(1)
        print("...Evaluating...")
        if input_val == 0:
            result = await State1(input_val)
        else:
            # Reaching EndState ends the simulation
            result = await EndState(input_val)
        result = "State 3 calling " + result
        return outputVal + str(result)
    
    
    async def EndState(transition_value):
        outputVal = "End State with transition value = %s\n" % transition_value
        print("...Stop Computation...")
        return outputVal
    
    
    if __name__ == "__main__":
        print("Finite State Machine simulation with Asyncio Coroutine")
        asyncio.run(StartState())
    

    Controlling tasks with Asyncio

    Example code:

    import asyncio
    
    
    async def factorial(number):
        f = 1
        for i in range(2, number + 1):
            print("Asyncio.Task: Compute factorial(%s)" % i)
            await asyncio.sleep(1)
            f *= i
        print("Asyncio.Task - factorial(%s) = %s" % (number, f))
    
    
    async def fibonacci(number):
        a, b = 0, 1
        for i in range(number):
            print("Asyncio.Task: Compute fibonacci(%s)" % i)
            await asyncio.sleep(1)
            a, b = b, a + b
        print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))
    
    
    async def binomialCoeff(n, k):
        result = 1
        for i in range(1, k + 1):
            # Integer division is exact: result * (n - i + 1) equals C(n, i) * i
            result = result * (n - i + 1) // i
            print("Asyncio.Task: Compute binomialCoeff(%s)" % i)
            await asyncio.sleep(1)
        print("Asyncio.Task - binomialCoeff(%s, %s) = %s" % (n, k, result))
    
    
    async def main():
        # Wrap each coroutine in a Task so that the three run concurrently
        tasks = [asyncio.create_task(factorial(10)),
                 asyncio.create_task(fibonacci(10)),
                 asyncio.create_task(binomialCoeff(20, 10))]
        await asyncio.wait(tasks)
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Using Asyncio and Futures

    Example code:

    import asyncio
    
    
    async def first_coroutine(future, N):
        count = 0
        for i in range(1, N + 1):
            count = count + i
        await asyncio.sleep(4)
        future.set_result(
            "first coroutine (sum of N integers) result = " + str(count))
    
    
    async def second_coroutine(future, N):
        count = 1
        for i in range(2, N + 1):
            count *= i
        await asyncio.sleep(3)
        future.set_result("second coroutine (factorial) result = " + str(count))
    
    
    def got_result(future):
        print(future.result())
    
    
    async def main(N1, N2):
        loop = asyncio.get_running_loop()
        # Create the futures from the running loop
        future1 = loop.create_future()
        future2 = loop.create_future()
        future1.add_done_callback(got_result)
        future2.add_done_callback(got_result)
        await asyncio.gather(first_coroutine(future1, N1),
                             second_coroutine(future2, N2))
    
    
    if __name__ == "__main__":
        N1 = 1
        N2 = 1
        asyncio.run(main(N1, N2))
    

    Distributed programming

    GPU programming

    References

Original article: https://www.cnblogs.com/hippieZhou/p/10205308.html