zoukankan      html  css  js  c++  java
  • 线程锁、线程池



    一、线程(IO密集型工作多线程有用)

    • 线程:
      • 概述:
        • 若一个文件从上到下顺序执行,则为串行执行,整个py文件实际上是一个主线程
        • 若多线程,则可以并行执行,同一个时刻可以运行多个代码段
        • 给每个client请求分配一个线程,则这些线程可以同时工作
      • 多线程、多进程:
        • 1、一个应用程序,可以有多进程和多线程;默认是单进程、单线程
        • 2、单进程、多线程 :
              • 多线程:IO操作(输入输出流,文件操作)有用,因为几乎不用cpu来调度,一般用多线程来提高并发
              • 计算型操作,需要用到cpu调度执行,一般用多进程提高并发
        • 3、GIL 全局解释器锁,即进程中同一时刻只能被CPU 调度一个线程
    • 创建方式
      • 创建方式一:常规方式,比较简单常用
    import threading
    def func1(arg):
    print(arg)
    # t = threading.Thread( target=线程要执行的函数,  args=函数参数-数组, kwargs=函数参数-字典)
    t = threading.Thread(target=f1, args=(123,) )
    t.start() 
    # 准备就绪,准备让cpu进行调度,
    # 只要cpu一旦调度了该线程,就会执行threading 模块中的run方法,run方法只做一件事,执行target指向的函数
    # 即,target实际上是run在方法内部执行的
      • 创建方式二: 重写init和run
    class MyThread(threading.Thread):
    def __init__(self, func, args):
    self.func = func 
    self.args = args 
    super(MyThread, self).__init__()
    def run(self):
    self.func(self.args)
    def f2():
    pass
    obj = MyThread(f2, 123)
    obj.start()
    • 常用方法:
      • t.start()  
          • 准备好该线程,等待CPU进行调度,
          • 虽然线程已经就绪,但是什么时候执行 谁也不知道,只有被cpu进行调度的时候才会执行
      • t.join(3) 
          • 逐个执行线程,执行完毕后再往下执行;可设置执行“超时时间”,该方法使得多线程没有意义
          • 主线程代码执行到这里,会等待子线程去执行,等待超时时间为3秒,子线程执行完毕,主线程才会继续往下执行
      • t.setDaemon(True) 
          • 设置为后台进程(默认为False),主线程执行过程中,后台线程也在执行,主线程执行完毕,后台线程立即终止


    二、线程锁
    • 概述:
      • 线程锁:
        • 因为线程是随机调度,即可能并行去更“改同一个数据”,若多个线程同时对资源进行修改,则会发生错误
        • 互斥锁,同一时刻,只允许一个线程来执行的操作
      • 未使用“锁”:
        • 10个人进行购买商品,每个人购买后商品减1,每个人看到的结果应该为9876543210
        • 实际执行,所有人同时购买商品,同时修改NUM,同时输出NUM ,则都输出0 
    import threading
    import time

    NUM = 0
    def function1():
    global NUM
    NUM += 1
    time.sleep(0.1) # 需要让所有线程都夯在这里,然后同时输出
    print(NUM)

    for x in range(10):
    t = threading.Thread(target=function1, )
    t.start()
    • 各种锁讲解
      • “Rlock 锁”:
        • 将数据修改的位置作为原子操作,加锁
    import threading
    import time

    NUM = 0
    lock = threading.RLock()

    def function1():
    lock.acquire()  
    global NUM
    NUM += 1
    time.sleep(2)
    print(NUM)
    lock.release()

    for x in range(10):
    t = threading.Thread(target=function1,)
    t.start()
     
      • 注意:lock = threading.Rlock() # 支持多层锁的嵌套,一般用Rlock,lock不支持
    def func(l):
    global NUM 
    l.acquire()
    NUM -= 1 
    l.acquire()
    time.sleep(2)
    l.release()
    print(NUM)
    l.release()
      • semaphore”锁
        • 信号量:表示同一时刻允许有多个线程同时工作,比如火车站过安检,每次允许10个人,上批次安检完成后再来十个人
        • 注意:若同一批次运行的线程,都修改同一个数据,依然会有“数据错误”的情况,这里只是演示功能
    import threading
    import time

    NUM = 0
    semaphore = threading.BoundedSemaphore(5) # 表示每批放进来5个线程

    def function1(i, se):
    se.acquire()
    global NUM
    NUM += 1
    time.sleep(2)
    print('我是线程:{}, NUM此时为:{}'.format(i, NUM))
    se.release()

    for x in range(10):
    t = threading.Thread(target=function1, args=(x, semaphore))
    t.start()
      • “event”锁:
        • 事件:要锁全部锁,要放全部放,实则内部维护了一个变量,值为布尔值
        • 该锁提供了3个方法:
          1. set() # 将Flag 值设为True
          2. clear() # 将Flag 值设为 False
          3. wait() # 若Flag为False,则阻塞住;若为True则放行
    import threading
    event = threading.Event()

    def function(id, en):
    print('兵{}:大王,怎么办?'.format(id))
    en.wait()
    print('兵{}:杀呀~~~'.format(id))

    for x in range(10):
    t = threading.Thread(target=function, args=(x, event))
    t.start()

    event.clear()
    print('1、杀出一条血路 2、投降')
    you_choice = input('>>').strip()
    if you_choice == '1':
    event.set()
      • "condition" 锁
        • 使得线程等待,只有当满足条件时,才放出N个线程去执行任务
        • 注意,wait和notify方法前后,必须被acquire和release方法包起来
        • 姿势1:自行notify 通知运行N个
    import threading
    condition = threading.Condition()

    def function(id, con):
    con.acquire()  
    con.wait()  # 子线程全部卡在这里,等待通知
    print('兵{}:杀呀~~~'.format(id))
    con.release()

    for x in range(100):
    t = threading.Thread(target=function, args=(x, condition))
    t.start()


    print('你要挑战几个?')
    you_choice = input('>>').strip()
    condition.acquire()  
    condition.notify(int(you_choice))   # 传进去几个,上面运行几个线程
    condition.release()
        • 姿势2:条件成立,则放出一个去运行
    import threading
    def condition_func(): # 该函数返回值必须为布尔值
    ret = False
    inp = input('>>').strip()
    if inp == '1':
    ret = True
    return ret

    def function(id, con):
    con.acquire()  
    con.wait_for(condition_func) # 一旦该函数体返回值为True,则放出一个线程
    print('thread{}:im gone~~~'.format(id))
    con.release()

    condition = threading.Condition()
    for x in range(5):
    t = threading.Thread(target=function, args=(x, condition))
    t.start()
     
     
    三、Timeer 定时器:
    • 定时器,指定n秒后执行某操作
    from threading import Timer
    import time
     
    def function():
    print('开火~')
     
    t = Timer(3, function)  # 子线程,三秒后执行该函数
    t.start()
    for i in range(1, 4):
    print('time:{}'.format(i)) # 主线程,数数,每次间隔1秒
    time.sleep(1)



     
     
    四、线程池:
    • 线程池概述:
      • “线程池”与“上下文切换”:
        • CPU调度线程,当时间片用完,会进行切换,每次切换时线程现场的“保存与载入”都是时间开销
        • 当线程数目达到一个峰值后,再多创建线程,执行效率会下降
        • 因此,应该控制线程创建的最大数目,池中线程取一个少一个,无线程时,后续请求等待;执行完毕归还线程
      • 要创建一个线程池,那么该线程池应满足如下条件:
        • 线程池为一个容器
        • 池中线程用一个少一个
        • 池中无线程,则进行等待
        • 线程执行完任务,则进行归还
    • 创建姿势1: 利用队列来存放“线程”
    import time
    import queue
    import threading

    class MyPool:
    def __init__(self, maxsize=5):
    self.maxsize = maxsize
    self._q = queue.Queue(self.maxsize)  # 创建一个指定大小的队列
    for _ in range(self.maxsize):
    self._q.put(threading.Thread)  # 在队列中全为线程"类"

    def get_thread(self):
    return self._q.get()  # 取出一个线程"类"

    def add_thread(self):
    self._q.put(threading.Thread)  # 添加一个线程“类”

    # 定义任务,参数为线程池(执行该任务的线程来源),执行完毕后向该池中归还(添加)一个线程
    def task(n, p):
    print('{}: 执行任务,完毕'.format(n))
    time.sleep(2)
    p.add_thread()

    # 创建一个大小为5的线程池
    pool = MyPool(5)

    for i in range(100):
    t_class = pool.get_thread()  # 从队列中获取一个线程"类名称"
    t_obj = t_class(target=task, args=(i, pool))  # 用取出来的类,实例化个线程,并交给线程任务去执行,每次从队列中取一个线程,若没有线程则等待
    t_obj.start()
    • 此时,每次有5个线程去处理工作,不会超过5
    • 问题:
      • 线程重用:任务执行完成后归还线程,用的是“重新创建一个线程并put进队列”的方法,即原来的线程还放于内存中等待被GC回收
      • 空闲线程:若任务数少于池数目,则会多余创建,即应该线程池最初是空的,来一个创建一个,最大为5
     






























    姿势2: 


    • 队列中存放任务:
      • 往往每个任务都是一个函数,
      • 可以将任务和其相关参数搞成元组(函数名,参数),将这些元组put到队列中,则队列中保存的全是需要执行的任务
    • 创建N个线程(N为线程池大小):
      • 每个线程都“循环”从队列中get任务并执行(线程重用)
      • 若队列中所有任务都取完了,则终止已经创建的线程,终止方法如下:
        1. get方法有超时时间,可以设置超时时间,超过这个时间则线程自动销毁
        2. 在队列末尾插入几个空值,get后判断,若取到的是任务则执行,否则则终止
     
     
     
     
     
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    import queue
    import threading
    import contextlib
    import time

    StopEvent = object() # 创建个静态字段,字段任何值都行,相当于None,即要在队列后插入的空值


    class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
    # 创建任务队列,可显示指定任务队列大小,否则为不限制
    if max_task_num:
    self.q = queue.Queue(max_task_num)
    else:
    self.q = queue.Queue()
     
     
    self.max_num = max_num # 最大线程数
    self.cancel = False
    self.terminal = False
    self.generate_list = [] # 当前已经创建的线程
    self.free_list = [] # 当前的空闲线程,队列中没有任务,则线程会空闲

    def run(self, func, args, callback=None):
    """
    线程池执行一个任务
    :param func: 任务函数
    :param args: 任务函数所需参数
    :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
    :return: 如果线程池已经终止,则返回True否则None
    """
    if self.cancel:
    return
    # 没有空闲着的线程,并且已经创建的线程有没有超过线程池最大大小时,创建线程
    if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
    self.generate_thread()
    w = (func, args, callback,) # 将线程的id,要执行的任务,包装成一个元组
    self.q.put(w) # 将包装好的元组放入队列中

    def generate_thread(self):
    """
    创建一个线程,并执行call方法
    """
    t = threading.Thread(target=self.call)
    t.start()

    def call(self):
    """
    循环去获取任务函数并执行任务函数
    """
    current_thread = threading.currentThread() # 获取当前线程
    self.generate_list.append(current_thread) # 在线程列表中添加一个线程,

    event = self.q.get() #从队列中获取任务,任务都是包装的元组(id,任务)
    # 若是一个元组,即不等于stopevent
    while event != StopEvent:
    func, arguments, callback = event # 元组解包
    try:
    result = func(*arguments)  # 执行任务
    success = True
    except Exception as e:
    success = False
    result = None

    if callback is not None:
    try:
    callback(success, result)
    except Exception as e:
    pass
    # 线程执行完任务,则线程处于空闲状态,则标记该任务为空闲进程,下次直接会从空闲进程去取任务
    with self.worker_state(self.free_list, current_thread):
    if self.terminal:
    event = StopEvent
    else:
    event = self.q.get() # 执行完任务后,该线程又去获取任务,然后再次while
    # 若evnent不是一个元组,则从线程列表中移除当前线程
    else:
    self.generate_list.remove(current_thread)

    def close(self):
    """
    执行完所有的任务后,所有线程停止,创建了几个线程,则往任务队列中插入几个空值
    """
    self.cancel = True
    full_size = len(self.generate_list)
    while full_size:
    self.q.put(StopEvent)
    full_size -= 1

    def terminate(self):
    """
    无论是否还有任务,终止线程
    """
    self.terminal = True

    while self.generate_list:
    self.q.put(StopEvent)

    self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
    """
    用于记录线程中正在等待的线程数
    """
    state_list.append(worker_thread)
    try:
    yield
    finally:
    state_list.remove(worker_thread)



    # How to use


    pool = ThreadPool(5)

    def callback(status, result):
    # 需要执行的任务
    pass


    def action(i):
    print(i)

    # 创建300个任务
    for i in range(300):
    ret = pool.run(action, (i,), callback) 

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()
     
     
     
     
     
     
     
     
     
     
     
     

     
     
  • 相关阅读:
    面向对象方法与调用
    LeetCode OJ:Spiral Matrix(螺旋矩阵)
    LeetCode OJ:Jump Game(跳跃游戏)
    LeetCode OJ:Word Search(单词查找)
    LeetCode OJ:Majority Element II(主元素II)
    LeetCode OJ:Maximum Subarray(子数组最大值)
    LeetCode OJ:Next Permutation(下一排列)
    LeetCode OJ:Product of Array Except Self(除己之外的元素乘积)
    LeetCode OJ:Remove Duplicates from Sorted Array II(移除数组中的重复元素II)
    LeetCode OJ:Best Time to Buy and Sell Stock II(股票买入卖出最佳实际II)
  • 原文地址:https://www.cnblogs.com/qiaogy/p/5875448.html
Copyright © 2011-2022 走看看