zoukankan      html  css  js  c++  java
  • 线程、进程以及协程,上下文管理器

    线程和进程

    线程:线程是操作系统能够进行运算调度的最小单位。进程被包含在进程中,是进程中实际处理单位。一条线程就是一堆指令集合。

    一条线程是指进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

         优点:共享内存,I/O操作时候,创造并发操作

         缺点:抢占资源(相当于建人)

    进程:进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

       优点:同事利用多个CPU,能够同时进行多个操作

       缺点:耗费资源(重新开辟内存空间相当于建房子)

    进程不是越多越好,CPU个数 == 进程个数

    线程也不是越多越好,具体案例具体分析,请求上下文切换耗时

    进程和线程的目的:提高执行效率

    计算机最小的任务执行单元:线程

    I/O操作不占用CPU时间:

           1、I/O密集型(不用CPU)---》多线程

           2、计算机密集型(用CPU)-----》多进程

           3、存在大量且不需要CPU操作时------》协程  

    GIL限制:在同一时刻,只能有一个线程进入CPython解释器。

     python的进程上有个GIL 全局解释性锁,这个会造成,一个进程的多个线程,不能同时使用多个cpu,而是cpu每次只能选一个线程执行,因此,多线程在cpu执行的是无效的。但是在I/O操作的时候是可以同步的,比如time.sleep就是io 操作,多线程,可以同时等待

    进程和程序关系
            进程:程序实例  程序子集   有所谓生命周期,可以kill叼 比如你安装的word   是一个程序  ,你打开一个文档是一个进程,可以关掉。
    
           进程要想完成并发执行的功能就要进程切换
     进程切换 ,上下文切换,进程运行,说明在cpu的寄存器里面有数据了。假如5条数据现在有两条,就切换了,现在要保存现场,回来时候  要恢复现场。如果机器上有几千个进程,会切换 上万个切换需要时间,进程切换时监控程序来完成的,也就是内核,消耗时间
    
    正常程序执行空间是用户空间,
    
    占用在内核,说明大量时间消耗到进程切换。不好。
    进程和程序关系

    threading模块

    threading 模块建立在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

    
    
    #重点: 多线程传参数 args  1个参数的时候 内部一点要逗号结尾,否则报错。不认为是元祖
     1 import threading,time
     2 def a(num):
     3     time.sleep(1)
     4     print("thread'%d'"%num)
     5     return
     6 for i in range(10):
     7     t = threading.Thread(target=a,args=(i,))#i 不加逗号是代表一个参数,加逗号是代表一个数组 8     t.setDaemon(True)
     9     t.start()
    10     t.join()
    11 print("t.setDaemon(True)",type)

    创建线程的两种方式

     1 第一种创建线程的方式 创建20个线程

     1 import threading
     2 import time
     3   
     4 def worker(num):
     5     """
     6     thread worker function
     7     :return:
     8     """
     9     time.sleep(1)#隔一秒再去抢占资源
    10     print("Thread %d" % num)
    11     return
    12   
    13 for i in range(20):
    14     t = threading.Thread(target=worker,args=(i,),name=“t.%d” % i)#创建线程干事(创建线程并执行worker函数和参数i一起传入方法中)
    15     t.start()  #激活线程
    16             ......#程序到这夯筑了,不动了,程序也不结束,等着举手后,子线程执行完后程序结束
    17  
    18 比喻:运动员(子线程)在起跑线上等待枪响,抢不响就等待,枪响后(子线程)开始跑

    2 第二种创建线程的方式 创建20个线程

     1 class MyThread(threading.Thread):
     2     def __init__(self,name):
     3         # threading.Thread.__init__(self)
     4         super(MyThread,self).__init__(target=self.fun,name="t %d" %i)
     5         self.name = name
     6  
     7     def fun(self):
     8         time.sleep(2)
     9         print("name %s thread %s" % (self.name,threading.current_thread().name) )
    10  
    11 for i in range(20):
    12     t = MyThread(i)
    13     t.start()

    多线程的说明

    threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。python当前版本的多线程库没有实现优先级、线程组,线程也不能被停止、暂停、恢复、中断。

    threading模块提供的类:  
      Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

    threading 模块提供的常用方法: 
      threading.currentThread(): 返回当前的线程变量。 
      threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 
      threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    threading 模块提供的常量:

      threading.TIMEOUT_MAX 设置threading全局超时时间。

    import threading
    import time
      
    def worker(num):
        """
        thread worker function
        :return:
        """
        time.sleep(1)#隔一秒再去抢占资源
        print("Thread %d" % num)
        return
      
    for i in range(20):
        t = threading.Thread(target=worker,args=(i,),name=“t.%d” % i)#创建线程D对象并实例化去干事(创建线程并执行worker函数和参数i一起传入方法中)
        t.start()  #激活线程
                t.join()
    print("t.setDaemon(True)",type)
     
     
    程序激活后,主线程从上到下执行,t 子线程启动后,与主线程并行,抢占CPU资源,一次输出结果几乎同时打印,没有先后顺序(没有sleep情况)
    View Code
    输出:
    t.setDaemon(True) <class 'type'>
    thread'2'
    thread'6'
    thread'4'
    thread'0'
    thread'8'
    thread'1'
    thread'7'
    thread'3'
    thread'5'
    thread'9'
    
    加join():逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程无意义。 即等子线程执行完后 再去执行主线程。 输出: thread
    '0' thread'1' thread'2' thread'3' thread'4' thread'5' thread'6' thread'7' thread'8' thread'9' t.setDaemon(True) <class 'type'>
    e.g
     1 import threading
     2 import time
     3 
     4 def foo(n):
     5     print("foo%s"%n)
     6     time.sleep(1)
     7 
     8 def bar(n):
     9     print("bar%s"%n)
    10     time.sleep(2)
    11 
    12 t1 = threading.Thread(target=foo,args=(1,))
    13 t2 = threading.Thread(target=bar,args=(2,))
    14 
    15 t1.start()
    16 t2.start()
    17 
    18 print("...in the main...")
    程序启动后,主线程从上到下依次执行,t1、t2两个子线程启动后,与主线程并行,抢占CPU资源。因此,前三行的输出结果几乎同时打印,没有先后顺序,
    此时,需要等t1和t2都结束后程序才结束。故等待2s后,进程结束。程序总共花了2s。 输出: foo1 bar2 ...in the main...

     E.g

     1 import threading
     2 from time import ctime,sleep
     3 import time
     4 def music(func):
     5      for i in range(2):
     6          print ("Begin listening to %s. %s" %(func,ctime()))
     7          sleep(3)
     8          print("end listening %s"%ctime())
     9 def move(func):
    10      for i in range(2):
    11         print ("Begin watching at the %s! %s" %(func,ctime()))
    12         sleep(5)
    13         print('end watching %s'%ctime())
    14 threads = []
    15 t1 = threading.Thread(target=music,args=('凤凰于飞1',))
    16 threads.append(t1)
    17 t2 = threading.Thread(target=move,args=('喜欢你2',))
    18 threads.append(t2)
    19 if __name__ == '__main__':
    20      for t in threads:
    21          t.start()
    22          t.join()
    23      print ("all over %s" %ctime())
    View Code
    一:t.join()在for内部时 按子线程的顺序一个一个执行
    输出:先t1线程启动 Begin listening to 凤凰于飞1. Fri Aug
    25 19:43:27 2017 #隔三秒 同时输出 end listening Fri Aug 25 19:43:30 2017 Begin listening to 凤凰于飞1. Fri Aug 25 19:43:30 2017 #四秒后t2线程启动,第4秒,同时输出 end listening Fri Aug 25 19:43:33 2017 Begin watching at the 喜欢你2! Fri Aug 25 19:43:33 2017 #第五秒 end watching Fri Aug 25 19:43:38 2017 Begin watching at the 喜欢你2! Fri Aug 25 19:43:38 2017 #最后t2线程结束,主线程一起结束 end watching Fri Aug 25 19:43:43 2017 all over Fri Aug 25 19:43:43 2017
    二:
    无join():
    #
    启动t1,t2线程 和主线程一起并行输出 Begin listening to 凤凰于飞1. Fri Aug 25 19:45:44 2017 Begin watching at the 喜欢你2! Fri Aug 25 19:45:44 2017 all over Fri Aug 25 19:45:44 2017 #隔三秒 end listening Fri Aug 25 19:45:47 2017 Begin listening to 凤凰于飞1. Fri Aug 25 19:45:47 2017 #隔五秒 end watching Fri Aug 25 19:45:49 2017 Begin watching at the 喜欢你2! Fri Aug 25 19:45:49 2017 #第六秒 end listening Fri Aug 25 19:45:50 2017 #最后 end watching Fri Aug 25 19:45:54 2017
    三:t.join()与for抬头对齐时,按时间的先后顺序执行

    1
    Begin listening to 凤凰于飞1. Fri Aug 25 20:17:38 2017 2 Begin watching at the 喜欢你2! Fri Aug 25 20:17:38 2017
    #隔三秒 3 end listening Fri Aug 25 20:17:41 2017 4 Begin listening to 凤凰于飞1. Fri Aug 25 20:17:41 2017
    #隔五秒 5 end watching Fri Aug 25 20:17:43 2017 6 Begin watching at the 喜欢你2! Fri Aug 25 20:17:43 2017
    #第六秒 7 end listening Fri Aug 25 20:17:44 2017
    #最后 8 end watching Fri Aug 25 20:17:48 2017 #t2结束,t2结束之前,主线程一直被阻塞。t2结束主线程继续执行
    9 all over Fri Aug 25 20:17:48 2017 #主线程结束

     setDaemon(True):不等,主线程直接运行后就结束

    setDaemon(False):等,主线程运行后,等待子线程运行完后,才结束

    将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就兵分两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法。

    默认是 setDaemon(False)  即 主线程会handle住,一直等待多线程执行完毕
    如果不想让 主线程等待子线程,那么我们直接在start之前 改t.setDaemon(True) 即可

    主线程等子线程,属于后台子线程

    主线程不等子线程,属于前台子线程

     1 import threading
     2 from time import ctime,sleep
     3 import time
     4 def music(func):
     5      for i in range(2):
     6          print ("Begin listening to %s. %s" %(func,ctime()))
     7          sleep(3)
     8          print("end listening %s"%ctime())
     9 def move(func):
    10      for i in range(2):
    11         print ("Begin watching at the %s! %s" %(func,ctime()))
    12         sleep(5)
    13         print('end watching %s'%ctime())
    14 threads = []
    15 t1 = threading.Thread(target=music,args=('凤凰于飞1',))
    16 threads.append(t1)
    17 t2 = threading.Thread(target=move,args=('喜欢你2',))
    18 threads.append(t2)
    19 # if __name__ == '__main__':
    20 for t in threads:
    21     t.setDaemon(True)#不等子线程,直接主线程运行完结束
    22     t.start()
    23     # t.join()
    24 print ("all over %s" %ctime())
    输出:
    1
    Begin listening to 凤凰于飞1. Fri Aug 25 21:05:55 2017 #t1和t2启动,分别打印一次后sleep,主进程继续

    2 Begin watching at the 喜欢你2! Fri Aug 25 21:05:55 2017
    3 all over Fri Aug 25 21:05:55 2017  #主进程结束,程序结束

    可见由于setDaemon(True)把子线程设置为守护线程,子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print ("all over %s" %ctime())后,

    没有等待子线程,直接就退出了,同时子线程也一同结束。

    一、setDaemon(True):【在没有t.join()情况下起作用】变成守护线程
        setDaemon(False):可忽略掉,默认值为False
    二、把代码换成下面时,标记红色字体共存情况下,t.join()会使 t.setDaemon(True)失效
    for t in threads:
        t.setDaemon(True)
        t.start()
        t.join()
    print ("all over %s" %ctime())
    输出:如下

    Begin listening to 凤凰于飞1. Fri Aug 25 21:40:20 2017


    end listening Fri Aug 25 21:40:23 2017
    Begin listening to 凤凰于飞1. Fri Aug 25 21:40:23 2017


    end listening Fri Aug 25 21:40:26 2017
    Begin watching at the 喜欢你2! Fri Aug 25 21:40:26 2017


    end watching Fri Aug 25 21:40:31 2017
    Begin watching at the 喜欢你2! Fri Aug 25 21:40:31 2017


    end watching Fri Aug 25 21:40:36 2017
    all over Fri Aug 25 21:40:36 2017

     该单线程例子与上面有t.join()的多线程例子对比:join()方法使多线程无意义,仅仅是使主线程最后输出而已;单线程就会一直阻塞主线程

    无法运行下去。在这里父线程没法继续执行for循环,所以第二个子线程也就不会出现了。

     1 import threading
     2 from time import ctime,sleep
     3 import time
     4 def music(func):
     5     while True:
     6      # for i in range(2):
     7          print ("Begin listening to %s. %s" %(func,ctime()))
     8          sleep(3)
     9          print("end listening %s"%ctime())
    10          # sleep(3)
    11 threads = []
    12 t1 = threading.Thread(target=music,args=('凤凰于飞1',))
    13 threads.append(t1)
    14 t2 = threading.Thread(target=music,args=('喜欢你2',))
    15 threads.append(t2)
    16 # if __name__ == '__main__':
    17 for t in threads:
    18     # t.setDaemon(False)
    19     t.start()
    20     t.join()
    21 print ("all over %s" %ctime())

    主线程从上到下执行 遇到join后进入 while True后死循环了,所以t2线程执行不了。
    这样一直循环下去,可见只有第一个子线程被调用了,第二个子线程,以及父线程都没有继续走下去。这里我的理解是:join()的作用是,在子线程完成运行之前,这个子线程的父线程将
    一直被阻塞,无法运行下去。在这里父线程没法继续执行for循环,所以第二个子线程也就不会出现了

    输出:
    1
    Begin listening to 凤凰于飞1. Fri Aug 25 22:12:20 2017
    下面是循环第一个线程,一直阻塞其他线程 2 end listening Fri Aug 25 22:12:23 2017 3 Begin listening to 凤凰于飞1. Fri Aug 25 22:12:23 2017
    4 end listening Fri Aug 25 22:12:26 2017 5 Begin listening to 凤凰于飞1. Fri Aug 25 22:12:26 2017
    1 def music(func):
    2     # while True:
    3      # for i in range(2):
    4          print ("Begin listening to %s. %s" %(func,ctime()))
    5          sleep(3)
    6          print("end listening %s"%ctime())
    将上述while True给去掉后输出:
    1 Begin listening to 凤凰于飞1. Fri Aug 25 22:18:48 2017
    2 end listening Fri Aug 25 22:18:51 2017
    3 Begin listening to 喜欢你2. Fri Aug 25 22:18:51 2017
    4 end listening Fri Aug 25 22:18:54 2017
    5 all over Fri Aug 25 22:18:54 2017
    e.g
    def music(func):
    while True:
    # for i in range(2):
    print ("Begin listening to %s. %s" %(func,ctime()))
    sleep(3)
    # print("end listening %s"%ctime())
    # sleep(3)
    threads = []
    t1 = threading.Thread(target=music,args=('凤凰于飞1',))
    threads.append(t1)
    t2 = threading.Thread(target=music,args=('喜欢你2',))
    threads.append(t2)
    # if __name__ == '__main__':
    for t in threads:
    # t.setDaemon(False)
    t.start()
    for t in threads:
    t.join()
    print ("all over %s" %ctime())
    输出:循环2个线程,随机输出(抢占CPU资源)
    1
    Begin listening to 凤凰于飞1. Fri Aug 25 22:39:35 2017 2 Begin listening to 喜欢你2. Fri Aug 25 22:39:35 2017
    3 Begin listening to 喜欢你2. Fri Aug 25 22:39:38 2017 4 Begin listening to 凤凰于飞1. Fri Aug 25 22:39:38 2017
    5 Begin listening to 凤凰于飞1. Fri Aug 25 22:39:41 2017 6 Begin listening to 喜欢你2. Fri Aug 25 22:39:41 2017
    7 Begin listening to 凤凰于飞1. Fri Aug 25 22:39:44 2017 8 Begin listening to 喜欢你2. Fri Aug 25 22:39:44 2017

     守护线程脚本讲解    

    我的一个main_thread 主线程产生10个子线程,如果主线程不是守护进程,他们都是并发,执行完才会走下一个主进程,如果设置为守护进程,一旦执行下一个主线程,

    代表main_thread结束,其他线程执行多少算多少。

     1 import threading
     2 import time
     3 def run(num):
     4     # pass
     5     if not num == 5:
     6         time.sleep(1)
     7     print (" hi i am thread %s ...lalala 
    " % num)
     8 def main(n):
     9     print ("------running main thread----------")
    10     for i in range(10):
    11         t = threading.Thread(target=run,args=(i,))
    12         t.start()
    13     #time.sleep(3)
    14     print ("------done main thread----------")
    15 main_thread = threading.Thread(target=main,args=(10,)) #主线程产生10个子线程
    16 main_thread.setDaemon(True) #将主线程设置为守护线程
    17 main_thread.start()
    18 time.sleep(2)
    19 print ('
    ------->>>>>')  #顶格的都是主线程
    View Code
    输出:
    ------running main thread---------- hi i am thread 5 ...lalala ------done main thread----------
    并发输出上面三项
    hi i am thread 8 ...lalala hi i am thread 2 ...lalala hi i am thread 4 ...lalala hi i am thread 3 ...lalala hi i am thread 6 ...lalala hi i am thread 0 ...lalala hi i am thread 7 ...lalala hi i am thread 1 ...lalala hi i am thread 9 ...lalala #最后输出的 ------->>>>>

    thread方法说明

    t.start() : 激活线程

    t.getName() : 获取线程的名称

    t.setName() : 设置线程的名称 

    t.name : 获取或设置线程的名称

    t.is_alive() : 判断线程是否为激活状态

    t.isAlive() :判断线程是否为激活状态

    t.setDaemon(): 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

    t.isDaemon() : 判断是否为守护线程

    t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。

    t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

    t.run() :线程被cpu调度后自动执行线程对象的run方法

    join 代码 
    join在start之上,当代码运行到start()举手了,接着运行join()就不再继续往下进行了,等待上面的线程运行完后,再往下走
    
    
    E.g  join()主线程等待,知道子线程执行完;join(n)最多等n秒。

    1
    import time,threading 2 def f1(): 3 pass 4 def f2(arg1,arg2): 5 time.sleep(5) 6 f1() 7 8 t = threading.Thread(target=f2,args=(1,2,)) 9 t.start() 10 print('000') 11 t.join(2) 12 print('111')

    输出:000
    111 #等2秒后输出的

    
    
     1 #!/usr/bin/env python
     2 # _*_ coding:utf-8 _*_
     3 
     4 
     5 import time,threading
     6 
     7 def f1():
     8     pass
     9 
    10 def f2(arg1,arg2):
    11     time.sleep(3)
    12     print(4+5)
    13     b = time.time()
    14     print(b - a)
    15     f1()
    16 a = time.time()
    17 
    18 # join   join(timeout=None)  默认timeout无值的时候,子线程会执行完再往下执行
    19 t = threading.Thread(target=f2,args=(1,2,))
    20 t.start()
    21 t.join()
    22 t = threading.Thread(target=f2,args=(1,2,))
    23 t.start()
    24 t.join()
    25 t = threading.Thread(target=f2,args=(1,2,))#重点: 多线程传参数 args  1个参数的时候 内部一点要逗号结尾,否则报错。不认为是元祖
    26 t.start()
    27 t.join()
    输出:
    1
    9 2 3.000171661376953 3 9 4 6.000343322753906 5 9 6 9.00051474571228

    Lock、Rlock类

    import threading
    import time
    def run(num):
        global NUM
        time.sleep(1)
        print(" hi i am thread %s ...lalala " % num)
        NUM += 1
    
    NUM =0
    p_list = []
    
    for i in range(5):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        p_list.append(t)  #我们自己实现并行,先让并发线程执行,加到列表等待我们取结果即可
        #t.jion() # 等待一个线程结束才会执行第二个线程,这样就成了串行,而不是并行了
    for i in p_list: #
        i.join() #取出我们上面放入的结果。 但是串行取出
    print( '---->',NUM)    #由于加入列表时候是并发加入的,数字没有先后,所以打印结果i的时候也就看到没有顺序。最后打印的的i会导致 NUM的变化
    先执行 并发的线程,再执行主线程,输出无序
    import threading
    import time
    
    def run(num):
        global NUM
        lock.acquire()  #上锁 注意位置,理论上是要 上锁  处理数据 解锁。一定要看好sleep时间
        print (" hi i am thread %s ...lalala " % num)
        NUM += 1
        lock.release()  #释放锁
        time.sleep(1)
    NUM =0
    p_list = []
    lock = threading.Lock()  #制造一把锁
    for i in range(5):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        p_list.append(t)  #我们自己实现并行,先让并发线程执行,加到列表等待我们取结果即可
        #t.jion() # 等待一个线程结束才会执行第二个线程,这样就成了串行,而不是并行了
    for i in p_list: #
        i.join() #取出我们上面放入的结果。 但是串行取出
    print( '---->',NUM)   #由于加锁了,所以会数字不会变化
    实现并发线程,再去执行主线程,输出且是有序

    第一个例子:对于第一个线程拿到 num = 0再没有自增一之前,CPU切换了第二个线程去了,二个线程都自增了,同时处理该数据时就会出现脏数据,这时就需要锁。

    由于线程之间随机调度:某线程可能在执行n条后,CPU接着执行其他线程。为了多个线程同时操作一个内存中的资源时不产生混乱,我们使用锁。

    Lock(指令锁)是可用的最低级的同步指令。Lock处于锁定状态时,不被特定的线程拥有。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。

    可以认为Lock有一个锁定池,当线程请求锁定时,将线程至于池中,直到获得锁定后出池。池中的线程处于状态图中的同步阻塞状态。

    RLock(可重入锁)是一个可以被同一个线程请求多次的同步指令。RLock使用了“拥有的线程”和“递归等级”的概念,处于锁定状态时,RLock被某个线程拥有。拥有RLock的线程可以再次调用acquire(),释放锁时需要调用release()相同次数。

    可以认为RLock包含一个锁定池和一个初始值为0的计数器,每次成功调用 acquire()/release(),计数器将+1/-1,为0时锁处于未锁定状态。

    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

    简言之:Lock属于全局,Rlock属于线程。

    构造方法: 
    Lock(),Rlock()
    推荐使用Rlock()

    实例方法: 
      acquire([timeout]): 尝试获得锁定。使线程进入同步阻塞状态。 
      release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。

    #coding:utf-8
    import threading
    import time
    
    gl_num = 0 #全局变量
    
    def show(arg):
        global gl_num
        time.sleep(1)
        gl_num +=1
        print (gl_num)
    
    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()
    
    print'main thread stop'
    import threading
    import time
    
    gl_num = 0
    
    lock = threading.RLock()
    
    
    # 调用acquire([timeout])时,线程将一直阻塞,
    # 直到获得锁定或者直到timeout秒后(timeout参数可选)。
    # 返回是否获得锁。
    def Func():
        lock.acquire()
        global gl_num
        gl_num += 1
        time.sleep(1)
        print (gl_num)
        lock.release()
    
    
    for i in range(10):
        t = threading.Thread(target=Func)
        t.start()
    使用锁

    信号量 semaphore  讲解: 区别一个数据一个线程即一个厕所一把钥匙 一个厕所5个坑 5吧钥匙

    控制数据库链接

        semaphore = threading.BoundedSemaphore(5) 
     互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。
    import threading
    import time
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print ("run the thread :%s
     " % n)
        semaphore.release()
    
    if __name__ == '__main__':
        num = 0
        semaphore = threading.BoundedSemaphore(5)##最多允许5个线程同时运行
    
        for i in range(10):
            t = threading.Thread(target=run,args=(i,))
            t.start()
    while threading.active_count != 1 :#一个程序至少有一个进程一个线程。所以等于1 就会结束
        pass
    else:
        print ( '-----all thread done-----')
        print ('---->',num)
    run the thread :3
     run the thread :4
     run the thread :2
     run the thread :1
     run the thread :0
     
    
    
    
    
    run the thread :6
     run the thread :5
     
    run the thread :9
     
    run the thread :7
     
    
    run the thread :8
    输出结果

    Event类

    Event(事件)是最简单的线程通信机制之一:一个线程通知事件,其他线程等待事件。Event内置了一个初始为False的标志,当调用set()时设为True,调用clear()时重置为 False。wait()将阻塞线程至等待阻塞状态。

      Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

    构造方法: 
    Event()

    实例方法: 
      isSet(): 当内置标志为True时返回True。 
      set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。 
      clear(): 将标志设为False。 
      wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

    import threading
    import time
    
    event = threading.Event()#一旦遇到wait就把所有线程挡住
    
    
    def func():
        # 等待事件,进入等待阻塞状态
        print'%s wait for event...' % threading.currentThread().getName())
        event.wait()#阻塞,红灯
    
        # 收到事件后进入运行状态
        print'%s recv event.' % threading.currentThread().getName())
    
    
    t1 = threading.Thread(target=func)
    t2 = threading.Thread(target=func)
    t1.start()
    t2.start()
    
    time.sleep(2)
    
    # 发送事件通知
    print'MainThread set event.')
    event.set()#把红灯变绿 

    输出:

    Thread-1 wait for event...
    Thread-2 wait for event...
    
    #2秒后。。。
    MainThread set event.
    Thread-1 recv event.
    Thread-2 recv event.
    
    

    timer类

    Timer(定时器)是Thread的派生类,用于在指定时间后调用一个方法。

    构造方法: 
    Timer(interval, function, args=[], kwargs={}) 
      interval: 指定的时间 
      function: 要执行的方法 
      args/kwargs: 方法的参数

    实例方法: 
    Timer从Thread派生,没有增加实例方法。

    例子一: 线程延迟5秒后执行。

    import threading
    def func():
        print 'hello timer!'
    
    timer = threading.Timer(5, func)
    timer.start()

    local类

     local是一个小写字母开头的类,用于管理 thread-local(线程局部的)数据。对于同一个local,线程无法访问其他线程设置的属性;线程设置的属性不会被其他线程设置的同名属性替换。

      可以把local看成是一个“线程-属性字典”的字典,local封装了从自身使用线程作为 key检索对应的属性字典、再使用属性名作为key检索属性值的细节。

    
    
    import threading
     
    local = threading.local()
    local.tname = 'main'
     
    def func():
        local.tname = 'notmain'
        print( local.tname)
     
    t1 = threading.Thread(target=func)
    t1.start()
    t1.join()
     
    print (local.tname)
    输出:
    notmain
    main

    线程锁threading.RLock和threading.Lock

    我们使用线程对数据进行操作的时候,如果多个线程同时修改某个数据,可能会出现不可预料的结果,为了保证数据的准确性,引入了锁的概念。

    例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。

    import threading
    import time
     
    globals_num = 0
     
    lock = threading.RLock()
     
    def Func():
        lock.acquire()  # 获得锁 
        global globals_num
        globals_num += 1
        time.sleep(1)
        print(globals_num)
        lock.release()  # 释放锁 
     
    for i in range(10):
        t = threading.Thread(target=Func)
        t.start()
    View Code

    threading.RLock和threading.Lock 的区别

    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,

    必须调用n次的release才能真正释放所占用的琐。

    import threading
    lock = threading.Lock()    #Lock对象
    lock.acquire()
    lock.acquire()  #产生了死琐。
    lock.release()
    lock.release()
    import threading
    
    rLock = threading.RLock()  #RLock对象
    
    rLock.acquire()
    
    rLock.acquire()    #在同一线程内,程序不会堵塞。
    
    rLock.release()
    
    rLock.release()

    RLOCK递归锁的用处  解锁多层

    import  threading,time
    
    number = 0
    lock = threading.RLock()#创建锁
    
    def run(num):
        lock.acquire() #获取一把锁(别人无法操作)
        global number#使用外部的全局变量
        #print 'hi ,i am the thread,',num
    
        number += 1
        lock.release() #解锁  上面是锁内独占内容
    
        print (number)
    
        time.sleep(1)
    for i in range(5): t = threading.Thread(target=run,args=(i,)) t.start()

    threading.Event

    Event是线程间通信最间的机制之一:一个线程发送一个event信号,其他的线程则等待这个信号。用于主线程控制其他线程的执行。 Events 管理一个flag,这个flag可以使用set()设置成True或者使用clear()重置为False,wait()则用于阻塞,在flag为True之前。Event内置了一个初始化为False的标志(flag)。

    • Event.wait([timeout]) : 堵塞线程,直到Event对象内部标识位被设为True或超时(如果提供了参数timeout)。
    • Event.set() :将标识位设为Ture
    • Event.clear() : 将标识伴设为False。
    • Event.isSet() :判断标识位是否为Ture。
    • import threading
      def do(event):
          print('start')
          event.wait()#相当于红灯,进入等待状态
          print( 'execute')
      #使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中去
      event_obj = threading.Event()#创建事件对象(生成Event对象的内部信号标志),一旦遇到wait就会把所有的线程都挡住
      for i in range(5):
          t = threading.Thread(target=do, args=(event_obj,))
          t.start()
      
      event_obj.clear()#让灯变红(清除内部信号标志/继续阻塞)
      inp = input('input:')
      if inp == 'true':
          event_obj.set()#相当于把红灯变绿灯
      输出:
      start start start start start input:(等待输入使红灯变绿)
      
      

      !!线程之间交互  threading.Event方法  红灯 绿灯  信号标志位

    • import threading
      import time
      import random
      
      def light():
          if not event.isSet():  #没有设置的话
              event.set()  # 设置绿灯
          count = 0  #计数器秒数
          while True:
              if count < 10:    #小于十秒 是绿灯
                  print( "33[42;1m ------green light on ----33[0m")
              elif count < 15:  #小于13秒 大于10秒 是黄灯
                  print( "33[43;1m ------yellow light on ----33[0m")
              elif count < 25:  #小于于20秒 有设置则取消
                  if event.isSet():
                      event.clear()
                  print ("33[41;1m ------red light on ----33[0m")
              else:  #大于20 重新
                  count = 0    #取消秒数计时
                  event.set()   #重新变为绿灯
              time.sleep(1)
              count +=1
              
      def car(n):  # 第二个线程 车线程
          while 1:
              time.sleep(random.randrange(3))  #随机等待三秒
              if event.isSet():
                  print( "car [%s] is running..." % n)  #如果被设置了信号则是绿灯,该线程的车即可通过
              else:  #否则的话提示红灯
                  print ("car [%s] is waitting for the red light.." %n)
                  event.wait()   #红灯的话,会在此处卡住,不往下执行
                  print ("Green light is on ,car %s is running......." %n)
      if __name__ == '__main__':  #下面是定义了两个线程  ,灯线程 车线程, threading.Event用来设置标着符号让两个线程交流
          event = threading.Event()
          Light = threading.Thread(target=light)
          Light.start()
          for i in range(3):
              t = threading.Thread(target=car,args=(i,))
              t.start()
      View Code

      条件(Condition)

      使得线程等待,只有满足某条件时,才释放n个线程

    • Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

        可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

      构造方法: 
      Condition([lock/rlock])

      实例方法: 
        acquire([timeout])/release(): 调用关联的锁的相应方法。 
        wait([timeout]): 调用这个方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 
        notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。 
        notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

    • 一个condition变量总是与某些类型的锁相联系,这个可以使用默认的情况或创建一个,当几个condition变量必须共享和同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪。

      condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。

      其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

      Condition类实现了一个conditon变量。 这个conditiaon变量允许一个或多个线程等待,直到他们被另一个线程通知。 如果lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来做底层锁。否则,会创建一个新的Rlock对象,用来做底层锁。

    • wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。
    • 如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

      注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会立刻返回他们的wait() 调用。除非线程调用notify()和notify_all()之后放弃了锁的所有权。

    • consumer()线程要等待producer()设置了Condition之后才能继续。
    • import threading
      def condition_func():
          ret = False
          inp = input('>>>')
          if inp == '1':
              ret = True
          return ret
      def run(n):
          con.acquire()
          con.wait_for(condition_func)
          print("run the thread: %s" %n)
          con.release()
      if __name__ == '__main__':
          con = threading.Condition()
          for i in range(5):
              t = threading.Thread(target=run, args=(i,))
              t.start()
      满足条件
      >>>1
      run the thread: 0
      >>>1
      run the thread: 1
      >>>1
      run the thread: 2
      >>>1
      run the thread: 3
      >>>1
      run the thread: 4
      输出结果
    • import threading
      def run(n):
          con.acquire()
          con.wait()
          print("run the thread: %s" % n)
          con.release()
      if __name__ == '__main__':
          con = threading.Condition()
          for i in range(5):
              t = threading.Thread(target=run, args=(i,))
              t.start()
          while True:
              inp = input('>>>')
              if inp == 'q':
                  break
              con.acquire()
              con.notify(int(inp))
              con.release()
      输出:

      >>>132
      >>>run the thread: 0
      run the thread: 2
      run the thread: 1
      run the thread: 4
      run the thread: 3

    • import threading
      import time
      
      # 商品
      product = None
      # 条件变量
      con = threading.Condition()
      
      # 生产者方法
      def produce():
          global product
          if con.acquire():
              while True:
                  if product is None:
                      print ('produce...')
                      product = ('anything')
      
                      # 通知消费者,商品已经生产
                      con.notify()
                  # 等待通知
                  con.wait()
                  time.sleep(2)
      
      # 消费者方法
      def consume():
          global product
          if con.acquire():
              while True:
                  if product is not None:
                      print ('consume...')
                      product = None
                      # 通知生产者,商品已经没了
                      con.notify()
                  # 等待通知
                  con.wait()
                  time.sleep(2)
      
      t1 = threading.Thread(target=produce)
      t2 = threading.Thread(target=consume)
      t2.start()
      t1.start()
      生产者消费者模型
      produce...
      consume...
      produce...
      consume...
      produce...
      consume...
      produce...
      consume...
      produce...
      consume...
      
      Process finished with exit code -1
      程序不断循环运行下去。重复生产消费过程。
      输出结果
      import threading
       
      alist = None
      condition = threading.Condition()
       
      def doSet():
          if condition.acquire():
              while alist is None:
                  condition.wait()
              for i in range(len(alist))[::-1]:
                  alist[i] = 1
              condition.release()
       
      def doPrint():
          if condition.acquire():
              while alist is None:
                  condition.wait()
              for i in alist:
                  print i,
              print
              condition.release()
       
      def doCreate():
          global alist
          if condition.acquire():
              if alist is None:
                  alist = [0 for i in range(10)]
                  condition.notifyAll()
              condition.release()
       
      tset = threading.Thread(target=doSet,name='tset')
      tprint = threading.Thread(target=doPrint,name='tprint')
      tcreate = threading.Thread(target=doCreate,name='tcreate')
      tset.start()
      tprint.start()
      tcreate.start()
      E.g 二、生产者消费者模型
      import threading
      import time
      
      condition = threading.Condition()
      products = 0
      
      class Producer(threading.Thread):
          def run(self):
              global products
              while True:
                  if condition.acquire():
                      if products < 10:
                          products += 1;
                          print "Producer(%s):deliver one, now products:%s" %(self.name, products)
                          condition.notify()#不释放锁定,因此需要下面一句
                          condition.release()
                      else:
                          print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
                          condition.wait();#自动释放锁定
                      time.sleep(2)
      
      class Consumer(threading.Thread):
          def run(self):
              global products
              while True:
                  if condition.acquire():
                      if products > 1:
                          products -= 1
                          print "Consumer(%s):consume one, now products:%s" %(self.name, products)
                          condition.notify()
                          condition.release()
                      else:
                          print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
                          condition.wait();
                      time.sleep(2)
      
      if __name__ == "__main__":
          for p in range(0, 2):
              p = Producer()
              p.start()
      
          for c in range(0, 3):
              c = Consumer()
              c.start()
      E.g 三、生产者消费者模型
      def produccer(cond):
          with cond:
              print("producer before notifyAll")
              cond.notifyAll()
              print("producer after notifyAll")
      
      condition = threading.Condition()
      c1 = threading.Thread(name="c1", target=consumer, args=(condition,))
      c2 = threading.Thread(name="c2", target=consumer, args=(condition,))
      p = threading.Thread(name="p", target=producer, args=(condition,))
      c1.start()
      time.sleep(2)
      c2.start()
      time.sleep(2)
      p.start()
      输出:
      consumer before wait consumer before wait producer before notifyAll producer after notifyAll consumer after wait consumer after wait

      queue模块(FIFO)

    • Queue 就是对队列,它是线程安全的

      举例来说,我们去肯德基吃饭。厨房是给我们做饭的地方,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。

      这个模型也叫生产者-消费者模型。

    •  先进先出:比如排队卖火车票

      import queue
      #队列容量,内存级别队列
      q = queue.Queue(maxsize=0)  # 构造一个先进先出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
      #阻塞调用的线程,知道队列中所有任务呗处理掉
      q.join()    # 等到队列为kong的时候,在执行别的操作
      
      q.qsize()   # 返回队列的大小 (不可靠)
      
      q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
      
      q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
      
      q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在(相当于append),可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
      
                               为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
      
                                如果队列无法给出放入item的位置,则引发 queue.Full 异常
      
      q.get(block=True, timeout=None) # 等  移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
      
                            若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
      
      q.put_nowait(item) #   等效于 put(item,block=False)
      
      q.get_nowait() #不等    等效于 get(item,block=False)
      import threading, time
      import queue
      import random
      q = queue.Queue()
      
      def Produce(name):
          for i in range(20):
              q.put(i)  # 放到队列
              print ("33[32;1mProducer %s has made %s baozi .. 33[0m" % (name, i))
              time.sleep(random.randrange(1))  # 随机休息
      def Consumer(name):
          count = 0
          while count < 20:
              data = q.get()  # 取得队列上的
              print("33[31;1mConsumer %s has eaten %s baozi ... chihuo.. 33[0m" % (name, data))
              count += 1
              time.sleep(random.randrange(4))  # 随机休息,但是取得的东西比生产快
      
      p = threading.Thread(target=Produce, args=('alex',))  # 生产者
      c = threading.Thread(target=Consumer, args=('liu',))  # 消费者
      p.start()  # 启动
      c.start()
      卖包子和吃饱 例子

       多进程 

    • multiprocessing模块

      multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。

      在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,

    • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
    • 注意:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销(相当于重新建房子)。
    • multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
      
      但在使用这些共享API的时候,我们要注意以下几点:
      
      在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
      multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
      多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
      详细信息

      进程各自持有一份数据,默认无法共享数据,每个进程都有自己的数据。

    • import time
      from multiprocessing import Process
      def a(i):
          time.sleep(2)
          print(i)
      
      if __name__ == '__main__':
          p = Process(target=a,args=(111,))
          # print(t.daemon)
          # p.deamon()
          p.start()
          # p.join()
          p1 = Process(target=a,args=(22,))
          p1.start()
          # p1.join()
          print('end',)
    • from multiprocessing import Process
      def f(name):
          print("hello", name)
      if __name__ == "__main__":  #windows系统环境下必须有这句话
        for i in range(5):
          p = Process(target=f, args=(i,))
          print(123)
          p.start()
          p.join()
      输出不是有序的,因为具体哪个进程被调用是有CPU决定,Process换成threading.Thread后 线程间的数据共享
      from multiprocessing import Process
      import time,threading
      li = []
      def foo(i):
      li.append(i)
      print('say hi', li)
      if __name__ == "__main__":
      for i in range(5):
      p = Process(target=foo, args=(i,))
      # p = threading.Thread(target=foo, args=(i,))
      p.start()
      输出:

      say hi [0]
      say hi [4]
      say hi [3]
      say hi [1]
      say hi [2]

      把Process换成threading.Thread后,输出:

      say hi [0]
      say hi [0, 1]
      say hi [0, 1, 2]
      say hi [0, 1, 2, 3]
      say hi [0, 1, 2, 3, 4]

      进程间的数据共享

    • Array 共享进程间的列表  linux无问题。windows下有问题

    • # 内存演示, 线程的内存是1份  进程是独自申请线性地址空间
    #线程与进程对比
    import
    time import threading li = [] def f(i): time.sleep(3) li.append(i) print(i,li) if __name__ == '__main__': for i in range(10): t = threading.Thread(target=f,args=(i,)) t.start()
    6 [6]
    4 [6, 4]
    3 [6, 4, 3]
    2 [6, 4, 3, 2]
    1 [6, 4, 3, 2, 1]
    5 [6, 4, 3, 2, 1, 5]
    0 [6, 4, 3, 2, 1, 5, 0]
    8 [6, 4, 3, 2, 1, 5, 0, 8]
    7 [6, 4, 3, 2, 1, 5, 0, 8, 7]
    9 [6, 4, 3, 2, 1, 5, 0, 8, 7, 9]
    import time
    from multiprocessing import Process, Array,Manager
    # 方法一 Array模块(数组类型)的进程间共享
    li = []
    def f(i):
        time.sleep(3)
        li.append(i)
        print(i,li)
    if __name__ == '__main__':
        for i in range(5):
            p= Process(target=f, args=(i,))
            p.start()
    
    def Foo(i,temp):
        temp[i] = 100 + i
        for item in temp:
            print(i, '----->', item)
    if  __name__ == '__main__':
        #[]数组创建后不能改变统一的字符类型,内存地址的元素是连续挨着的
        temp = Array('i', [11, 22, 33, 44])#相当于2个房间之间的介质
        for i in range(2):
           p = Process(target=Foo, args=(i,temp,))
           p.start()
    Array模块来实现进程之间的数据共享 去Linux可以执行,Windows不行
    推荐使用方法二:
    # 方法二 Manager 进程间共享数据
    from multiprocessing import Process,Manager
    import time,threading
    
    def a(i,dic):
        print('123')
        dic[i] = 100 + i
        print(len(dic))
        # for k , v in dic.items():
        #   print(k,v)
    if __name__ == '__main__':
        manage = Manager()
        dic = manage.dict()#特殊的字典,主进程
        # dic={}#普通字典,dic[i]就一个值
        for i in range(2):
            p = Process(target=a ,args=(i,dic,))#每个进程进行操作是带有特殊的字典意义
            p.start()
    #......
    # p.join()#第一种方法 #执行完毕之后,关闭连接 #time.sleep(10)#第二种方法 要在主进程关闭之前(断开连接),才能修改主线程中的内容 r = input('>>>')#第三种方法 。没有输入就一直挂起 #即让主进程等着就行

    dic = manage.dict()该写成 dic={}会有啥输出结果
    
    
    123
    1
    123
    1
    dic = manage.dict()输出的结果
    p.join()#第一种方法
    123
    1
    123
    2
    r = input('>>>')#第三种方法 。没有输入就一直挂起
    123
    1
    123
    2
    >>>

    子进程要修改主线程,就要创建一个链接才能修改,join是一个一个去执行让主线程一直等待,没有join话,主进程一下全执行完,让后在start下方挂起,相当于

    断开连接,主进程里面代码都没运行完,即链接没断就可以链接(也就是上述的三种方法)。

     

    Pipe例子 管道

    import  multiprocessing,time
    
    def f(conn):
        conn.send([42,None,'HELLO'])
        conn.close
    
    if __name__ == '__main__':  #管道发送 子进程数据给父进程接收
        parent_conn,child__conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=f,args=(child__conn,))
        p.start()
        print parent_conn.recv()  #prints "[42,None,'hello']"
        p.join()
    
    
    
    #_*_coding:utf-8_*_
    __author__ = 'jianzuo'
    
    import  multiprocessing,time
    
    def f(conn):
        conn.send([42,None,'HELLO'])
        conn.close
    
    if __name__ == '__main__':  #管道发送 子进程数据给父进程接收
    
        A,B = multiprocessing.Pipe() #赋值A B为管道两端
        p = multiprocessing.Process(target=f,args=(B,))  #将b子进程数据发送到管道
        p.start()
        print A.recv()  #prints "[42,None,'hello']"   #父进程接收
        p.join()
    View Code

    Multiprocess 的queue 队列 案例

    import  multiprocessing,time
    def f(conn,q):
        conn.send([42,None,'HELLO'])
        q.put('hahhahhah')
        conn.close
    if __name__ == '__main__':  #管道发送 子进程数据给父进程接收
        '''
        parent_conn,child__conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=f,args=(child__conn,))
        p.start()
        print parent_conn.recv()  #prints "[42,None,'hello']"
        p.join()
        '''
        A,B = multiprocessing.Pipe() #赋值A B为管道两端
        Q = multiprocessing.Queue()
        p = multiprocessing.Process(target=f,args=(B,Q))  #将b子进程数据发送到管道
        p.start()
        print (A.recv() ) #prints "[42,None,'hello']"   #父进程接收
        print ("queue....----",Q.get())
        p.join()
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
    c语言数据类型

    当创建进程时(非使用时),共享数据会被拿到子进程中,当进程中执行完毕后,再赋值给原值。

    多进程LOCK,如果不锁 屏幕抢占输出进程锁实例

    from multiprocessing import Process, Array, RLock
    
    def Foo(lock,temp,i):
    
        """
    
        将第0个数加100
    
        """
    
        lock.acquire()
        temp[0] = 100+i
        for item in temp:
            print(i,'----->',item)
        lock.release()
    
    lock = RLock()
    temp = Array('i', [11, 22, 33, 44]) 
    if __name__ == '__main__':
       for i in range(20): 
    p
    = Process(target=Foo,args=(lock,temp,i,))
    p.start()

    进程池

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

     python内部没有提供线程池需自定义

    进程池中有两个方法区别:

    • apply                   每一个任务是排队进行:每一个进程都有[ 进程.join() ]
    • apply_async        每一个任务都并发进行。可以设置回调函数,没有进程.无join() ==deamon(True)不等

    实际应用中,并不是每次执行任务的时候,都去创建多进程,而是维护了一个进程池,每次执行的时候,都去进程池取一个,如果进程池里面的进程取光了,就会阻塞在那里,直到进程池中有可用进程为止。首先来看一下进程池提供了哪些方法

    • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

    • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

    • close() : 等待任务完成后在停止工作进程(相当于辞退,你交接后再走人),阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

    • terminate() : 不管任务是否完成,立即停止工作进程(辞退,你直接走人)。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

    • join() : 等待工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait,否则进程会成为僵尸进程。

    下面来简单的看一下代码怎么用的

    # join相当于wait 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。

    from multiprocessing import Process, Pool
    import time
    def foo(i):
        time.sleep(1)
        print(i + 100)
        return i + 100#返回给回调函数
    def bar(args):
        print(args)
    if __name__ == '__main__':
        pool = Pool(3)#建立进程池为3,进程为3放不下,for循环,每次执行3个进程
        for i in range(6):
            #进程执行该函数,申请一个进程执行该函数,运行完后放回进程池。
           #pool.apply(func=foo, args=(i,)) # apply 默认是依次执行完每个进程才会进行下一个,相当于Daemon = True
           print(11111)
          # apply_async默认是不等待的相当于Daemon = False,,批量生成 , 而async这个方法还可以用回调函数,即,
          # foo的执行完,其返回值传给回调函数callback(干完后告诉我下),做处理
          # pool.apply_async(func=foo, args=(i,), callback=bar)
    
        print("end")
        pool.terminate()
        # pool.close()  # 阻止执行 超过规定的进程数量  close 是每个进程运行完毕才会继续,terminate是不管执行没有执行完,都跳过
        # pool.join()  # 上面必须有个close 或者terminate  否则发生assert错误 断言错误
                       #进程池中的进程执行完毕后再关闭,如果是注释,那么程序直接关闭。

    协程

    协程内部就是控制单个线程来回跳动执行的,又称微线程,不需要cpu的操作时候,比如并发请求网站的时候 。是网卡io请求,我们就可以用协程。爬虫。

    协程,又称微线程,协程执行看起来有点像多线程,但是事实上协程就是只有一个线程,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显,此外因为只有一个线程,不需要多线程的锁机制,也不存在同时写变量冲突。协程的适用场景:当程序中存在大量不需要CPU的操作时(IO)下面来看一个利用协程例子

    协程的一个基本模块 greenlet ,功能很少,需要手动控制。而丰富的协程模块gevent 就是底层调用的greenlet,可以自动跳。

    协程详细:http://www.cnblogs.com/demon89/p/7436416.html

    安装方法

    参考http://www.cnblogs.com/liujianzuo888/articles/5507196.html

    源码或者二进制exe方式安装:需要手动安装依赖

    pip安装:会自动安装依赖,类似linux的yum

    python -m pip install --upgrade pip
    
    
    python -m pip install gevent

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。区别

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。

                                协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    greenlet(模块太Low,不推荐用)

    # 遇到greenlet 的switch 某个函数就会跳。
    from greenlet import greenlet
    def test1():
        print(12)
        gr2.switch()
        print(34)
        gr2.switch()
    def test2():
        print (56)
        gr1.switch()
        print(78)
    
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()

    gevent(推荐使用)

    相比greenlet gevent 是调的greenlet ,可以自动跳

    import gevent
    def foo():
        print('Running in foo')
        gevent.sleep(1) #跳过(不要继续执行了,跳到下个协程去)
        print('Explicit context switch to foo again')
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(3)
        print('Implicit context switch back to bar')
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])

    遇到IO操作自动切换:

    爬虫专用:url----->调用gevent----->调用greenlet----->调用协程

    from gevent import monkey;monkey.patch_all()
    import gevent
    import requests
    def f(url):
        print('GET: %s' % url)
        resp = requests.get(url)
        data = resp.text
        print(len(data),url)
    
    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
    ])
    爬虫专用

     

    管理上下文的模块contextlib

    实现下面功能,就用管理上下文来实现
    # li.append(1)
    # q.get()
    # li.remove(1)
    import contextlib,queue
    @contextlib.contextmanager#处理上下文的装饰器
    def f1(xxx, arg):
        xxx.append(arg)
        try:
            yield 
        finally:
            print(123)
            xxx.remove(arg)
    q = queue.Queue()
    li = []
    q.put('liu')
    with f1(li,1):#with管理下面函数
        print("before",li)
        q.get()
    print("after", li)
    输出:
    before [1] 123 after []
    E.g管理上下文

       详细的上下文管理: http://www.cnblogs.com/demon89/p/7435694.html

    人的思维,观点是一直都会进化的。如果现在的思维,观点看法;和十年前的一模一样的话。那就可以说,你这十年是白活的。
  • 相关阅读:
    使用npm安装一些包失败了的看过来(npm国内镜像介绍)
    利用JMX统计远程JAVA进程的CPU和Memory
    Spring Boot学习笔记
    django数据库时间存储格式问题
    解决 Ubuntu 无法调节屏幕亮度的问题(转)
    django models auto_now和auto_now_add的区别
    django redis操作
    接口测试的工具
    django中migration文件是干啥的
    mysql简单操作(实时更新)
  • 原文地址:https://www.cnblogs.com/liuzhiyun/p/7429366.html
Copyright © 2011-2022 走看看