zoukankan      html  css  js  c++  java
  • python进程,线程,协程

    参考别人的博客:https://www.cnblogs.com/whatisfantasy/p/6440585.html

    线程

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务

    创建线程的两种方法

    # 方法1 直接调用
    import threading
    import time
    
    def sayhi(num):
        print("running on number:%s"%num)
        time.sleep(2)
    
    if __name__ == "__main__":
    
        t1 = threading.Thread(target=sayhi, args=(1,))
        t2 = threading.Thread(target=sayhi, args=(2,))
        t1.start()
        t2.start()
    
        print(t1.getName()) #获取线程名
        print(t2.getName())
        
    # 方法2 继承调用
    import threading
    import time
    
    class MyThread(threading.Thread):
        def __init__(self, num):
            super(MyThread, self).__init__()
            self.num = num
        def run(self):
            print("running on number:%s" %self.num)
            time.sleep(2)
    
    if __name__=="__main__":
        t1 = MyThread(1)
        t2 = MyThread(2)
    
        t1.start()
        t2.start()
    

    join

    join,等待至线程中止。可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

    #例子1
    import threading
    import time
    
    def run(num):
        print("thread%s start" % num)
        time.sleep(1)
        print("thread%s end" % num)
    
    
    
    if __name__ == '__main__':
        thread_list = []
        for i in range(5):
            t = threading.Thread(target=run, args=(i,))
            t.start()
            # t.join()
            thread_list.append(t)
    
        for i in thread_list:
            i.join()
            
    # 输出结果
    thread0 start
    thread1 start
    thread2 start
    thread3 start
    thread4 start
    thread0 endthread1 end
    thread4 endthread3 end
    thread2 end
    
    #例子2
    import threading
    import time
    
    def run(num):
        print("thread%s start" % num)
        time.sleep(1)
        print("thread%s end" % num)
    
    
    
    if __name__ == '__main__':
        thread_list = []
        for i in range(5):
            t = threading.Thread(target=run, args=(i,))
            t.start()
            t.join()
            thread_list.append(t)
    
    #输出结果
    thread0 start
    thread0 end
    thread1 start
    thread1 end
    thread2 start
    thread2 end
    thread3 start
    thread3 end
    thread4 start
    thread4 end
    

    setDaemon

    setDaemon,将一个线程可以被标记成一个 "守护线程"。这个标志的意义是,只有守护线程都终结,整个Python程序才会退出。这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,因此当主进程结束后,子线程也会随之结束。所以当主线程结束后,整个程序就退出了。

    注解

    守护线程在程序关闭时会突然关闭。他们的资源(例如已经打开的文档,数据库事务等等)可能没有被正确释放。如果你想你的线程正常停止,设置他们成为非守护模式并且使用合适的信号机制,例如: Event

    import threading
    import time
    
    
    def sayhi(num):
        print("running on number:%s" % num)
        print(threading.current_thread().name)
        time.sleep(2)
    def main():
        for i in range(5):
            t = threading.Thread(target=sayhi, args=(i,))
            t.start()
            t.join()
            print('starting thread', t.getName())
    
    
    if __name__ == "__main__":
    
        m = threading.Thread(target=main, args=[])
        m.setDaemon(True)   #设置为守护进程
        m.start()
        m.join(timeout=2)
        print("---main thread done----")
    
    

    常用函数

    threading.currentThread(): 返回当前的线程变量。
    threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
    
    run(): 用以表示线程活动的方法。
    start():启动线程活动。
      
    join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
    isAlive(): 返回线程是否活动的。
    getName(): 返回线程名。
    setName(): 设置线程名。
    setDaemon():设置为守护进程
    

    Python GIL

    (Global Interpreter Lock)

    无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行

    为了实现线程间的数据保护和状态同步

    在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少核,同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。

    GIL的全称是Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的。

    Python多线程的工作过程:
    python在使用多线程的时候,调用的是c语言的原生线程。

    1. 拿到公共数据
    2. 申请gil
    3. python解释器调用os原生线程
    4. os操作cpu执行运算
    5. 当该线程执行时间到后,无论运算是否已经执行完,gil都被要求释放
    6. 进而由其他进程重复上面的过程
    7. 等其他进程执行完后,又会切换到之前的线程(从他记录的上下文继续执行)
      整个过程是每个线程执行自己的运算,当执行时间到就进行切换(context switch)。
    • python针对不同类型的代码执行效率也是不同的:

      1、CPU密集型代码(各种循环处理、计算等等),在这种情况下,由于计算工作多,ticks计数很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。
      2、IO密集型代码(文件处理、网络爬虫等涉及文件读写的操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以python的多线程对IO密集型代码比较友好。

    • 使用建议?

      python下想要充分利用多核CPU,就用多进程。因为每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行,在python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)。

    • GIL在python中的版本差异:

      1、在python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100时进行释放。(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过sys.setcheckinterval 来调整)。而每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。
      2、在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。

    线程锁(互斥锁)

    多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

    未加锁

    import threading
    num = 0
    def run(n):
        global num
        for i in range(1000000):
            num += n
            num -= n
    
    
    if __name__ == '__main__':
    
    
        t1 = threading.Thread(target=run, args=(1,))
        t2 = threading.Thread(target=run, args=(2,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print("num:", num)
    

    加锁版

    import threading
    
    num = 0
    def run(n):
        global num
        lock.acquire()
        for i in range(1000000):
            num += n
            num -= n
        lock.release()
    
    
    if __name__ == '__main__':
    
        lock = threading.Lock()
        t1 = threading.Thread(target=run, args=(1,))
        t2 = threading.Thread(target=run, args=(2,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print("num:", num)
    

    递归锁

    使用普通的锁里还要锁的时候,会出现卡死的情况,程序无法进行

    import threading
    
    num = 0
    money = 0
    def earn():
        global money
        lock.acquire()
        money += 1
        lock.release()
    
    def run(n):
        global num
        lock.acquire()
        earn()
        num += n
        lock.release()
    
    
    if __name__ == '__main__':
    
        lock = threading.RLock()  #使用递归锁
        t1 = threading.Thread(target=run, args=(1,))
        t2 = threading.Thread(target=run, args=(2,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print("num:", num)
    

    Semaphore(信号量)

    互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据

    import threading
    import time
    
    num = 0
    def run(n):
        global num
        semaphore.acquire() #心好累锁
        print("thread%s" %n)
        time.sleep(n*0.5)
        num += 1
        semaphore.release()
    
    
    if __name__ == '__main__':
    
        lock = threading.Lock()
        semaphore = threading.BoundedSemaphore(3) #最多可以有三个线程进行操作
        t1 = threading.Thread(target=run, args=(1,))
        t2 = threading.Thread(target=run, args=(2,))
        t3 = threading.Thread(target=run, args=(3,))
        t4 = threading.Thread(target=run, args=(4,))
        t5 = threading.Thread(target=run, args=(5,))
        t6 = threading.Thread(target=run, args=(6,))
        t1.start()
        t2.start()
        t3.start()
        t4.start()
        t5.start()
        t6.start()
        
        print("num:", num)
    

    定时器

    经过一定时间后,运行该线程

    import threading
    def hello():
        print("hello, world")
    
    t = threading.Timer(5.0, hello)
    t.start() 
    

    Event

    线程同步对象

    它作为一个内部的标志,标志被设置,线程进行操作,标志被清空,除非再次被设置,线程才会阻塞。

    import threading
    import time
    
    def light():
        count = 0
        if not event.isSet():
            event.set()
        while True:
            time.sleep(1)
            if count <= 10:
                print("绿灯啦啦啦啦啦")
            elif count < 20:
                print("红灯啦,不能走啦")
                event.clear()
            elif count >= 20:
                count = 0
                if event.isSet():
                    event.set()
            count += 1
    
    def car(num):
        while 1:
            time.sleep(2)
            if event.isSet():
                print("car%s moving!!!" % num)
            else:
                print("car%s stop...." % num)
    
    if __name__ == '__main__':
        event = threading.Event()
        light = threading.Thread(target=light)
        light.start()
        for i in range(2):
            c = threading.Thread(target=car, args=(i,))
            c.start()
    

    线程优先级队列( Queue)

    Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。

    #例子1
    import threading
    import queue
    import time
    
    def producer():
        count = 1
        while True:
            time.sleep(1)
            print("生产者生产面包%s" % count)
            print("面包数量%s" % q.qsize())
            q.put("面包%s" % count)
            count += 1
    def consumer():
        while True:
            time.sleep(2)
            print("消费者吃%s" % q.get())
    
    if __name__ == '__main__':
        q = queue.Queue(2)  #设置队列的最大数量是2
        pro = threading.Thread(target=producer,)
        con = threading.Thread(target=consumer,)
        pro.st`art()
        con.start()
    
    #例子2
    import threading
    import queue
    import time
    
    def producer():
        count = 1
        for i in range(10):
            time.sleep(1)
            print("生产者生产面包%s" % count)
            print("面包数量%s" % q.qsize())
            q.put("面包%s" % count)
            count += 1
            q.join()   #队列会阻塞除非所有任务完成
    
    def consumer():
        while True:
            time.sleep(1)
            print("消费者吃%s" % q.get())
            q.task_done()  #告诉队列任务已经完成
    
    if __name__ == '__main__':
        q = queue.Queue()
        pro = threading.Thread(target=producer,)
        con = threading.Thread(target=consumer,)
        pro.start()
        con.start()
    

    Queue模块中的常用方法:

    • Queue.qsize() 返回队列的大小
    • Queue.empty() 如果队列为空,返回True,反之False
    • Queue.full() 如果队列满了,返回True,反之False
    • Queue.full 与 maxsize 大小对应
    • Queue.get([block[, timeout]])获取队列,timeout等待时间
    • Queue.get_nowait() 相当Queue.get(False)
    • Queue.put(item) 写入队列,timeout等待时间
    • Queue.put_nowait(item) 相当Queue.put(item, False)
    • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
    • Queue.join() 实际上意味着等到队列为空,再执行别的操作

    生产者消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    ThreadLocal

    线程本地数据是值特定于线程的数据。要管理线程本地数据,只需创建一个本地(或子类)实例,并在其上存储属性

    原因:

    1. 对于全局变量需要加锁,对于局部变量,线程可以直接修改
    2. 对于局部变量,传递参数比较麻烦
    import threading
    
    local_school = threading.local()
    
    def printName():
        std = local_school.student
        print(std, "	 hello ", threading.current_thread().name)
    
    
    def student(name):
        local_school.student = name
        printName()
        print(threading.current_thread().name)
    
    
    if __name__ == '__main__':
        t1 = threading.Thread(target=student, args=("Tom",))
        t2 = threading.Thread(target=student, args=("shagua",))
        t1.start()
        t2.start()
    #输出结果
    Tom 	 hello  Thread-1
    Thread-1
    shagua 	 hello  Thread-2
    Thread-2
    
    

    进程

    程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

    #例子1 python的父进程是pycharm
    import multiprocessing
    import os
    
    
    def run():
        print("当前进程的父进程号%s" % os.getppid())
        print("当前进程的进程号%s" % os.getpid())
    
    if __name__ == '__main__':
        r = multiprocessing.Process(target=run,)
        r.start()
        r.join()
        print("父进程号%s" % os.getppid())
        print("进程号%s" % os.getpid())
    #输出结果
    当前进程的父进程号17784
    当前进程的进程号1056
    父进程号8148
    进程号17784
    
    #例子2 子进程复制一份新的dict,内存不共享
    from multiprocessing import Process,Manager
    
    def test(d):
        print("before test.d:%s" % d)
        d[10] = True
        print("after test.d:%s" % d)
    
    if __name__ == '__main__':
        d = {}
        t = Process(target=test, args=(d,))
        t.start()
        t.join()
        print("主线程的d:%s" % d)
    #输出结果
    before test.d:{}
    after test.d:{10: True}
    主线程的d:{}
    

    进程间通信

    Queue

    #例子1
    #进程的队列需要传参数
    from multiprocessing import Process, Queue
     
    def f(q):
        q.put([42, None, 'hello'])
     
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    Pipes

    管道函数会返回两个连接对象,默认是全双工

    import multiprocessing
    import time
    
    def f(conn):
        conn.send("hello")
        conn.close()
    
    if __name__ == '__main__':
        parent_pipe, child_pipe = multiprocessing.Pipe()
        p = multiprocessing.Process(target=f, args=(child_pipe,))
        p.start()
        print(parent_pipe.recv())
        p.join()
    

    Manager()

    返回一个manager的控制对象,允许其他进程使用代理操作它们。

    支持的数据类型list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array

    #例子1 有manager允许其他进程修改数据
    from multiprocessing import Process,Manager
    
    def test(d):
        print("before test.d:%s" % d)
        d[10] = True
        print("after test.d:%s" % d)
    
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            t = Process(target=test, args=(d,))
            t.start()
            t.join()
            print("主线程的d:%s" % d)
            
    #输出结果
    before test.d:{}
    after test.d:{10: True}
    主线程的d:{10: True}
        
    
    

    进程同步

    为什么需要进程同步?

    每一个进程需要数据不共享,但是输出结果时候会共享屏幕,输出的结果也会出现混乱

    # 不使用进程锁
    from  multiprocessing import Process, Lock
    
    def f(i):
        print("hello world %s" % i)
    
    if __name__ == '__main__':
        for i in range(10):
            p = Process(target=f, args=(i,))
            p.start()
    
    # 使用进程锁
    from  multiprocessing import Process, Lock
    
    def f(l, i):
        l.acquire()
        try:
            print("hello world %s" % i)
        finally:
            l.release()
    
    if __name__ == '__main__':
        lock = Lock()
        for i in range(10):
            p = Process(target=f, args=(lock, i))
            p.start()
    

    进程池

    一个进程池对象,它控制可以向其提交作业的工作进程池。它支持带timeouts和callback的异步结果,并具有并行映射实现。

    进程池调度方法

    1. apply_async:异步,可使用回调函数
    2. apply:同步
    3. map:切割迭代器为不同的块,提交任务给进程池作为不同的任务进行调度
    4. map_async:可使用回调函数
    5. terminate() 立刻关闭进程池
    6. join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。
    7. close() 等待所有进程结束后,才关闭进程池。(必须在join之前,不然会报错)
    from multiprocessing import Process,Pool
    import time
    import os
    
    def run(n):
        time.sleep(1)
        print("hello world %s" % n)
    
    
    
    def Bar(arg):
        # time.sleep(1)
        print("-->>exec done", arg)
    
    
    if __name__ == '__main__':
        pool = Pool(3)
        for i in range(5):
            pool.apply_async(func=run, args=(i,), callback=Bar(run.__name__)) #异步(并发)
            # pool.apply(func=run, args=(i,))         #同步
        pool.close()
        pool.join()
        print("end")
    

    常用函数

    1. get([timeout])

    获取异步进程的结果,Pool.apply_async() and Pool.map_async().

    Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

    #example
    from multiprocessing import Pool, TimeoutError
    import time
    import os
    
    def f(x):
        return x*x
    
    if __name__ == '__main__':
        # start 4 worker processes
        with Pool(processes=4) as pool:
    
            # print "[0, 1, 4,..., 81]"
            print(pool.map(f, range(10)))
    
            # print same numbers in arbitrary order
            for i in pool.imap_unordered(f, range(10)):
                print(i)
    
            # evaluate "f(20)" asynchronously
            res = pool.apply_async(f, (20,))      # runs in *only* one process
            print(res.get(timeout=1))             # prints "400"
    
            # evaluate "os.getpid()" asynchronously
            res = pool.apply_async(os.getpid, ()) # runs in *only* one process
            print(res.get(timeout=1))             # prints the PID of that process
    
            # launching multiple evaluations asynchronously *may* use more processes
            multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
            print([res.get(timeout=1) for res in multiple_results])
    
            # make a single worker sleep for 10 secs
            res = pool.apply_async(time.sleep, (10,))
            try:
                print(res.get(timeout=1))
            except TimeoutError:
                print("We lacked patience and got a multiprocessing.TimeoutError")
    
            print("For the moment, the pool remains available for more work")
    
        # exiting the 'with'-block has stopped the pool
        print("Now the pool is closed and no longer available")
    

    进程和线程的区别

    1. 线程间共享进程创建的内存地址,进程自己有自己的独立的内存空间
    2. 线程能够直接访问它的进程的数据段,子进程从父进程复制一份数据段
    3. 线程能够直接和其他线程交流,进程必须通过进程间通信来与其他同级进程通信
    4. 新的线程容易创建,新的进程需要父进程的创建
    5. 线程能够控制所有同进程的线程,但是进程只能控制它的子进程
    6. 主线程的更改(取消、优先级更改等)可能会影响进程中其他线程的行为;对父进程的更改不影响子进程。

    协程

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统,它本质上是操作系统提供的功能。而协程的操作则是程序员指定的,在python中通过yield,人为的实现并发处理。

    协程的优点:

    最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。

    第二大优势就是不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    协程的适用场景:

    当程序中存在大量不需要CPU的操作时(IO)。

    常用第三方模块gevent和greenlet。(本质上,gevent是对greenlet的高级封装,因此一般用它就行,这是一个相当高效的模块。)

    # 例子1,greenlet实现协程来回调度
    from greenlet import greenlet
    
    def task1():
        print("1")
        t2.switch()
        print("2")
        t2.switch()
    
    def task2():
        print("3")
        t1.switch()
        print("4")
    
    if __name__ == '__main__':
        t1 = greenlet(task1)
        t2 = greenlet(task2)
        t1.switch()
    
    # 例子2,gevent高级封装
    from urllib import request
    from gevent import monkey
    import gevent
    monkey.patch_all()   #gevent无法识别线程的IO进行调用,必须调用它才可以
    
    def get(url):
        print('GET: %s' % url)
        resp = request.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
        
    #通过joinall将任务f和它的参数进行统一调度,实现单线程中的协程。
    gevent.joinall([
            gevent.spawn(get, 'https://www.python.org/'),
            gevent.spawn(get, 'https://www.baidu.com/'),
            gevent.spawn(get, 'https://github.com/'),
    ])
    
  • 相关阅读:
    openSUSE 13.1 Milestone 4 发布
    Neo4j 2.0 M4 发布
    iBoxDB for .NET v1.5发布, 移动NoSQL数据库
    GNU libc (Glibc) 2.18 发布
    Android 开源项目维护者宣布退出
    Jeasyframe 开源框架 稳定版 V1.5 发布
    Spring Mobile 1.1.0.RC1 和 1.0.2 发布
    Deis logo 开源PaaS系统 Deis
    EasyCriteria 3.0 发布
    TypeScript 0.9.1 发布,新增 typeof 关键字
  • 原文地址:https://www.cnblogs.com/akiz/p/11144321.html
Copyright © 2011-2022 走看看