zoukankan      html  css  js  c++  java
  • 线程锁、线程池



    一、线程(IO密集型工作多线程有用)

    • 线程:
      • 概述:
        • 若一个文件从上到下顺序执行,则为串行执行,整个py文件实际上是一个主线程
        • 若多线程,则可以并行执行,同一个时刻可以运行多个代码段
        • 给每个client请求分配一个线程,则这些线程可以同时工作
      • 多线程、多进程:
        • 1、一个应用程序,可以有多进程和多线程;默认是单进程、单线程
        • 2、单进程、多线程 :
              • 多线程:IO操作(输入输出流,文件操作)有用,因为几乎不用cpu来调度,一般用多线程来提高并发
              • 计算型操作,需要用到cpu调度执行,一般用多进程提高并发
        • 3、GIL 全局解释器锁,即进程中同一时刻只能被CPU 调度一个线程
    • 创建方式
      • 创建方式一:常规方式,比较简单常用
    import threading
    def func1(arg):
    print(arg)
    # t = threading.Thread( target=线程要执行的函数,  args=函数参数-数组, kwargs=函数参数-字典)
    t = threading.Thread(target=f1, args=(123,) )
    t.start() 
    # 准备就绪,准备让cpu进行调度,
    # 只要cpu一旦调度了该线程,就会执行threading 模块中的run方法,run方法只做一件事,执行target指向的函数
    # 即,target实际上是run在方法内部执行的
      • 创建方式二: 重写init和run
    class MyThread(threading.Thread):
    def __init__(self, func, args):
    self.func = func 
    self.args = args 
    super(MyThread, self).__init__()
    def run(self):
    self.func(self.args)
    def f2():
    pass
    obj = MyThread(f2, 123)
    obj.start()
    • 常用方法:
      • t.start()  
          • 准备好该线程,等待CPU进行调度,
          • 虽然线程已经就绪,但是什么时候执行 谁也不知道,只有被cpu进行调度的时候才会执行
      • t.join(3) 
          • 逐个执行线程,执行完毕后再往下执行;可设置执行“超时时间”,该方法使得多线程没有意义
          • 主线程代码执行到这里,会等待子线程去执行,等待超时时间为3秒,子线程执行完毕,主线程才会继续往下执行
      • t.setDaemon(True) 
          • 设置为后台进程(默认为False),主线程执行过程中,后台线程也在执行,主线程执行完毕,后台线程立即终止


    二、线程锁
    • 概述:
      • 线程锁:
        • 因为线程是随机调度,即可能并行去更“改同一个数据”,若多个线程同时对资源进行修改,则会发生错误
        • 互斥锁,同一时刻,只允许一个线程来执行的操作
      • 未使用“锁”:
        • 10个人进行购买商品,每个人购买后商品减1,每个人看到的结果应该为9876543210
        • 实际执行,所有人同时购买商品,同时修改NUM,同时输出NUM ,则都输出0 
    import threading
    import time

    NUM = 0
    def function1():
    global NUM
    NUM += 1
    time.sleep(0.1) # 需要让所有线程都夯在这里,然后同时输出
    print(NUM)

    for x in range(10):
    t = threading.Thread(target=function1, )
    t.start()
    • 各种锁讲解
      • “Rlock 锁”:
        • 将数据修改的位置作为原子操作,加锁
    import threading
    import time

    NUM = 0
    lock = threading.RLock()

    def function1():
    lock.acquire()  
    global NUM
    NUM += 1
    time.sleep(2)
    print(NUM)
    lock.release()

    for x in range(10):
    t = threading.Thread(target=function1,)
    t.start()
     
      • 注意:lock = threading.Rlock() # 支持多层锁的嵌套,一般用Rlock,lock不支持
    def func(l):
    global NUM 
    l.acquire()
    NUM -= 1 
    l.acquire()
    time.sleep(2)
    l.release()
    print(NUM)
    l.release()
      • semaphore”锁
        • 信号量:表示同一时刻允许有多个线程同时工作,比如火车站过安检,每次允许10个人,上批次安检完成后再来十个人
        • 注意:若同一批次运行的线程,都修改同一个数据,依然会有“数据错误”的情况,这里只是演示功能
    import threading
    import time

    NUM = 0
    semaphore = threading.BoundedSemaphore(5) # 表示每批放进来5个线程

    def function1(i, se):
    se.acquire()
    global NUM
    NUM += 1
    time.sleep(2)
    print('我是线程:{}, NUM此时为:{}'.format(i, NUM))
    se.release()

    for x in range(10):
    t = threading.Thread(target=function1, args=(x, semaphore))
    t.start()
      • “event”锁:
        • 事件:要锁全部锁,要放全部放,实则内部维护了一个变量,值为布尔值
        • 该锁提供了3个方法:
          1. set() # 将Flag 值设为True
          2. clear() # 将Flag 值设为 False
          3. wait() # 若Flag为False,则阻塞住;若为True则放行
    import threading
    event = threading.Event()

    def function(id, en):
    print('兵{}:大王,怎么办?'.format(id))
    en.wait()
    print('兵{}:杀呀~~~'.format(id))

    for x in range(10):
    t = threading.Thread(target=function, args=(x, event))
    t.start()

    event.clear()
    print('1、杀出一条血路 2、投降')
    you_choice = input('>>').strip()
    if you_choice == '1':
    event.set()
      • "condition" 锁
        • 使得线程等待,只有当满足条件时,才放出N个线程去执行任务
        • 注意,wait和notify方法前后,必须被acquire和release方法包起来
        • 姿势1:自行notify 通知运行N个
    import threading
    condition = threading.Condition()

    def function(id, con):
    con.acquire()  
    con.wait()  # 子线程全部卡在这里,等待通知
    print('兵{}:杀呀~~~'.format(id))
    con.release()

    for x in range(100):
    t = threading.Thread(target=function, args=(x, condition))
    t.start()


    print('你要挑战几个?')
    you_choice = input('>>').strip()
    condition.acquire()  
    condition.notify(int(you_choice))   # 传进去几个,上面运行几个线程
    condition.release()
        • 姿势2:条件成立,则放出一个去运行
    import threading
    def condition_func(): # 该函数返回值必须为布尔值
    ret = False
    inp = input('>>').strip()
    if inp == '1':
    ret = True
    return ret

    def function(id, con):
    con.acquire()  
    con.wait_for(condition_func) # 一旦该函数体返回值为True,则放出一个线程
    print('thread{}:im gone~~~'.format(id))
    con.release()

    condition = threading.Condition()
    for x in range(5):
    t = threading.Thread(target=function, args=(x, condition))
    t.start()
     
     
    三、Timeer 定时器:
    • 定时器,指定n秒后执行某操作
    from threading import Timer
    import time
     
    def function():
    print('开火~')
     
    t = Timer(3, function)  # 子线程,三秒后执行该函数
    t.start()
    for i in range(1, 4):
    print('time:{}'.format(i)) # 主线程,数数,每次间隔1秒
    time.sleep(1)



     
     
    四、线程池:
    • 线程池概述:
      • “线程池”与“上下文切换”:
        • CPU调度线程,当时间片用完,会进行切换,每次切换时线程现场的“保存与载入”都是时间开销
        • 当线程数目达到一个峰值后,再多创建线程,执行效率会下降
        • 因此,应该控制线程创建的最大数目,池中线程取一个少一个,无线程时,后续请求等待;执行完毕归还线程
      • 要创建一个线程池,那么该线程池应满足如下条件:
        • 线程池为一个容器
        • 池中线程用一个少一个
        • 池中无线程,则进行等待
        • 线程执行完任务,则进行归还
    • 创建姿势1: 利用队列来存放“线程”
    import time
    import queue
    import threading

    class MyPool:
    def __init__(self, maxsize=5):
    self.maxsize = maxsize
    self._q = queue.Queue(self.maxsize)  # 创建一个指定大小的队列
    for _ in range(self.maxsize):
    self._q.put(threading.Thread)  # 在队列中全为线程"类"

    def get_thread(self):
    return self._q.get()  # 取出一个线程"类"

    def add_thread(self):
    self._q.put(threading.Thread)  # 添加一个线程“类”

    # 定义任务,参数为线程池(执行该任务的线程来源),执行完毕后向该池中归还(添加)一个线程
    def task(n, p):
    print('{}: 执行任务,完毕'.format(n))
    time.sleep(2)
    p.add_thread()

    # 创建一个大小为5的线程池
    pool = MyPool(5)

    for i in range(100):
    t_class = pool.get_thread()  # 从队列中获取一个线程"类名称"
    t_obj = t_class(target=task, args=(i, pool))  # 用取出来的类,实例化个线程,并交给线程任务去执行,每次从队列中取一个线程,若没有线程则等待
    t_obj.start()
    • 此时,每次有5个线程去处理工作,不会超过5
    • 问题:
      • 线程重用:任务执行完成后归还线程,用的是“重新创建一个线程并put进队列”的方法,即原来的线程还放于内存中等待被GC回收
      • 空闲线程:若任务数少于池数目,则会多余创建,即应该线程池最初是空的,来一个创建一个,最大为5
     






























    姿势2: 


    • 队列中存放任务:
      • 往往每个任务都是一个函数,
      • 可以将任务和其相关参数搞成元组(函数名,参数),将这些元组put到队列中,则队列中保存的全是需要执行的任务
    • 创建N个线程(N为线程池大小):
      • 每个线程都“循环”从队列中get任务并执行(线程重用)
      • 若队列中所有任务都取完了,则终止已经创建的线程,终止方法如下:
        1. get方法有超时时间,可以设置超时时间,超过这个时间则线程自动销毁
        2. 在队列末尾插入几个空值,get后判断,若取到的是任务则执行,否则则终止
     
     
     
     
     
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    import queue
    import threading
    import contextlib
    import time

    StopEvent = object() # 创建个静态字段,字段任何值都行,相当于None,即要在队列后插入的空值


    class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
    # 创建任务队列,可显示指定任务队列大小,否则为不限制
    if max_task_num:
    self.q = queue.Queue(max_task_num)
    else:
    self.q = queue.Queue()
     
     
    self.max_num = max_num # 最大线程数
    self.cancel = False
    self.terminal = False
    self.generate_list = [] # 当前已经创建的线程
    self.free_list = [] # 当前的空闲线程,队列中没有任务,则线程会空闲

    def run(self, func, args, callback=None):
    """
    线程池执行一个任务
    :param func: 任务函数
    :param args: 任务函数所需参数
    :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
    :return: 如果线程池已经终止,则返回True否则None
    """
    if self.cancel:
    return
    # 没有空闲着的线程,并且已经创建的线程有没有超过线程池最大大小时,创建线程
    if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
    self.generate_thread()
    w = (func, args, callback,) # 将线程的id,要执行的任务,包装成一个元组
    self.q.put(w) # 将包装好的元组放入队列中

    def generate_thread(self):
    """
    创建一个线程,并执行call方法
    """
    t = threading.Thread(target=self.call)
    t.start()

    def call(self):
    """
    循环去获取任务函数并执行任务函数
    """
    current_thread = threading.currentThread() # 获取当前线程
    self.generate_list.append(current_thread) # 在线程列表中添加一个线程,

    event = self.q.get() #从队列中获取任务,任务都是包装的元组(id,任务)
    # 若是一个元组,即不等于stopevent
    while event != StopEvent:
    func, arguments, callback = event # 元组解包
    try:
    result = func(*arguments)  # 执行任务
    success = True
    except Exception as e:
    success = False
    result = None

    if callback is not None:
    try:
    callback(success, result)
    except Exception as e:
    pass
    # 线程执行完任务,则线程处于空闲状态,则标记该任务为空闲进程,下次直接会从空闲进程去取任务
    with self.worker_state(self.free_list, current_thread):
    if self.terminal:
    event = StopEvent
    else:
    event = self.q.get() # 执行完任务后,该线程又去获取任务,然后再次while
    # 若evnent不是一个元组,则从线程列表中移除当前线程
    else:
    self.generate_list.remove(current_thread)

    def close(self):
    """
    执行完所有的任务后,所有线程停止,创建了几个线程,则往任务队列中插入几个空值
    """
    self.cancel = True
    full_size = len(self.generate_list)
    while full_size:
    self.q.put(StopEvent)
    full_size -= 1

    def terminate(self):
    """
    无论是否还有任务,终止线程
    """
    self.terminal = True

    while self.generate_list:
    self.q.put(StopEvent)

    self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
    """
    用于记录线程中正在等待的线程数
    """
    state_list.append(worker_thread)
    try:
    yield
    finally:
    state_list.remove(worker_thread)



    # How to use


    pool = ThreadPool(5)

    def callback(status, result):
    # 需要执行的任务
    pass


    def action(i):
    print(i)

    # 创建300个任务
    for i in range(300):
    ret = pool.run(action, (i,), callback) 

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()
     
     
     
     
     
     
     
     
     
     
     
     

     
     
  • 相关阅读:
    最大子数组求和并进行条件组合覆盖测试
    Ubuntu 16.04 c++ Google框架单元测试
    The directory '/home/stone/.cache/pip/http' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If execu
    Problem executing scripts APT::Update::Post-Invoke-Success 'if /usr/bin/test -w /var/cache/app-info -a -e /usr/bin/appstreamcli; then appstreamcli refresh > /dev/null; fi'
    个人博客作业三:微软小娜APP的案例分析
    补交 作业一
    补交 作业二:个人博客作业内容:需求分析
    嵌入式软件设计第12次实验报告
    嵌入式软件设计第11次实验报告
    嵌入式软件设计第10次实验报告
  • 原文地址:https://www.cnblogs.com/qiaogy/p/5875448.html
Copyright © 2011-2022 走看看