zoukankan      html  css  js  c++  java
  • python--线程知识详解

    Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    1.1、threading模块

    threading模块建立在_thread模块之上。thread模块以低级=原始的方式来处理和控制线程,而threading模块
    通过对thread进行二次封装,提供了更方便的api来处理线程。

    简单的线程实例:
    创建了20个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 创建一个简单的threading线程实例
     5 """
     6 import threading
     7 import time
     8 
     9 def to_worker(num):
    10     """
    11     线程方法
    12     :param num:
    13     :return:
    14     """
    15     time.sleep(1)
    16     print("The num is %s" % num)
    17     return
    18 
    19 for i in range(5):
    20     t = threading.Thread(target=to_worker, args=(i, ))
    21     t.start()  #激活线程

    代码执行结果:

    1.2、创建线程的构造方法

    t = threading.Thread(group = None, target = None, name = Nome, args = 0, kwargs = {})

    注释说明:
    group --线程组
    target --要执行的方法
    name --线程名
    args/kwargs -要传入方法的参数

    Thread类提供了以下方法:
    1、t.start() --激活线程
    2、t.getName() --获取线程的名称
    3、t.setName() --设置线程的名称
    4、t.name() --获取或设置线程的名称
    5、t.is_alive() --判断线程是否为激活状态
    6、t.isAlive() --判断线程是否为激活状态
    7、t.setDaemon() --设置为后台线程或前台线程(默认:False)
    8、t.isDaemon() --判断是否为守护线程
    9、t.ident() --获取线程的标识符。线程标识符是一个非零整数,只有在调用start()方法后,该属性才有效,否则它只返回None
    10、t.join() --逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义。
    11、t.run() --线程被cpu调度后自动执行线程对象的run方法

    1.3、python线程锁

    当有一个数据有多个线程对其进行修改的时候,任何一个线程改变他都会对其他线程造成影响,如果我们想某一个线程在使用完之前,其他线程不能对其修改,就需要对这个线程加一个线程锁。

    简单的线程锁实例:

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 线程锁小实例
     5 """
     6 import threading
     7 import time
     8 
     9 globals_num = 0
    10 
    11 lock = threading.RLock()
    12 
    13 def Func():
    14     lock.acquire()  #获取锁
    15     global globals_num
    16     globals_num += 1
    17     time.sleep(1)
    18     print(globals_num)
    19     lock.release()  #释放锁
    20 
    21 for i in range(10):
    22     t = threading.Thread(target=Func)
    23     t.start()

    代码执行结果:

    threading.RLock和threading.Lock 的区别
    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

     1 import threading
     2 lock = threading.Lock()    #Lock对象
     3 lock.acquire()
     4 lock.acquire()  #产生了死琐。
     5 lock.release()
     6 lock.release() 
     7 
     8 
     9 
    10 import threading
    11 rLock = threading.RLock()  #RLock对象
    12 rLock.acquire()
    13 rLock.acquire()    #在同一线程内,程序不会堵塞。
    14 rLock.release()
    15 rLock.release()

    1.4、threading.Event

    1、python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。
    2、事件处理的机制:全局定义了一个Flag,如果Flag值为False,那么当程序执行event.wait方法时就会阻塞,
    如果Flag值为True,那么event.wait方法时便不再阻塞。

    方法说明:
    clear --将Flag设置为False
    set -- 将Flag设置为True
    Event.isSet() --判断标识符是否为True


    threading.Event简单实例:
    当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 threading.Event事件实例
     5 """
     6 import threading
     7 
     8 def do(event):
     9     print("start.....")
    10     event.wait()
    11     print("execuse...")
    12 
    13 event_obj = threading.Event()
    14 
    15 for i in range(5):
    16     t = threading.Thread(target=do, args=(event_obj,))
    17     t.start()
    18 
    19 event_obj.clear()
    20 inp = input('input:(true) ')
    21 if inp == "true":
    22     event_obj.set()

    代码执行结果:

    1.5、threading.Condition(条件变量)

    示例说明:当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为;另外一个小伙伴b在吃掉鱼丸就是消费者行为。当火锅里面鱼丸达到一定数量加满后b才能吃,这就是一种条件判断了。

    Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

    可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

    Condition():

    acquire(): 线程锁
    release(): 释放锁
    wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
    notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
    notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程。


    生产者与消费者示例:
    现实场景:当a同学王火锅里面添加鱼丸加满后(最多3个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 threading.Condition
     5 """
     6 import threading
     7 import time
     8 
     9 con = threading.Condition()
    10 
    11 num = 0
    12 
    13 #生产者
    14 class Producer(threading.Thread):
    15 
    16     def __init__(self):
    17         threading.Thread.__init__(self)
    18 
    19     def run(self):
    20         #锁定线程
    21         global  num
    22         con.acquire()  #获取锁
    23         while True:
    24             print("a同学开始添加......")
    25             num += 1
    26             print("锅里丸子个数为:%s" % str(num))
    27             time.sleep(1)
    28             if num >= 3:
    29                 print("丸子个数已经达到3个了,无法添加。")
    30                 #唤醒等待的线程
    31                 con.notify()  #唤醒同学开吃
    32                 #等待通知
    33                 con.wait()
    34 
    35         #释放锁
    36         con.release()
    37 
    38 #消费者
    39 class Consumers(threading.Thread):
    40     def __init__(self):
    41         threading.Thread.__init__(self)
    42 
    43     def run(self):
    44         con.acquire()
    45         global num
    46         while True:
    47             print("我准备开吃了...")
    48             num -= 1
    49             print("锅里丸子数量为:%s" % str(num))
    50             time.sleep(2)
    51             if num <= 0:
    52                 print("丸子吃完了,赶紧添加啦..")
    53                 con.notify()  #唤醒等待的线程
    54                 #等待通知
    55                 con.wait()
    56         con.release()  #释放锁
    57 
    58 p = Producer()
    59 c = Consumers()
    60 p.start()
    61 c.start()

    代码执行结果:

    1.6、Queue模块

    1.6.1、创建一个“队列”对象
    import queue
    q = queue.queue(maxsize = 10)
    queue.queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

    1.6.2、将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

    1.6.3、将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

    1.6.4、Python queue模块有三种队列及构造函数:
    1、Python queue模块的FIFO队列先进先出。 class queue.queue(maxsize)
    2、LIFO类似于堆,即先进后出。 class queue.Lifoqueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。 class queue.Priorityqueue(maxsize)

    1.6.5、queue常用方法(q =queue.queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作

    1.6.6、简单的queue实例:生产者-消费者模型

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 Queue队列
     5 """
     6 import queue
     7 import threading
     8 
     9 
    10 message = queue.Queue(10)
    11 
    12 def producer(i):
    13     while True:
    14         message.put(i)
    15 
    16 def consumer(i):
    17     while True:
    18         msg = message.get(i)
    19 
    20 for i in range(12):
    21     t = threading.Thread(target=producer, args=(i, ))
    22     t.start()
    23 for i in range(10):
    24     t = threading.Thread(target=consumer, args=(i, ))
    25     t.start()

    1.7、自定义线程池

    1.7.1、方法一:简单往队列中传输线程数

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 自定义线程池
     5 方法一:简单往队列中传输线程数
     6 """
     7 import threading
     8 import time
     9 import queue
    10 
    11 class ThreadingPool():
    12     def __init__(self, max_num = 10):
    13         self.queue = queue.Queue(max_num)
    14         for i in range(max_num):
    15             self.queue.put(threading.Thread)
    16 
    17     def getthreading(self):
    18         return self.queue.get()
    19 
    20     def addthreading(self):
    21         self.queue.put(threading.Thread)
    22 
    23 
    24 def func(p, i):
    25     time.sleep(1)
    26     print(i)
    27     p.addthreading()
    28 
    29 if __name__ == "__main__":
    30     p = ThreadingPool()
    31     for i in range(12):
    32         thread = p.getthreading()
    33         t = thread(target = func, args = (p, i))
    34         t.start()

    代码执行结果:

    1.7.2、方法二:往队列中无限添加任务

      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 """
      4 自定义线程池
      5 方法二:往队列中无限添加任务
      6 """
      7 import queue
      8 import threading
      9 import contextlib
     10 import time
     11 
     12 StopEvent = object()
     13 
     14 class ThreadPool(object):
     15 
     16     def __init__(self, max_num):
     17         self.q = queue.Queue()
     18         self.max_num = max_num
     19 
     20         self.treminal = False
     21         self.generate_list = []
     22         self.free_list = []
     23 
     24     def run(self, func, args, callback=None):
     25         """
     26         线程池执行一个任务
     27         :param func: 任务函数
     28         :param args: 任务函数所需参数
     29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数:1、任务函数执行状态;2、任务函数返回值(默认为None,即不执行回调函数)
     30         :return:如果线程池已经终止,则返回True,否则为None
     31         """
     32         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
     33             self.generate_thread()
     34             w = (func, args, callback,)
     35             self.q.put(w)
     36 
     37     def generate_thread(self):
     38         """
     39         创建一个线程
     40         :param self:
     41         :return:
     42         """
     43         t = threading.Thread(target=self.call)
     44         t.start()
     45 
     46     def call(self):
     47         """
     48         循环去获取任务函数并执行任务函数
     49         :return:
     50         """
     51         current_thread = threading.currentThread
     52         self.generate_list.append(current_thread)
     53 
     54         event = self.q.get()  #获取线程
     55         while event != StopEvent: #判断获取的线程数不等于全局变量
     56             func, arguments, callback = event  #拆分元组, 获取执行函数,参数, 回调函数
     57             try:
     58                 result = func(*arguments) #执行函数
     59                 status = True
     60 
     61             except Exception as e: #函数执行失败
     62                 status = False
     63                 result = e
     64 
     65             if callback is not None:
     66                 try:
     67                     callback(status, result)
     68                 except Exception as e:
     69                     pass
     70 
     71             with self.work_state():
     72                 event = self.q.get()
     73 
     74         else:
     75             self.generate_list.remove(current_thread)
     76 
     77 
     78     def close(self):
     79         """
     80         关闭线程,给传输全局非元组的变量来进行关闭
     81         :return:
     82         """
     83         for i in range(len(self.generate_list)):
     84             self.q.put(StopEvent)
     85 
     86 
     87     def terminate(self):
     88         """
     89         突然关闭线程
     90         :return:
     91         """
     92         self.terminal = True
     93         while self.generate_list:
     94             self.q.put(StopEvent)
     95         self.q.empty()
     96 
     97 
     98     def work_state(self):
     99         self.free_list.append(threading.current_thread)
    100         try:
    101             yield
    102         finally:
    103             self.free_list.remove(threading.currentThread)
    104 
    105 def work(i):
    106     print(i)
    107     return i + 1 #返回给回调函数
    108 
    109 def callback(ret):
    110     print(ret)
    111 
    112 pool = ThreadPool(10)
    113 for item in range(50):
    114     pool.run(func=work, args=(item, ), callback=callback)
    115 
    116 pool.terminate()
  • 相关阅读:
    机器学习:以分析红酒口感为例说明交叉验证的套索模型
    机器学习:分类算法性能指标之ROC曲线
    机器学习:最小二乘法实际应用的一个完整例子
    机器学习:Python中如何使用支持向量机(SVM)算法
    机器学习:python中如何使用朴素贝叶斯算法
    机器学习:Python实现lms中的学习率的退火算法
    机器学习:Python实现最小均方算法(lms)
    @Autowired 与@Resource选择(治好你的强迫症)
    @Resource 进行注入bean的过程
    @Autowired 进行注入bean的过程
  • 原文地址:https://www.cnblogs.com/june-L/p/11795631.html
Copyright © 2011-2022 走看看