zoukankan      html  css  js  c++  java
  • CP学习笔记(10)

    Python有很多库可以支持并行计算。

    >>> import threading
    >>> def thread_hello():
            other = threading.Thread(target=thread_say_hello, args=())
            other.start()
            thread_say_hello()
    >>> def thread_say_hello():
            print('hello from', threading.current_thread().name)
    >>> thread_hello()
    hello from Thread-1
    hello from MainThread
    
    
    >>> import multiprocessing
    >>> def process_hello():
            other = multiprocessing.Process(target=process_say_hello, args=())
            other.start()
            process_say_hello()
    >>> def process_say_hello():
            print('hello from', multiprocessing.current_process().name)
    >>> process_hello()
    hello from MainProcess
    hello from Process-1
    

    threadingmultiprocessing库有着类似的API,但是前者只是建立单个线程,后者对多进程封装得更完善,对多核CPU的支持更好。更多可阅读Python标准库08 多线程与同步 (threading包), Python标准库10 多进程初步 (multiprocessing包), Python多进程并发(multiprocessing)

    threading模块使用线程,multiprocessing使用进程。其区别不同在于,线程使用同一内存空间,而进程分配有不同的内存空间。因此进程间难以共享对象。但两个线程则有可能同时改写同一内存空间。为防止出现冲突,可以使用GIL保证不会同时执行可能冲突的线程。
    更多对比

    下面是一个线程冲突的实例

    import threading
    from time import sleep
    
    counter = [0]
    
    def increment():
        count = counter[0]
        sleep(0) # try to force a switch to the other thread
        counter[0] = count + 1
    
    other = threading.Thread(target=increment, args=())
    other.start()
    increment()
    print('count is now: ', counter[0])
    

    下面是执行过程:

    Thread 0                    Thread 1
    read counter[0]: 0
                                read counter[0]: 0
    calculate 0 + 1: 1
    write 1 -> counter[0]
                                calculate 0 + 1: 1
                                write 1 -> counter[0]
    

    结果是尽管执行了两次加法,但结果仍然是:1。

    在Python中,最简单的保证数据同步的方法是使用queue模块的Queue类。

    from queue import Queue
    
    queue = Queue()
    
    def synchronized_consume():
        while True:
            print('got an item:', queue.get())  # 得到对象
            queue.task_done()                   # 队列任务结束
    
    def synchronized_produce():
        consumer = threading.Thread(target=synchronized_consume, args=())
        consumer.daemon = True
        consumer.start()
        for i in range(10):
            queue.put(i)           # 加入新对象
        queue.join()               # 确保所有队列任务结束后,退出
    
    synchronized_produce()
    

    如果上面这个办法因为某些原因做不到,那我们可以使用threading模块中的Lock类。

    seen = set()
    seen_lock = threading.Lock()
    
    def already_seen(item):
        seen_lock.acquire()     # 在Lock类的
        result = True           # acquire方法
        if item not in seen:    # 和release方法
            seen.add(item)      # 之间的代码仅能
            result = False      # 被一个线程
        seen_lock.release()     # 同时访问
        return result
    
    def already_seen(item):
        with seen_lock:
            if item not in seen:
                seen.add(item)
                return False
            return True
    

    还有一个办法是threading模块中的Barrier类。

    counters = [0, 0]
    barrier = threading.Barrier(2)
    
    def count(thread_num, steps):
        for i in range(steps):
            other = counters[1 - thread_num]
            barrier.wait() # wait for reads to complete
            counters[thread_num] = other + 1
            barrier.wait() # wait for writes to complete
    
    def threaded_count(steps):
        other = threading.Thread(target=count, args=(1, steps))
        other.start()
        count(0, steps)
        print('counters:', counters)
    
    threaded_count(10)
    

    更多参考Python的多线程编程模块 threading 参考17.1. threading — Thread-based parallelism

    防止共享数据错误读写的终极机制是完全避免并发地接触同一数据。进程的内存空间的独立性完全符合这一要求。为了解决进程之间的交流问题,multiprocessing模块特别提供了Pipe类。Pipe默认为两条通道,如果传入参数False则为一条通道。

    def process_consume(in_pipe):
        while True:
            item = in_pipe.recv()  # 只有接收成功后才会继续执行
            if item is None:
                return
            print('got an item:', item)
    
    def process_produce():
        pipe = multiprocessing.Pipe(False)
        consumer = multiprocessing.Process(target=process_consume, args=(pipe[0],))
        consumer.start()
        for i in range(10):
            pipe[1].send(i)        # 通过通道发送对象
        pipe[1].send(None) # done signal
    
    process_produce()
    

    在执行并发计算时,程序员往往会犯下错误:

    1. 同步不足(Under-synchronization):一些线程没有被同步
    2. 过度同步(Over-synchronization):某些本可以并发执行的线程,被串行化
    3. 死锁(Deadlock):被同步的进程相互等候对方完成某些步骤才进行下一步,导致程序锁死。一个栗子:
    def deadlock(in_pipe, out_pipe):
        item = in_pipe.recv()
        print('got an item:', item)
        out_pipe.send(item + 1)
    
    def create_deadlock():
        pipe = multiprocessing.Pipe()
        other = multiprocessing.Process(target=deadlock, args=(pipe[0], pipe[1]))
        other.start()
        deadlock(pipe[1], pipe[0])
    
    create_deadlock()
    
  • 相关阅读:
    【转】给ExtJS的Grid增加两行tbar
    AWK使用手册
    SED单行脚本快速参考(Unix 流编辑器)
    Linux的用户和用户组管理
    Linux下中文man帮助安装。
    关于TLBB 客户端UI界面修改几个定义了解
    正则表达式30分钟入门教程
    VimDiff技巧
    CentOS 更换网易源
    Mysql数据库搭建Linux
  • 原文地址:https://www.cnblogs.com/rim99/p/5075838.html
Copyright © 2011-2022 走看看