zoukankan      html  css  js  c++  java
  • Python_Example_Thread 线程 学习/经验/示例

     Author: 楚格

    2018-11-17     17:34:58

    IDE: Pycharm2018.02   Python 3.7   

    KeyWord :  线程 threading Thread

    Explain:  

    -----------------------------------------------------

    --

       1 # coding=utf-8
       2 #---------------------------------
       3 '''
       4 # Author  : chu ge 
       5 # Function: 线程 thread
       6 #
       7 '''
       8 #---------------------------------
       9 '''
      10 # --------------------------------
      11 # 导入模块 
      12 # 1.系统库
      13 # 2.第三方库
      14 # 3.相关定义库
      15 # --------------------------------
      16 '''
      17 # 1.系统库
      18 import sys
      19 import os
      20 import time
      21 import random
      22 
      23 #2.第三方库
      24 
      25 #进程导入模块
      26 # from multiprocessing import Process
      27 from multiprocessing import Pool
      28 # from multiprocessing import Queue
      29 # from multiprocessing import Manager
      30 
      31 #进程导入模块
      32 import threading
      33 from threading import Thread
      34 from threading import Lock
      35 from queue import Queue
      36 
      37 
      38 
      39 
      40 
      41 
      42 #
      43 '''
      44 ============================================================================
      45 #》》》》》》》》》》
      46 
      47 线程
      48 
      49 -----------------------
      50 python的thread模块比较底层,
      51 threading模块做了些包装,可以更加方便的被使用
      52 
      53 进程 VS 线程
      54 功能的不同
      55 进程,能够完成多任务,如一台电脑上可以同时运行多个微信
      56 线程,能够完成多任务,如一个微信界面中可以运行多个窗口
      57 定义的不同
      58 进程,是系统进行资源分配和调度的一个独立单位。
      59 线程,是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的
      60 能独立运行的基本单位。
      61     线程自己基本不拥有系统资源,只拥有一点在运行中必不可少的资源,
      62     如:程序计数器,一组寄存器和栈,但是它可与同属一个进程的
      63     其他线程共享进程所拥有的全部的资源。
      64 
      65 区别
      66 一个程序至少有一个进程,一个进程至少有一个线程。
      67 线程的划分尺度小于进程,资源比进程少,使得多线程程序的并发性高。
      68 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大提高程序的运行效率
      69 线程不能够独立执行,必须依存在进程中
      70 优缺点
      71 线程和进程在使用上各有优缺点:线程执行开销小但不利于资源管理和保护
      72 而进程正相反。
      73 
      74 ----------------------------------------------
      75 1. 线程 使用threading 模块
      76 
      77 -----------------------
      78 1.1 单 / 多线程执行
      79 e.g:
      80 #1如果多个线程执行同一函数,各是各的,互不影响
      81 def Function_Say_Sorry():
      82     print("A say: Sorry !")
      83     time.sleep(1)
      84 if __name__ == "__main__":
      85     # 单线程
      86     for var in range(10):
      87         Function_Say_Sorry()
      88 
      89     # 多线程执行
      90     for var in range(10):
      91         # 创建线程
      92         thread_name = threading.Thread(target = Function_Say_Sorry)
      93         thread_name.start()
      94 result:
      95 1)可以看出使用了多线程并发操作,花费时间要短很多
      96 2)创建好的线程,需要调用start()方法来启动
      97 
      98 -----------------------
      99 e.g:
     100 
     101 if __name__ == "__main__":
     102 
     103 result:
     104 
     105 -----------------------
     106 1.2 主线程会等待所有的子线程结束后才结束
     107 e.g:
     108 def Function_Sing():
     109     for var in range(10):
     110         print("正在唱歌 %d" % var)
     111         time.sleep(1)
     112 
     113 def Function_Dance():
     114     for var in range(5):
     115         print("正在跳舞 %d" % var)
     116         time.sleep(1)
     117         
     118 if __name__ == "__main__":
     119         # 主线程等待子线程结束而结束
     120     print("--- start ---:
     %s" % (time.time()))
     121 
     122     time_A = threading.Thread(target = Function_Sing)
     123     time_B = threading.Thread(target = Function_Dance)
     124     time_A.start()
     125     time_B.start()
     126     time.sleep(5) #
     127     print("--- finish ---:
    %s" % (time.time()))
     128     # 查看线程数量
     129     while True:
     130         length = len(threading.enumerate())
     131         print("当前运行的线程数为: [%d] " % (length))
     132         if length <= 1 :
     133             break
     134         time.sleep(0.5)
     135 result:
     136 
     137 -----------------------
     138 1.3 threading 
     139 1.3.1 线程执行代码的封装
     140 通过使用threading模块能完成多任务的程序开发,为了每个线程更完美
     141 所有使用threading模块时,往往定义新的子类class,
     142 只要继承threading.Thread就可以了,然后重写run方法
     143 
     144 e.g:
     145 #线程执行代码的封装
     146 class Class_MyThread(threading.Thread):
     147     def run(self):
     148         for var in range(10):
     149             time.sleep(1)
     150             message = "I am " + self.name + " @ " + str(var)
     151             print("message: %s" % (message))
     152 
     153 
     154 if __name__ == "__main__":
     155     thread_name = Class_MyThread()
     156     thread_name.start()
     157     
     158 result:
     159 python的threading.Thread类有一个run方法,
     160 用于定义线程的功能函数,可以在自己的线程类中覆盖还方法。
     161 而创建自己线程实例后,通过thread类start方法,可以启动该线程,
     162 交给python虚拟机进行调度,当该线程获得执行的机会时,就会调用run方法执行线程。
     163 
     164 -----------------------
     165 1.3.2 线程的执行顺序
     166 e.g:
     167 # 线程的执行顺序
     168   #重复封装类
     169 def Function_Test_A():
     170     for var in range(10):
     171         thread_name_local = Class_MyThread()
     172         thread_name_local.start()
     173 if __name__ == "__main__":
     174     Function_Test_A()
     175     
     176 result:
     177 从执行结果可以看出,多线程程序的执行顺序是不确定的
     178 当执行到sleep语句时,线程将会被阻塞(Blocked),到sleep结束后,
     179 线程进入就绪(Runnable)状态,等待调度,而线程调度就讲自行选择一个线程执行。
     180 上面代码可以保证运行完成的run,但线程启动顺序和run函数中每次循环的执行顺序都不确定。
     181 
     182 总结
     183 1) 每个线程一定会有一个name,尽管示例中没有指定线程对象的name,
     184 但是python会自动为线程指定一个名字。
     185 2)当线程的run函数结束时,该线程完成
     186 3)无法控制线程调度程序,但是可以通过别的方式来影响调度的方式
     187 4)线程的几种状态
     188 
     189      启动         调度        结束
     190 新建 -----> 就绪 <-----> 运行 -----> 死亡
     191                         /
     192          满足条件       等待条件
     193                      /
     194                等待(阻塞) 
     195 
     196 
     197 -----------------------
     198 1.4.1 多线程-共享全局变量
     199 
     200 e.g:
     201 def Function_Work_A():
     202     global global_var_number_A
     203 
     204     for var in range(10):
     205         global_var_number_A += 1
     206     print("work A ,number is %d" % (global_var_number_A))
     207 
     208 
     209 def Function_Work_B():
     210     global global_var_number_A
     211 
     212     print("work B ,number is %d" % (global_var_number_A))
     213 
     214 if __name__ == "__main__":
     215     global_var_number_A = 100
     216 
     217     print("创建线程之前,number is %d" %(global_var_number_A))
     218     thread_name_A = threading.Thread(target = Function_Work_A )
     219     thread_name_A.start()
     220     time.sleep(1)
     221 
     222     thread_name_B = threading.Thread(target = Function_Work_B )
     223     thread_name_B.start()
     224 
     225 result:
     226     创建线程之前,number is 100
     227     work A ,number is 110
     228     work B ,number is 110
     229 
     230 -----------------------
     231 1.4.2 列表当做实参传递到线程中
     232 e.g:
     233 # 列表当做实参传递到线程中
     234 def Function_Work_C(nums):
     235     local_var_number = nums
     236 
     237     local_var_number.append("CC")
     238     print("work C ,number is %s" % (local_var_number))
     239 
     240 
     241 def Function_Work_D(nums):
     242     local_var_number = nums
     243     time.sleep(1)
     244     print("work D ,number is %s" % (local_var_number))
     245 
     246 if __name__ == "__main__":
     247     global_nums = [11,22,33,44,55]
     248     thread_name_C = threading.Thread(target=Function_Work_C, args=(global_nums,))
     249     thread_name_C.start()
     250     thread_name_D = threading.Thread(target=Function_Work_D, args=(global_nums,))
     251     thread_name_D.start()
     252 
     253 result:
     254     work C ,number is [11, 22, 33, 44, 55, 'CC']
     255     work D ,number is [11, 22, 33, 44, 55, 'CC']
     256 总结:
     257 在一线程内的所有线程共享全局变量,能够在不适用其他方式的前提下,
     258 完成多线程之间的数据共享,这点要比多进程要好。
     259 缺点就是,线程是对全局变量随意遂改可能造成多线程之间,
     260 对全局变量的混乱,即线程非安全。
     261 
     262 -----------------------
     263 1.5 线程不安全  同步
     264 
     265 e.g:
     266 # 同步
     267 def Function_Test_B():
     268     global global_var_number
     269     for var in range(1000000):
     270         global_var_number += 1
     271     print("Test B ,number is %d" % (global_var_number))
     272 
     273 def Function_Test_C():
     274     global global_var_number
     275     for var in range(1000000):
     276         global_var_number += 1
     277     print("Test C ,number is %d" % (global_var_number))
     278 
     279 if __name__ == "__main__":
     280     global_var_number = 0
     281     thread_name_E = threading.Thread(target = Function_Test_B)
     282     thread_name_E.start()
     283     time.sleep(3)
     284     thread_name_F = threading.Thread(target = Function_Test_C)
     285     thread_name_F.start()
     286 
     287     print("number: %s" % (global_var_number))
     288 
     289 result:
     290     Test B ,number is 1000000
     291     number: 1059644
     292     Test C ,number is 2000000
     293 
     294 同步:就是协同步调,按照预定的先后次序进行运行。
     295 同指协同,协助,相互配合
     296 如,进程/线程同步,可以理解为进程/线程A和B一块配合,A执行到一定程度时
     297 要依靠B的某个结果,于是停下来,示意B运行,在将结果给A,A继续执行操作。
     298 
     299 解决问题:线程同步
     300 1.系统调用thread_name_E,然后获取到num的值为0,
     301 此时上一把锁,即不允许其他现在操作num
     302 2.对num的值进行+1
     303 3.解锁,此时num的值为1,其他的线程就可以使用num了,
     304 而且num的值是0而不是1
     305 4.同理,其他线程在对num进行修改时,都要先上锁,处理完成后再解锁。
     306 在上锁的整个过程中不允许其他线程访问,就保证了数据的正确性。
     307 
     308 -----------------------
     309 1.6 互斥锁
     310 
     311 当多个线程几乎同时修改某个共享数据的时候,需要进行同步控制。
     312 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
     313 
     314 互斥锁为资源引入一个状态: 锁定/非锁定
     315 
     316 某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定,
     317 其他线程不能更改,直到该线程释放资源,将资源的状态变成非锁定,
     318 其他的线程才能再次锁定该资源。
     319 互斥锁保证每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
     320 
     321 
     322 threading模块定义Lock类,方便处理锁定:
     323 #创建锁
     324 mutex = threading.Lock()
     325 #锁定
     326 mutex.acquire([blocking])
     327 #释放
     328 mutex.release()
     329 
     330 在锁定方法中,acquire可以有个blocking参数
     331 如果设定blocking为True,则当前线程会阻塞,直到获得这个锁为止,如果没有指定,那么默认为True
     332 如果设定blocking为Fasle,则当前线程不会阻塞。
     333 
     334 
     335 上锁解锁过程:
     336 当一个线程调用锁的acpuire()方法获得解锁时,锁就进入Locked状态。
     337 每次只有一个线程可以获得锁。如果此时另外一个线程试图获得这个锁,
     338 该线程就会变为blocked状态,称为阻塞,直到拥有锁的线程调用锁release()方法释放后,锁进入unlocked状态
     339 线程调度程序从处于同步阻塞状态的线程中选择一个获得锁,并使得该线程进入运行(running)
     340 
     341 总结:
     342 锁的好处:
     343 确保了某段关键代码只能由一个线程从头到尾完整的执行
     344 锁的坏处:
     345 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率降低了。
     346 由于可以存在多个锁,不同的线程持有不同的锁,并试图获得对方持有的锁,可能造成死锁。
     347 
     348 
     349 e.g:
     350 # 互斥锁
     351 def Function_Test_D():
     352     global global_var_number
     353     # Test_D和Test_F线程都在抢着上锁,对这个锁,如果一方上锁成功,
     354     # 导致另一方会堵塞(一直等待)到这个锁被解锁为止
     355     mutexFlag = mutex.acquire(True)
     356     for var in range(1000000):
     357         # True 表示堵塞,如果这个锁在上锁之前已经被上锁了,那么这个线程会在
     358         #False 表示非堵塞,即不管本次调用能够成功上锁,都会卡在这,而继续
     359         if mutexFlag:
     360             global_var_number += 1
     361     # 用来对mutex指向的这个锁,进行解锁,只要解锁了,那么接下来会让所有
     362     # 因为这个锁,被上了锁而堵塞的线程进行抢着上锁
     363     mutex.release()
     364 
     365     print("Test D ,number is %d" % (global_var_number))
     366 
     367 def Function_Test_E():
     368     global global_var_number
     369 
     370     for var in range(1000000):
     371         mutexFlag = mutex.acquire(True)
     372         if mutexFlag:
     373             global_var_number += 1
     374         mutex.release()
     375 
     376     print("Test E ,number is %d" % (global_var_number))
     377 
     378 def Function_Test_F():
     379     global global_var_number
     380     mutexFlag = mutex.acquire(True)
     381     for var in range(1000000):
     382         if mutexFlag:
     383             global_var_number += 1
     384     mutex.release()
     385 
     386     print("Test F ,number is %d" % (global_var_number))
     387 
     388 if __name__ == "__main__":
     389     global_var_number = 0
     390     # 创建锁
     391     mutex = threading.Lock()
     392 
     393     thread_name_D = threading.Thread(target=Function_Test_D)
     394     thread_name_D.start()
     395 
     396     thread_name_E = threading.Thread(target = Function_Test_E)
     397     thread_name_E.start()
     398 
     399     thread_name_F = threading.Thread(target = Function_Test_F)
     400     thread_name_F.start()
     401 
     402     print("number: %s" % (global_var_number))
     403     
     404 result:
     405 
     406 -----------------------
     407 1.7 多线程-非共享数据
     408 
     409 对于全局变量,在多线程中要格外小心,否则容易造成数据错乱的情况发生
     410 在多线程开发中,全局变量是多个线程都是共享的数据,而局部变量等是各自线程,是非共享的。
     411 
     412 e.g:
     413 class Class_My_Thread(threading.Thread):
     414     # 重写 构造方法
     415     def __init__(self,number,SleepTime):
     416         threading.Thread.__init__(self)
     417         self.num       = number
     418         self.SleepTime = SleepTime
     419 
     420     def run(self):
     421         self.num += 1  # 局部变量
     422         time.sleep(self.SleepTime)
     423         print("线程 (%s), number = (%s)" % (self.name,self.num))
     424 
     425 if __name__ == "__main__":
     426     # 创建锁
     427     mutex = threading.Lock()
     428     thread_name_G = Class_My_Thread(100, 3)
     429     thread_name_G.start()
     430 
     431     thread_name_H = Class_My_Thread(200, 1)
     432     thread_name_H.start()
     433 
     434     thread_name_I = Class_My_Thread(300, 2)
     435     thread_name_I.start()
     436     
     437 result:
     438     线程 (Thread-2), number = (201)
     439     线程 (Thread-3), number = (301)
     440     线程 (Thread-1), number = (101)
     441 局部变量是不共享数据的。
     442 
     443 -----------------------
     444 1.8  死锁
     445 
     446 在线程共享多个资源的时候,如果二个线程分别占有一部分资源
     447 并且同时等待对方的资源,就会造成死锁。
     448 尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
     449 
     450 e.g:
     451 # 死锁
     452 class Class_My_Thread_A(threading.Thread):
     453     def run(self):
     454         if mutexA.acquire():
     455             print(self.name + "--do 1 up --")
     456             time.sleep(1)
     457 
     458             if mutexB.acquire():
     459                 print(self.name + "--do 1 down --")
     460                 mutexB.release()
     461             mutexA.release()
     462 
     463 
     464 class Class_My_Thread_B(threading.Thread):
     465     def run(self):
     466         if mutexB.acquire():
     467             print(self.name + "--do 2 up --")
     468             time.sleep(1)
     469 
     470             if mutexA.acquire():
     471                 print(self.name + "--do 2 down --")
     472                 mutexA.release()
     473             mutexB.release()
     474 
     475 if __name__ == "__main__":
     476     # 死锁  不能执行的效果
     477     mutexA = threading.Lock()
     478     mutexB = threading.Lock()
     479     thread_name_J = Class_My_Thread_A()
     480     thread_name_J.start()
     481     thread_name_K = Class_My_Thread_B()
     482     thread_name_K.start()
     483 
     484     print("...")
     485     
     486 result:
     487     #避免死锁
     488     # 程序设计时,要尽量避免,银行家算法
     489     # 添加超时时间等 acquire(timeout)很重要!!!
     490     
     491     
     492 -----------------------
     493 1.9.1 同步应用
     494 多线程有序执行
     495 
     496 可以使用互斥锁完成多个任务,有序的进程工作,这就是线程的同步
     497 
     498 e.g:
     499 # 多线程有序执行
     500 class Class_Task_C(threading.Thread):
     501     def run(self):
     502         while True:
     503             if mutexC.acquire():
     504                 print("--- Task C ---[%s]" % (self.name))
     505                 time.sleep(1)
     506                 mutexD.release()
     507 
     508 class Class_Task_D(threading.Thread):
     509     def run(self):
     510         while True:
     511             if mutexD.acquire():
     512                 print("--- Task D ---[%s]" % (self.name))
     513                 time.sleep(1)
     514                 mutexE.release()
     515 
     516 class Class_Task_E(threading.Thread):
     517     def run(self):
     518         while True:
     519             if mutexE.acquire():
     520                 print("--- Task F ---[%s]" % (self.name))
     521                 time.sleep(1)
     522                 mutexC.release()
     523                 
     524 if __name__ == "__main__":
     525     # 同步应用
     526     # 创建锁
     527     mutexC = threading.Lock()
     528 
     529     mutexD = threading.Lock()
     530     mutexD.acquire()
     531     mutexE = threading.Lock()
     532     mutexE.acquire()
     533 
     534     thread_name_L = Class_Task_C()
     535     thread_name_L.start()
     536 
     537     thread_name_M = Class_Task_D()
     538     thread_name_M.start()
     539 
     540     thread_name_N = Class_Task_E()
     541     thread_name_N.start()
     542 result:
     543     --- Task C ---[Thread-1]
     544     --- Task D ---[Thread-2]
     545     --- Task F ---[Thread-3]
     546     --- Task C ---[Thread-1]
     547     --- Task D ---[Thread-2]
     548     --- Task F ---[Thread-3]
     549     --- Task C ---[Thread-1]
     550     --- Task D ---[Thread-2]
     551 
     552 -----------------------
     553 2.0 Queue 队列
     554 
     555 Queue使用说明
     556 1) 对于Queue,在多线程通信之间扮演重要的角色
     557 2) 添加数据队列中,使用put()方法
     558 3) 从队列中获取数据,使用get()方法
     559 4) 判断队列中是否还有数据,使用qsize()方法
     560 
     561 生产者消费者模式说明:
     562 在线程世界里。生成者就是生产数据的线程,消费者就是消费数据的线程。
     563 在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,
     564 那么生成者就必须等待消费者处理完,才能继续生产数据。同样道理
     565 如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
     566 为了解决这个问题引入了生产者和消费者模式。
     567 
     568 生产者消费者模式,是通过一个容器来解决生产者和消费者的强耦合问题。
     569 生产者和消费者之间彼此之间不直接通讯,而通过阻塞队列来进行通讯,
     570 所有生产者生产数据之后不用等待消费者处理,直接扔给阻塞队列,
     571 消费者不找生产者要数据,而是直接从阻塞队列里取出数据,
     572 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
     573 这个阻塞队列就是用来给生产者和消费者解耦的。
     574 纵观大多数设计模式,都会找到一个第三者出来进行解耦。
     575 
     576 
     577 e.g:
     578 class Class_Producer_F(threading.Thread):
     579     def run(self):
     580         # global queue
     581         var_count = 0
     582 
     583         while True:
     584             if queue_name.qsize() < 1000 :
     585                 # 每次就生产100个商品,二个线程做个事
     586                 for var in range(100):
     587                     var_count += 1
     588                     message = "生成产品" + str(var_count)
     589                     queue_name.put(message)
     590                     # print("-- Producer F --[%s]" % (self.name))
     591             time.sleep(0.5)
     592 
     593 class Class_Consumer_G(threading.Thread):
     594     def run(self):
     595         # global queue
     596 
     597         while True:
     598             if queue_name.qsize() > 100:
     599                 # 若队列数量大于100,每个线程就取出3个商品,五个线程做这个事
     600                 for var in range(3):
     601                     message = self.name + "消费了" + queue_name.get()
     602                     print("message: (%s)",message)
     603                     # print("-- Consumer G --[%s]" % (self.name))
     604             time.sleep(1)
     605 
     606 if __name__ == "__main__":
     607         # #-----------------------
     608     # Queue  提供了同步、线程安全的队列类
     609     # 队列原则: 先进先出  FIFO
     610     # 栈的原则: 先进后出  LIFO
     611     # 这些队列都实现了锁原语,原子操作
     612     #创建队列 ,这个队列只能用于线程,而进程中不能使用
     613     queue_name = Queue() # 队列
     614     # 队列初创
     615     for var in range(500):
     616         queue_name.put("》》初始产品"+ str(var))
     617     # 创建二次线程 producer
     618     for var in range(2):
     619         producer_name = Class_Producer_F()
     620         producer_name.start()
     621     # 创建二次线程 consumer
     622     for var in range(5):
     623         producer_name = Class_Consumer_G()
     624         producer_name.start()
     625     # 2 + 5 +1(主线程) = 8 (总线程)
     626 result:
     627 
     628 -----------------------
     629 2.1 ThreadLocal
     630 
     631 ThreadLocal不用查找dict,ThreadLocal帮你自动的做这个事:
     632 全局变量Local_school就是一个ThreadLocal对象,
     633 每个Thread对他都可以读写student属性,但互不影响。
     634 你可以把ThreadLocal看出全局变量,但每个属性,
     635 如,Local_school.student都是线程的局部变量,
     636 可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
     637 
     638 可以理解为全局变量local_school是dict,不但可以用Local_school.student,
     639 还可以绑定其他变量,如Local_school.teacher等。
     640 
     641 ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,
     642 HTTP请求,用户身份信息,这样一个线程的所有调用到的处理函数
     643 都可以非常方便的访问这些资源。
     644 
     645 总结:
     646 ThreadLocal变量虽然是全局变量,但是每个线程都只能读写自己线程的独立副本,
     647 互不干扰,ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
     648 
     649 
     650 e.g:
     651 # ThreadLocal
     652 def Function_Process_student():
     653     # 获取当前线程关联的student
     654     student_name = local_school.student
     655     print("%s: (in %s)"% (student_name,threading.current_thread()))
     656 
     657 def Function_Process_thread(name):
     658     local_var_name =name
     659 
     660     # 绑定ThreadLocal的student
     661     local_school.student = local_var_name
     662     Function_Process_student()
     663 
     664 if __name__ == "__main__":
     665     # ThreadLocal
     666     # 创建全局ThreadLocal对象
     667     local_school = threading.local()
     668 
     669     thread_name_O = threading.Thread(target=Function_Process_thread, args=('图形分析',), name='Thread-A')
     670     thread_name_O.start()
     671     thread_name_O.join()
     672 
     673     thread_name_P = threading.Thread(target = Function_Process_thread, args=('数据处理',), name='Thread-B')
     674     thread_name_P.start()
     675     thread_name_P.join()
     676 
     677     thread_name_Q = threading.Thread(target = Function_Process_thread, args=('信号处理',), name='Thread-C')
     678     thread_name_Q.start()
     679     thread_name_Q.join()
     680 result:
     681 
     682 -----------------------
     683 2.2 异步
     684 
     685 e.g:
     686 def Function_Test_G():
     687     print("进程池中的进程PID=[%s],PPID=[%s]" % (os.getpid(),os.getppid()))
     688     for var in range(3):
     689         print("-- [%s]" % (var))
     690         time.sleep(1)
     691     return "Function_Test_G"
     692 
     693 def Function_Test_H(args):
     694     print("callback func PID =[%s]" % (os.getpid()))
     695     print("callback func args=[%s]" % (args))
     696 
     697 if __name__ == "__main__":
     698     # 异步
     699     #创建进程池
     700     pool_name = Pool(3)
     701     # callback 回调函数 子线程结束后,立刻执行回调函数
     702     pool_name.apply_async(func = Function_Test_G,callback= Function_Test_H )
     703 
     704     # 异步的理解:主进程正在做某件事情,
     705     # 突然来了一件更需要立刻去做的事情,
     706     # 那么这种,在父进程去做某件事情的时候 
     707     # 并不知道是什么时候去做,的模式 就称为异步
     708     while True:
     709         time.sleep(1)
     710         print("主进程 PID = [%s]" % (os.getpid()))
     711 result:
     712     进程池中的进程PID=[69792],PPID=[71624]
     713     -- [0]
     714     主进程 PID = [71624]
     715     -- [1]
     716     主进程 PID = [71624]
     717     -- [2]
     718     主进程 PID = [71624]
     719     callback func PID =[71624]
     720     callback func args=[Function_Test_G]
     721     主进程 PID = [71624]
     722     主进程 PID = [71624]
     723     主进程 PID = [71624]
     724     
     725 -----------------------
     726 
     727 ----------------------------------------------
     728 
     729 ============================================================================
     730 '''
     731 
     732 #
     733 '''
     734 # ============================================================================
     735 # Function:  
     736 # Explain :  输入参数   
     737 #         :  输出参数  
     738 # ============================================================================
     739 '''
     740 # #-----------------------
     741 # 单 / 多 线程执行
     742 
     743 def Function_Say_Sorry():
     744     print("A say: Sorry !")
     745     time.sleep(1)
     746 
     747 # #-----------------------
     748 def Function_Sing():
     749     for var in range(10):
     750         print("正在唱歌 %d" % var)
     751         time.sleep(1)
     752 
     753 def Function_Dance():
     754     for var in range(5):
     755         print("正在跳舞 %d" % var)
     756         time.sleep(1)
     757 
     758 # #-----------------------
     759 #线程执行代码的封装
     760 class Class_MyThread(threading.Thread):
     761     def run(self):
     762         for var in range(10):
     763             time.sleep(1)
     764             message = "I am " + self.name + " @ " + str(var)
     765             print("message: %s" % (message))
     766 
     767 # #-----------------------
     768 # 线程的执行顺序
     769   #重复封装类
     770 def Function_Test_A():
     771     for var in range(10):
     772         thread_name_local = Class_MyThread()
     773         thread_name_local.start()
     774 
     775 # #-----------------------
     776 # 多线程-共享全局变量
     777 
     778 def Function_Work_A():
     779     global global_var_number_A
     780 
     781     for var in range(10):
     782         global_var_number_A += 1
     783     print("work A ,number is %d" % (global_var_number_A))
     784 
     785 
     786 def Function_Work_B():
     787     global global_var_number_A
     788 
     789     print("work A ,number is %d" % (global_var_number_A))
     790 
     791 # #-----------------------
     792 # 列表当做实参传递到线程中
     793 def Function_Work_C(nums):
     794     local_var_number = nums
     795 
     796     local_var_number.append("CC")
     797     print("work C ,number is %s" % (local_var_number))
     798 
     799 
     800 def Function_Work_D(nums):
     801     local_var_number = nums
     802     time.sleep(1)
     803     print("work D ,number is %s" % (local_var_number))
     804 
     805 
     806 # #-----------------------
     807 # 同步
     808 def Function_Test_B():
     809     global global_var_number
     810     for var in range(1000000):
     811         global_var_number += 1
     812     print("Test B ,number is %d" % (global_var_number))
     813 
     814 def Function_Test_C():
     815     global global_var_number
     816     for var in range(1000000):
     817         global_var_number += 1
     818     print("Test C ,number is %d" % (global_var_number))
     819 
     820 # #-----------------------
     821 # 互斥锁
     822 def Function_Test_D():
     823     global global_var_number
     824     # Test_D和Test_F线程都在抢着上锁,对这个锁,如果一方上锁成功,
     825     # 导致另一方会堵塞(一直等待)到这个锁被解锁为止
     826     mutexFlag = mutex.acquire(True)
     827     for var in range(1000000):
     828         # True 表示堵塞,如果这个锁在上锁之前已经被上锁了,那么这个线程会在
     829         #False 表示非堵塞,即不管本次调用能够成功上锁,都会卡在这,而继续
     830         if mutexFlag:
     831             global_var_number += 1
     832     # 用来对mutex指向的这个锁,进行解锁,只要解锁了,那么接下来会让所有
     833     # 因为这个锁,被上了锁而堵塞的线程进行抢着上锁
     834     mutex.release()
     835 
     836     print("Test D ,number is %d" % (global_var_number))
     837 
     838 def Function_Test_E():
     839     global global_var_number
     840 
     841     for var in range(1000000):
     842         mutexFlag = mutex.acquire(True)
     843         if mutexFlag:
     844             global_var_number += 1
     845         mutex.release()
     846 
     847     print("Test E ,number is %d" % (global_var_number))
     848 
     849 def Function_Test_F():
     850     global global_var_number
     851     mutexFlag = mutex.acquire(True)
     852     for var in range(1000000):
     853         if mutexFlag:
     854             global_var_number += 1
     855     mutex.release()
     856 
     857     print("Test F ,number is %d" % (global_var_number))
     858 
     859 # #-----------------------
     860 # 多线程-非共享数据
     861 class Class_My_Thread(threading.Thread):
     862     # 重写 构造方法
     863     #   1. 全局变量在多个线程中 共享,为了保证正确运行需要锁
     864     #   2. 非全局变量在每个线程中 各有一份,不会共享,当然了不需要加锁
     865     def __init__(self,number,SleepTime):
     866         threading.Thread.__init__(self)
     867         self.num       = number
     868         self.SleepTime = SleepTime
     869 
     870     def run(self):
     871         self.num += 1  # 局部变量
     872         time.sleep(self.SleepTime)
     873         print("线程 (%s), number = (%s)" % (self.name,self.num))
     874 
     875 # #-----------------------
     876 # 死锁
     877 class Class_My_Thread_A(threading.Thread):
     878 
     879     def run(self):
     880         if mutexA.acquire():
     881             print(self.name + "--do 1 up --")
     882             time.sleep(1)
     883 
     884             if mutexB.acquire(True,3):
     885                 print(self.name + "--do 1 down --")
     886                 mutexB.release()
     887             mutexA.release()
     888 
     889 class Class_My_Thread_B(threading.Thread):
     890 
     891     def run(self):
     892         if mutexB.acquire(True,4):
     893             print(self.name + "--do 2 up --")
     894             time.sleep(1)
     895 
     896             if mutexA.acquire():
     897                 print(self.name + "--do 2 down --")
     898                 mutexA.release()
     899             mutexB.release()
     900 
     901 # #-----------------------
     902 # 多线程有序执行
     903 class Class_Task_C(threading.Thread):
     904     def run(self):
     905         while True:
     906             if mutexC.acquire():
     907                 print("--- Task C ---[%s]" % (self.name))
     908                 time.sleep(1)
     909                 mutexD.release()
     910 
     911 class Class_Task_D(threading.Thread):
     912     def run(self):
     913         while True:
     914             if mutexD.acquire():
     915                 print("--- Task D ---[%s]" % (self.name))
     916                 time.sleep(1)
     917                 mutexE.release()
     918 
     919 class Class_Task_E(threading.Thread):
     920     def run(self):
     921         while True:
     922             if mutexE.acquire():
     923                 print("--- Task E ---[%s]" % (self.name))
     924                 time.sleep(1)
     925                 mutexC.release()
     926 # #-----------------------
     927 # FIFO 队列  生产者消费模式
     928 class Class_Producer_F(threading.Thread):
     929     def run(self):
     930         # global queue
     931         var_count = 0
     932 
     933         while True:
     934             if queue_name.qsize() < 1000 :
     935                 # 每次就生产100个商品,二个线程做个事
     936                 for var in range(100):
     937                     var_count += 1
     938                     message = "生成产品" + str(var_count)
     939                     queue_name.put(message)
     940                     # print("-- Producer F --[%s]" % (self.name))
     941             time.sleep(0.5)
     942 
     943 class Class_Consumer_G(threading.Thread):
     944     def run(self):
     945         # global queue
     946 
     947         while True:
     948             if queue_name.qsize() > 100:
     949                 # 若队列数量大于100,每个线程就取出3个商品,五个线程做这个事
     950                 for var in range(3):
     951                     message = self.name + "消费了" + queue_name.get()
     952                     print("message: (%s)",message)
     953                     # print("-- Consumer G --[%s]" % (self.name))
     954             time.sleep(1)
     955 
     956 # #-----------------------
     957 # ThreadLocal
     958 def Function_Process_student():
     959     # 获取当前线程关联的student
     960     student_name = local_school.student
     961     print("%s: (in %s)"% (student_name,threading.current_thread()))
     962 
     963 def Function_Process_thread(name):
     964     local_var_name =name
     965 
     966     # 绑定ThreadLocal的student
     967     local_school.student = local_var_name
     968     Function_Process_student()
     969 
     970 # #-----------------------
     971 # 异步
     972 def Function_Test_G():
     973     print("进程池中的进程PID=[%s],PPID=[%s]" % (os.getpid(),os.getppid()))
     974     for var in range(3):
     975         print("-- [%s]" % (var))
     976         time.sleep(1)
     977     return "Function_Test_G"
     978 
     979 def Function_Test_H(args):
     980     print("callback func PID =[%s]" % (os.getpid()))
     981     print("callback func args=[%s]" % (args))
     982 
     983 # #-----------------------
     984 
     985 # #-----------------------
     986 
     987 # ============================================================================
     988 '''
     989 # ============================================================================
     990 #   测试专用
     991 # ============================================================================
     992 '''
     993 if __name__ == "__main__":
     994     # #-----------------------
     995     # # 单线程执行
     996     # for var in range(10):
     997     #     Function_Say_Sorry()
     998 
     999 
    1000 
    1001     # # -----------------------
    1002     # # 多线程执行
    1003     # for var in range(10):
    1004     #     # 创建线程
    1005     #     thread_name = threading.Thread(target = Function_Say_Sorry)
    1006     #     thread_name.start()
    1007 
    1008 
    1009 
    1010     # # # -----------------------
    1011     # # 主线程等待子线程结束而结束
    1012     # print("--- start ---:
     %s" % (time.time()))
    1013     #
    1014     # time_A = threading.Thread(target = Function_Sing)
    1015     # time_B = threading.Thread(target = Function_Dance)
    1016     # time_A.start()
    1017     # time_B.start()
    1018     # # time.sleep(5) #
    1019     # print("--- finish ---:
    %s" % (time.time()))
    1020     #
    1021     # # 查看线程数量
    1022     # while True:
    1023     #     length = len(threading.enumerate())
    1024     #     print("当前运行的线程数为: [%d] " % (length))
    1025     #     if length <= 1 :
    1026     #         break
    1027     #     time.sleep(0.5)
    1028 
    1029 
    1030 
    1031     # # # -----------------------
    1032     # #线程执行代码的封装
    1033     # thread_name = Class_MyThread()
    1034     # thread_name.start()
    1035 
    1036 
    1037 
    1038     # # # -----------------------
    1039     # #线程执行顺序
    1040     # Function_Test_A()
    1041 
    1042 
    1043 
    1044     # # -----------------------
    1045     # 多线程-共享全局变量
    1046     # global_var_number_A = 100
    1047     #
    1048     # print("创建线程之前,number is %d" %(global_var_number_A))
    1049     # thread_name_A = threading.Thread(target = Function_Work_A )
    1050     # thread_name_A.start()
    1051     # time.sleep(1)
    1052     #
    1053     # thread_name_B = threading.Thread(target = Function_Work_B )
    1054     # thread_name_B.start()
    1055 
    1056 
    1057 
    1058     # # -----------------------
    1059     # 列表当做实参传递到线程中
    1060     # global_nums = [11,22,33,44,55]
    1061     # thread_name_C = threading.Thread(target=Function_Work_C, args=(global_nums,))
    1062     # thread_name_C.start()
    1063     # thread_name_D = threading.Thread(target=Function_Work_D, args=(global_nums,))
    1064     # thread_name_D.start()
    1065 
    1066 
    1067 
    1068     # # # -----------------------
    1069     # # 同步
    1070     # global_var_number = 0
    1071     # thread_name_E = threading.Thread(target = Function_Test_B)
    1072     # thread_name_E.start()
    1073     # time.sleep(3)
    1074     # thread_name_F = threading.Thread(target = Function_Test_C)
    1075     # thread_name_F.start()
    1076     #
    1077     # print("number: %s" % (global_var_number))
    1078 
    1079 
    1080 
    1081     # # -----------------------
    1082     # 互斥锁
    1083     # global_var_number = 0
    1084     # # 创建锁
    1085     # mutex = threading.Lock()
    1086     #
    1087     # thread_name_D = threading.Thread(target=Function_Test_D)
    1088     # thread_name_D.start()
    1089     #
    1090     # thread_name_E = threading.Thread(target = Function_Test_E)
    1091     # thread_name_E.start()
    1092     #
    1093     # thread_name_F = threading.Thread(target = Function_Test_F)
    1094     # thread_name_F.start()
    1095     #
    1096     # print("number: %s" % (global_var_number))
    1097 
    1098 
    1099     # # # -----------------------
    1100     # # 多线程-非共享数据
    1101     # # 创建锁
    1102     # mutex = threading.Lock()
    1103     # thread_name_G = Class_My_Thread(100, 3)
    1104     # thread_name_G.start()
    1105     #
    1106     # thread_name_H = Class_My_Thread(200, 1)
    1107     # thread_name_H.start()
    1108     #
    1109     # thread_name_I = Class_My_Thread(300, 2)
    1110     # thread_name_I.start()
    1111 
    1112 
    1113 
    1114     # # # -----------------------
    1115     # # 死锁  不能执行的效果
    1116     # mutexA = threading.Lock()
    1117     # mutexB = threading.Lock()
    1118     # thread_name_J = Class_My_Thread_A()
    1119     # thread_name_J.start()
    1120     # thread_name_K = Class_My_Thread_B()
    1121     # thread_name_K.start()
    1122     #
    1123     # print("...")
    1124     # #避免死锁
    1125     # # 程序设计时,要尽量避免,银行家算法
    1126     # # 添加超时时间等 acquire(timeout)很重要!!!
    1127 
    1128 
    1129     # # #-----------------------
    1130     # # 同步应用
    1131     # # 创建锁
    1132     # mutexC = threading.Lock()
    1133     #
    1134     # mutexD = threading.Lock()
    1135     # mutexD.acquire()
    1136     # mutexE = threading.Lock()
    1137     # mutexE.acquire()
    1138     #
    1139     # thread_name_L = Class_Task_C()
    1140     # thread_name_L.start()
    1141     #
    1142     # thread_name_M = Class_Task_D()
    1143     # thread_name_M.start()
    1144     #
    1145     # thread_name_N = Class_Task_E()
    1146     # thread_name_N.start()
    1147 
    1148 
    1149     # # #-----------------------
    1150     # # Queue  提供了同步、线程安全的队列类
    1151     # # 队列原则: 先进先出  FIFO
    1152     # # 栈的原则: 先进后出  LIFO
    1153     # # 这些队列都实现了锁原语,原子操作
    1154     # #创建队列 ,这个队列只能用于线程,而进程中不能使用
    1155     # queue_name = Queue() # 队列
    1156     # # 队列初创
    1157     # for var in range(500):
    1158     #     queue_name.put("》》初始产品"+ str(var))
    1159     # # 创建二次线程 producer
    1160     # for var in range(2):
    1161     #     producer_name = Class_Producer_F()
    1162     #     producer_name.start()
    1163     # # 创建二次线程 consumer
    1164     # for var in range(5):
    1165     #     producer_name = Class_Consumer_G()
    1166     #     producer_name.start()
    1167     # # 2 + 5 +1(主线程) = 8 (总线程)
    1168 
    1169 
    1170 
    1171     # # #-----------------------
    1172     # # ThreadLocal
    1173     # # 创建全局ThreadLocal对象
    1174     # local_school = threading.local()
    1175     # thread_name_O = threading.Thread(target=Function_Process_thread, args=('图形分析',), name='Thread-A')
    1176     # thread_name_O.start()
    1177     # thread_name_O.join()
    1178     # thread_name_P = threading.Thread(target = Function_Process_thread, args=('数据处理',), name='Thread-B')
    1179     # thread_name_P.start()
    1180     # thread_name_P.join()
    1181     #
    1182     # thread_name_Q = threading.Thread(target = Function_Process_thread, args=('信号处理',), name='Thread-C')
    1183     # thread_name_Q.start()
    1184     # thread_name_Q.join()
    1185 
    1186 
    1187     # # #-----------------------
    1188     # # 异步
    1189     # #创建进程池
    1190     # pool_name = Pool(3)
    1191     # # callback 回调函数 子线程结束后,立刻执行回调函数
    1192     # pool_name.apply_async(func = Function_Test_G,callback= Function_Test_H )
    1193     #
    1194     # # 异步的理解:主进程正在做某件事情,
    1195     # # 突然来了一件更需要立刻去做的事情,
    1196     # # 那么这种,在父进程去做某件事情的时候 
    1197     # # 并不知道是什么时候去做,的模式 就称为异步
    1198     # while True:
    1199     #     time.sleep(1)
    1200     #     print("主进程 PID = [%s]" % (os.getpid()))
    1201 
    1202     print("learn finish")
    1203     

    --

    -----------------------------------------------------

  • 相关阅读:
    ORACLE 查看进程数,已执行任务数, 剩余任务数,删除指定任务
    ORACLE 收集统计整个用户数据
    解决Hystrix dashboard Turbine 一直 Loading…… 及其他坑
    利用 Maven 构造 Spring Cloud 微服务架构 模块使用 spring Boot构建
    AES加解密
    JAVA POI XSSFWorkbook导出扩展名为xlsx的Excel,附带weblogic 项目导出Excel文件错误的解决方案
    JAVA 文件的上传下载
    shell启停服务脚本模板
    JAVA 设计模式之 原型模式详解
    JAVA 设计模式之 工厂模式详解
  • 原文地址:https://www.cnblogs.com/caochucheng/p/9974843.html
Copyright © 2011-2022 走看看