zoukankan      html  css  js  c++  java
  • python--线程知识详解

    Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    1.1、threading模块

    threading模块建立在_thread模块之上。thread模块以低级=原始的方式来处理和控制线程,而threading模块
    通过对thread进行二次封装,提供了更方便的api来处理线程。

    简单的线程实例:
    创建了20个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 创建一个简单的threading线程实例
     5 """
     6 import threading
     7 import time
     8 
     9 def to_worker(num):
    10     """
    11     线程方法
    12     :param num:
    13     :return:
    14     """
    15     time.sleep(1)
    16     print("The num is %s" % num)
    17     return
    18 
    19 for i in range(5):
    20     t = threading.Thread(target=to_worker, args=(i, ))
    21     t.start()  #激活线程

    代码执行结果:

    1.2、创建线程的构造方法

    t = threading.Thread(group = None, target = None, name = Nome, args = 0, kwargs = {})

    注释说明:
    group --线程组
    target --要执行的方法
    name --线程名
    args/kwargs -要传入方法的参数

    Thread类提供了以下方法:
    1、t.start() --激活线程
    2、t.getName() --获取线程的名称
    3、t.setName() --设置线程的名称
    4、t.name() --获取或设置线程的名称
    5、t.is_alive() --判断线程是否为激活状态
    6、t.isAlive() --判断线程是否为激活状态
    7、t.setDaemon() --设置为后台线程或前台线程(默认:False)
    8、t.isDaemon() --判断是否为守护线程
    9、t.ident() --获取线程的标识符。线程标识符是一个非零整数,只有在调用start()方法后,该属性才有效,否则它只返回None
    10、t.join() --逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义。
    11、t.run() --线程被cpu调度后自动执行线程对象的run方法

    1.3、python线程锁

    当有一个数据有多个线程对其进行修改的时候,任何一个线程改变他都会对其他线程造成影响,如果我们想某一个线程在使用完之前,其他线程不能对其修改,就需要对这个线程加一个线程锁。

    简单的线程锁实例:

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 线程锁小实例
     5 """
     6 import threading
     7 import time
     8 
     9 globals_num = 0
    10 
    11 lock = threading.RLock()
    12 
    13 def Func():
    14     lock.acquire()  #获取锁
    15     global globals_num
    16     globals_num += 1
    17     time.sleep(1)
    18     print(globals_num)
    19     lock.release()  #释放锁
    20 
    21 for i in range(10):
    22     t = threading.Thread(target=Func)
    23     t.start()

    代码执行结果:

    threading.RLock和threading.Lock 的区别
    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

     1 import threading
     2 lock = threading.Lock()    #Lock对象
     3 lock.acquire()
     4 lock.acquire()  #产生了死琐。
     5 lock.release()
     6 lock.release() 
     7 
     8 
     9 
    10 import threading
    11 rLock = threading.RLock()  #RLock对象
    12 rLock.acquire()
    13 rLock.acquire()    #在同一线程内,程序不会堵塞。
    14 rLock.release()
    15 rLock.release()

    1.4、threading.Event

    1、python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法:set、wait、clear。
    2、事件处理的机制:全局定义了一个Flag,如果Flag值为False,那么当程序执行event.wait方法时就会阻塞,
    如果Flag值为True,那么event.wait方法时便不再阻塞。

    方法说明:
    clear --将Flag设置为False
    set -- 将Flag设置为True
    Event.isSet() --判断标识符是否为True


    threading.Event简单实例:
    当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 threading.Event事件实例
     5 """
     6 import threading
     7 
     8 def do(event):
     9     print("start.....")
    10     event.wait()
    11     print("execuse...")
    12 
    13 event_obj = threading.Event()
    14 
    15 for i in range(5):
    16     t = threading.Thread(target=do, args=(event_obj,))
    17     t.start()
    18 
    19 event_obj.clear()
    20 inp = input('input:(true) ')
    21 if inp == "true":
    22     event_obj.set()

    代码执行结果:

    1.5、threading.Condition(条件变量)

    示例说明:当小伙伴a在往火锅里面添加鱼丸,这个就是生产者行为;另外一个小伙伴b在吃掉鱼丸就是消费者行为。当火锅里面鱼丸达到一定数量加满后b才能吃,这就是一种条件判断了。

    Condition(条件变量)通常与一个锁关联。需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例。

    可以认为,除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态,直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定。

    Condition():

    acquire(): 线程锁
    release(): 释放锁
    wait(timeout): 线程挂起,直到收到一个notify通知或者超时(可选的,浮点数,单位是秒s)才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。
    notify(n=1): 通知其他线程,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
    notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程。


    生产者与消费者示例:
    现实场景:当a同学王火锅里面添加鱼丸加满后(最多3个,加满后通知b去吃掉),通知b同学去吃掉鱼丸(吃到0的时候通知a同学继续添加)

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 threading.Condition
     5 """
     6 import threading
     7 import time
     8 
     9 con = threading.Condition()
    10 
    11 num = 0
    12 
    13 #生产者
    14 class Producer(threading.Thread):
    15 
    16     def __init__(self):
    17         threading.Thread.__init__(self)
    18 
    19     def run(self):
    20         #锁定线程
    21         global  num
    22         con.acquire()  #获取锁
    23         while True:
    24             print("a同学开始添加......")
    25             num += 1
    26             print("锅里丸子个数为:%s" % str(num))
    27             time.sleep(1)
    28             if num >= 3:
    29                 print("丸子个数已经达到3个了,无法添加。")
    30                 #唤醒等待的线程
    31                 con.notify()  #唤醒同学开吃
    32                 #等待通知
    33                 con.wait()
    34 
    35         #释放锁
    36         con.release()
    37 
    38 #消费者
    39 class Consumers(threading.Thread):
    40     def __init__(self):
    41         threading.Thread.__init__(self)
    42 
    43     def run(self):
    44         con.acquire()
    45         global num
    46         while True:
    47             print("我准备开吃了...")
    48             num -= 1
    49             print("锅里丸子数量为:%s" % str(num))
    50             time.sleep(2)
    51             if num <= 0:
    52                 print("丸子吃完了,赶紧添加啦..")
    53                 con.notify()  #唤醒等待的线程
    54                 #等待通知
    55                 con.wait()
    56         con.release()  #释放锁
    57 
    58 p = Producer()
    59 c = Consumers()
    60 p.start()
    61 c.start()

    代码执行结果:

    1.6、Queue模块

    1.6.1、创建一个“队列”对象
    import queue
    q = queue.queue(maxsize = 10)
    queue.queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。

    1.6.2、将一个值放入队列中
    q.put(10)
    调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为
    1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。

    1.6.3、将一个值从队列中取出
    q.get()
    调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

    1.6.4、Python queue模块有三种队列及构造函数:
    1、Python queue模块的FIFO队列先进先出。 class queue.queue(maxsize)
    2、LIFO类似于堆,即先进后出。 class queue.Lifoqueue(maxsize)
    3、还有一种是优先级队列级别越低越先出来。 class queue.Priorityqueue(maxsize)

    1.6.5、queue常用方法(q =queue.queue()):
    q.qsize() 返回队列的大小
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    q.join() 实际上意味着等到队列为空,再执行别的操作

    1.6.6、简单的queue实例:生产者-消费者模型

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 Queue队列
     5 """
     6 import queue
     7 import threading
     8 
     9 
    10 message = queue.Queue(10)
    11 
    12 def producer(i):
    13     while True:
    14         message.put(i)
    15 
    16 def consumer(i):
    17     while True:
    18         msg = message.get(i)
    19 
    20 for i in range(12):
    21     t = threading.Thread(target=producer, args=(i, ))
    22     t.start()
    23 for i in range(10):
    24     t = threading.Thread(target=consumer, args=(i, ))
    25     t.start()

    1.7、自定义线程池

    1.7.1、方法一:简单往队列中传输线程数

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 """
     4 自定义线程池
     5 方法一:简单往队列中传输线程数
     6 """
     7 import threading
     8 import time
     9 import queue
    10 
    11 class ThreadingPool():
    12     def __init__(self, max_num = 10):
    13         self.queue = queue.Queue(max_num)
    14         for i in range(max_num):
    15             self.queue.put(threading.Thread)
    16 
    17     def getthreading(self):
    18         return self.queue.get()
    19 
    20     def addthreading(self):
    21         self.queue.put(threading.Thread)
    22 
    23 
    24 def func(p, i):
    25     time.sleep(1)
    26     print(i)
    27     p.addthreading()
    28 
    29 if __name__ == "__main__":
    30     p = ThreadingPool()
    31     for i in range(12):
    32         thread = p.getthreading()
    33         t = thread(target = func, args = (p, i))
    34         t.start()

    代码执行结果:

    1.7.2、方法二:往队列中无限添加任务

      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 """
      4 自定义线程池
      5 方法二:往队列中无限添加任务
      6 """
      7 import queue
      8 import threading
      9 import contextlib
     10 import time
     11 
     12 StopEvent = object()
     13 
     14 class ThreadPool(object):
     15 
     16     def __init__(self, max_num):
     17         self.q = queue.Queue()
     18         self.max_num = max_num
     19 
     20         self.treminal = False
     21         self.generate_list = []
     22         self.free_list = []
     23 
     24     def run(self, func, args, callback=None):
     25         """
     26         线程池执行一个任务
     27         :param func: 任务函数
     28         :param args: 任务函数所需参数
     29         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数:1、任务函数执行状态;2、任务函数返回值(默认为None,即不执行回调函数)
     30         :return:如果线程池已经终止,则返回True,否则为None
     31         """
     32         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
     33             self.generate_thread()
     34             w = (func, args, callback,)
     35             self.q.put(w)
     36 
     37     def generate_thread(self):
     38         """
     39         创建一个线程
     40         :param self:
     41         :return:
     42         """
     43         t = threading.Thread(target=self.call)
     44         t.start()
     45 
     46     def call(self):
     47         """
     48         循环去获取任务函数并执行任务函数
     49         :return:
     50         """
     51         current_thread = threading.currentThread
     52         self.generate_list.append(current_thread)
     53 
     54         event = self.q.get()  #获取线程
     55         while event != StopEvent: #判断获取的线程数不等于全局变量
     56             func, arguments, callback = event  #拆分元组, 获取执行函数,参数, 回调函数
     57             try:
     58                 result = func(*arguments) #执行函数
     59                 status = True
     60 
     61             except Exception as e: #函数执行失败
     62                 status = False
     63                 result = e
     64 
     65             if callback is not None:
     66                 try:
     67                     callback(status, result)
     68                 except Exception as e:
     69                     pass
     70 
     71             with self.work_state():
     72                 event = self.q.get()
     73 
     74         else:
     75             self.generate_list.remove(current_thread)
     76 
     77 
     78     def close(self):
     79         """
     80         关闭线程,给传输全局非元组的变量来进行关闭
     81         :return:
     82         """
     83         for i in range(len(self.generate_list)):
     84             self.q.put(StopEvent)
     85 
     86 
     87     def terminate(self):
     88         """
     89         突然关闭线程
     90         :return:
     91         """
     92         self.terminal = True
     93         while self.generate_list:
     94             self.q.put(StopEvent)
     95         self.q.empty()
     96 
     97 
     98     def work_state(self):
     99         self.free_list.append(threading.current_thread)
    100         try:
    101             yield
    102         finally:
    103             self.free_list.remove(threading.currentThread)
    104 
    105 def work(i):
    106     print(i)
    107     return i + 1 #返回给回调函数
    108 
    109 def callback(ret):
    110     print(ret)
    111 
    112 pool = ThreadPool(10)
    113 for item in range(50):
    114     pool.run(func=work, args=(item, ), callback=callback)
    115 
    116 pool.terminate()
  • 相关阅读:
    HDU 3401 Trade
    POJ 1151 Atlantis
    HDU 3415 Max Sum of MaxKsubsequence
    HDU 4234 Moving Points
    HDU 4258 Covered Walkway
    HDU 4391 Paint The Wall
    HDU 1199 Color the Ball
    HDU 4374 One hundred layer
    HDU 3507 Print Article
    GCC特性之__init修饰解析 kasalyn的专栏 博客频道 CSDN.NET
  • 原文地址:https://www.cnblogs.com/june-L/p/11795631.html
Copyright © 2011-2022 走看看