zoukankan      html  css  js  c++  java
  • 线程、进程、协程

    并行:并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )

    并发:并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

    同步:所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。

    异步:所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。

    阻塞非阻塞:阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的。

    python的GIL:无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行。这两个

    Python根据处理事情的类型,是IO密集型还是计算密集型,选择不同的方式,进程,线程、协程,相互配合来用。但是对于计算密集型,。。。

     

    一.   线程:

        线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。

        线程的引入减小了程序并发执行时的开销,提高了操作系统的并发 性能。线程没有自己的系统资源。

                  <python的线程与threading模块>                

    直接调用:

    import threading
    import time
     
    def sayhi(num): #定义每个线程要运行的函数
     
        print("running on number:%s" %num)
     
        time.sleep(3)
     
    if __name__ == '__main__':
     
        t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
        t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
     
        t1.start() #启动线程
        t2.start() #启动另一个线程
     
        print(t1.getName()) #获取线程名
        print(t2.getName())

    继承式调用:

    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):#定义每个线程要运行的函数
    
            print("running on number:%s" %self.num)
    
            time.sleep(3)
    
    if __name__ == '__main__':
    
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
        
        print("ending......")

    join():在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

    setDaemon(True):

             将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成 想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法啦

    # run():  线程被cpu调度后自动执行线程对象的run方法
    # start():启动线程活动。
    # isAlive(): 返回线程是否活动的。
    # getName(): 返回线程名。
    # setName(): 设置线程名。
    
    threading模块提供的一些方法:
    # threading.currentThread(): 返回当前的线程变量。
    # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    关于锁:

      因为线程能直接操作到进程里的所有变量,如果存在多个线程同时操作同一个变量,存在竞争,最后变量是什么样的都不知道,所以在线程需要操作变量前,进行取锁竞争,

      拿到锁的线程才能对变量操作,操作完后释放锁。意味着同时时刻只有一个线程运行那部分代码。

    1. threading.Lock()  同步锁

    import time
    import threading
    R=threading.Lock()
    def addNum():
        global num #在每个线程中都获取这个全局变量
        #num-=1
        R.acquire()
        temp=num
        #print('--get num:',num )
        time.sleep(0.01)
        num =temp-1 #对此公共变量进行-1操作
        R.release()
    
    num = 100  #设定一个共享变量
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)
    
    for t in thread_list: #等待所有线程执行完毕
        t.join()
    
    print('final num:', num )

      为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

    2. threading.RLock()  递归锁

    import  threading
    import time
    
    
    class MyThread(threading.Thread):
    
        def actionA(self):
    
            r_lcok.acquire() #count=1
            print(self.name,"gotA",time.ctime())
            time.sleep(2)
            r_lcok.acquire() #count=2
    
            print(self.name, "gotB", time.ctime())
            time.sleep(1)
    
            r_lcok.release() #count=1
            r_lcok.release() #count=0
    
    
        def actionB(self):
    
            r_lcok.acquire()
            print(self.name, "gotB", time.ctime())
            time.sleep(2)
    
            r_lcok.acquire()
            print(self.name, "gotA", time.ctime())
            time.sleep(1)
    
            r_lcok.release()
            r_lcok.release()
    
    
        def run(self):
            self.actionA()
            self.actionB()
    
    
    if __name__ == '__main__':
    
        # A=threading.Lock()
        # B=threading.Lock()
    
        r_lcok=threading.RLock()
        L=[]
    
        for i in range(5):
            t=MyThread()
            t.start()
            L.append(t)
    
    
        for i in L:
            i.join()
    
        print("ending....")

    3. 关于 event事件

    Python线程的 event 事件 可以看做是 事件驱动模型。当一个线程通过事件驱动模型发送了一个信号,另一个线程通过事件驱动模型获取了该信号,从而做出反应。

    只是这个模型比较简单,发送的信号默认是 false ,要么是 true。

    event = threading.Event()
    
    event.is_set()  event.isSet()   # 获取标志位 是 true  还是 false
    event.set()   # 设置标志位  为  true
    event.clear()   #  初始化标志位 为 false
    event.wait()    #  阻塞等待标志位  为  true  才会继续运行

    栗子:

    from threading import Thread,Event
    import time
    
    event=Event()
    
    def light():
        print('红灯正亮着')
        time.sleep(3)
        event.set() #绿灯亮
    
    def car(name):
        print('车%s正在等绿灯' %name)
        event.wait() #等灯绿 此时event为False,直到event.set()将其值设置为True,才会继续运行.
        print('车%s通行' %name)
    
    if __name__ == '__main__':
        # 红绿灯
        t1=Thread(target=light)
        t1.start()
        #
        for i in range(10):
            t=Thread(target=car,args=(i,))
            t.start()

     

    4. 信号量 —— multiprocess.Semaphore

      信号量是比同步锁还能多设置的方式。同步锁只有一把,而信号量可以设置多把锁。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。

      信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

      进程信号量

    from multiprocessing import Process,Semaphore 
    import time,random
    
    def go_ktv(sem,user):
        sem.acquire()
        print('%s 占到一间ktv小屋' %user)
        time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
        sem.release()
    
    if __name__ == '__main__':
        sem=Semaphore(4)
        p_l=[]
        for i in range(13):
            p=Process(target=go_ktv,args=(sem,'user%s' %i,))
            p.start()
            p_l.append(p)
    
        for i in p_l:
            i.join()
        print('============》')

     

    # coding: utf-8
    import threading
    import time
    import random
    
    semaphore = threading.Semaphore(0)
    
    def consumer():
        print("Consumer is waiting.")
        semaphore.acquire()
        print("Consumer notify: Consumed item number %s" %item)
    
    def producer():
        global item
        time.sleep(10)
        item = random.randint(0, 100)
        print("Producer notify: Produced item number %s" %item)
        semaphore.release()
    
    if __name__ == "__main__":
        for i in range(0, 5):
            t1 = threading.Thread(target=producer)
            t2 = threading.Thread(target=consumer)
            t1.start()
            t2.start()
            t1.join()
            t2.join()
        print("Program terminated")

     

    5. 队列  queue-----线程利器, 利用队列,组合 生产者消费模型

    创建一个“队列”对象
    import queue  # python 3 是 queue  Python2 是 Queue
    q = queue.Queue(maxsize = 10)
    Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。
    
    将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。
    
    将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,
    get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。
    
    Python Queue模块有三种队列及构造函数:
    1、Python Queue模块的FIFO队列先进先出。   class queue.Queue(maxsize)
    2、LIFO类似于堆,即先进后出。               class queue.LifoQueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。        class queue.PriorityQueue(maxsize)
    
    此包中的常用方法(q = Queue.Queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作
    import threading
    import time
    import queue
    
    class MyThread(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):  # 定义每个线程要运行的函数
    
            num = qq.get() # 队列里没有就会阻塞等待
            numb = num -1
            time.sleep(0.1)
            num = numb
            print("running on number:%s" % self.name,num)
            qq.put(num)
    
    
    if __name__ == '__main__':
        qq= queue.Queue()
        qq.put(100)
    
        tl = []
        for i in range(100):
            th = MyThread(i)
            tl.append(th)
    
        for th in tl:
            th.start()
    
        for th in tl:
            th.join()
    
    
        print("ending......", qq.get())
    import time,random
    import queue,threading
    
    q = queue.Queue()
    
    def Producer(name):
      count = 0
      while count <10:
        print("making........")
        time.sleep(5)
        q.put(count)
        print('Producer %s has produced %s baozi..' %(name, count))
        count +=1
        #q.task_done()
        q.join()
        print("ok......")
    
    def Consumer(name):
      count = 0
      while count <10:
            time.sleep(random.randrange(4))
        # if not q.empty():
        #     print("waiting.....")
            #q.join()
            data = q.get()
            print("eating....")
            time.sleep(4)
    
            q.task_done()
            #print(data)
            print('33[32;1mConsumer %s has eat %s baozi...33[0m' %(name, data))
        # else:
        #     print("-----no baozi anymore----")
            count +=1
    
    p1 = threading.Thread(target=Producer, args=('A君',))
    c1 = threading.Thread(target=Consumer, args=('B君',))
    c2 = threading.Thread(target=Consumer, args=('C君',))
    c3 = threading.Thread(target=Consumer, args=('D君',))
    
    p1.start()
    c1.start()
    c2.start()
    c3.start()

    二.   进程

    Python的多进程,multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

    多进程、多线程里的 run 方法  和  start 方法不一样的。

    1.  调用方式

    from multiprocessing import Process
    import time
    def f(name):
        time.sleep(1)
        print('hello', name,time.ctime())
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = Process(target=f, args=('alvin',))
            p_list.append(p)
            p.start()
        for i in p_list:
            p.join()
        print('end')

    类的方式

    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self):
            super(MyProcess, self).__init__()
            #self.name = name
    
        def run(self):
            time.sleep(1)
            print ('hello', self.name,time.ctime())
    
    if __name__ == '__main__':
        p_list=[]
        for i in range(3):
            p = MyProcess()
            p.start()
            p_list.append(p)
        for p in p_list:
            p.join()
    
        print('end')

     Process 模块介绍

    Process([group,  [target,  [name,   [args,   [kwargs ]]]])
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    
    参数介绍:
    group参数未使用,值始终为None
    target表示调用对象,即子进程要执行的任务
    args表示调用对象的位置参数元组,args=(1,2,'egon',)
    kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    name为子进程的名称

    方法介绍
    p.start():启动进程,并调用该子进程中的p.run() 。
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。
    如果p还保存了一个锁那么也将不会被释放,进而导致死锁。 p.is_alive():如果p仍然运行,返回True。 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。
                timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
    属性介绍
    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,
         并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置。 p.name:进程的名称。 p.pid:进程的pid 。 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 。 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。
    这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)。

    2. 进程间数据传输

    数据传输而不是共享,两种方式,队列和管道。

    队列   from multiprocessing import Queue   

    from multiprocessing import Queue   

    q=Queue([maxsize]) 

    创建共享的进程队列。
    参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
    底层队列使用管道和锁定实现.
    
    Queue的实例q具有以下方法:
    
    q.get( [ block [ ,timeout ] ] ) 
    返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.get_nowait( ) 
    同q.get(False)方法。
    
    q.put(item [, block [,timeout ] ] ) 
    将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() 
    返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    
    q.empty() 
    如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    q.full() 
    如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    
    q.close() 
    关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    q.cancel_join_thread() 
    不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    
    q.join_thread() 
    连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
    
    

    from multiprocessing import Queue

    '''
    multiprocessing模块支持进程间通信的两种主要形式:管道和队列
    都是基于消息传递实现的,但是队列接口
    '''
    
    from multiprocessing import Queue
    q=Queue(3)
    
    #put ,get ,put_nowait,get_nowait,full,empty
    q.put(3)
    q.put(3)
    q.put(3)
    # q.put(3)   # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
               # 如果队列中的数据一直不被取走,程序就会永远停在这里。
    try:
        q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
        print('队列已经满了')
    
    # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
    print(q.full()) #满了
    
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
    try:
        q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
    except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
        print('队列已经空了')
    
    print(q.empty()) #空了
    
    单看队列用法
    
    
    import time
    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([time.asctime(), 'from Eva', 'hello'])  #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。
    
    if __name__ == '__main__':
        q = Queue() #创建一个Queue对象
        p = Process(target=f, args=(q,)) #创建一个进程
        p.start()
        print(q.get())
        p.join()
    
    子进程发送数据给父进程
    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([12, {"name":"yuan"}, 'hello'])
        response=conn.recv()
        print("response",response)
        conn.close()
        print("q_ID2:",id(child_conn))
    
    if __name__ == '__main__':
    
        parent_conn, child_conn = Pipe()
        print("q_ID1:",id(child_conn))
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        parent_conn.send("儿子你好!")
        p.join()
    管道
  • 相关阅读:
    004 RequestMappingHandlerMapping
    003 HandlerMapping
    002 环境配置
    001 springmvc概述
    011 使用AOP操作注解
    010 连接点信息
    009 通知类型
    一台服务器的IIS绑定多个域名
    程序包需要 NuGet 客户端版本“2.12”或更高版本,但当前的 NuGet 版本为“2.8.50313.46”
    通过ping 主机名,或者主机名对应的IP地址
  • 原文地址:https://www.cnblogs.com/chenpython123/p/11028542.html
Copyright © 2011-2022 走看看