zoukankan      html  css  js  c++  java
  • 线程的创建 锁 Threading模块 事件 条件 定时器 队列 线程池 回调函数

    线程的创建:
    创建线程的方式1:
    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        print('主线程')
    View Code
    创建线程的方式2(面向对象):
    from threading import Thread
    
    import time
    class Sayhi(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    
    if __name__ == '__main__':
        t = Sayhi('egon')
        t.start()
        print('主线程')
    View Code
    多线程与多进程的差别:       
    pid的比较:
    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello',os.getpid())
    
    if __name__ == '__main__':
        #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
        t1=Thread(target=work)
        t2=Thread(target=work)
        t1.start()
        t2.start()
        print('主线程/主进程pid',os.getpid())
    
        #part2:开多个进程,每个进程都有不同的pid
        p1=Process(target=work)
        p2=Process(target=work)
        p1.start()
        p2.start()
        print('主线程/主进程pid',os.getpid())
    View Code
    开启效率的快慢:
    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello')
    
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
        print('主线程/主进程')
        '''
        打印结果:
        hello
        主线程/主进程
        '''
    
        #在主进程下开启子进程
        t=Process(target=work)
        t.start()
        print('主线程/主进程')
        '''
        打印结果:
        主线程/主进程
        hello
        '''
    View Code
    内存数据的共享问题:
    from  threading import Thread
    from multiprocessing import Process
    import os
    def work():
        global n
        n=0
    
    if __name__ == '__main__':
        # n=100
        # p=Process(target=work)
        # p.start()
        # p.join()
        # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
    
    
        n=1
        t=Thread(target=work)
        t.start()
        t.join()
        print('',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据
    同一进程内的线程共享该进程的数据?
    View Code
    Thread类         
    效率差别
    import time
    from threading import Thread
    from multiprocessing import Process
    效率差别
    def func(a):
        a = a + 1
    
    if __name__ == '__main__':
        start = time.time()
        t_l = []
        for i in range(50):
            t = Thread(target=func,args=(i,))
            t.start()
            t_l.append(t)
        # t_l 50个线程对象
        for t in t_l : t.join()
        print('主线程')
        print(time.time() - start)
    
        start = time.time()
        t_l = []
        for i in range(50):
            t = Process(target=func, args=(i,))
            t.start()
            t_l.append(t)
        # t_l 50个线程对象
        for t in t_l: t.join()
        print('主进程')
        print(time.time() - start)
    
    start join
    View Code
    terminate (强制结束一个正在运行的进程)--非阻塞    在线程中没有
    Thread实例对象的方法
     
    线程之间的数据共享   (可以引用全局变量)   
    from threading import Thread
    n = 100
    def func():
        global n
        n -= 1
    
    if __name__ == '__main__':
        t_l = []
        for i in range(100):
            t = Thread(target=func)
            t.start()
            t_l.append(t)
        for t in t_l:
            t.join()
        print(n)
    View Code
    join方法:   
    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        t.join()
        print('主线程')
        print(t.is_alive())
        '''
        egon say hello
        主线程
        False
        '''
    View Code
    守护线程      setDaemon    
    import time
    from threading import Thread
    def thread1():
        while True:
            print(True)
            time.sleep(0.5)
    
    def thread2():
        print('in t2 start')
        time.sleep(3)
        print('in t2 end')
    
    if __name__ == '__main__':
        t1 = Thread(target=thread1)
        t1.setDaemon(True)
        t1.start()
        t2 = Thread(target=thread2)
        t2.start()
        time.sleep(1)
        print('主线程')
    View Code
    主线程如果结束了 那么整个进程就结束
    守护线程 会等待主线程(及其他子线程)结束之后才结束.
        主进程 等待 守护进程 子进程
        守护进程 只守护主进程的代码就可以了
        守护线程不行 主线程如果结束了,其他子进程也结束了, 那么整个进程就结束 所有的线程就都结束
    使用多线程实现tcp协议的socket server
    客户端:
    import multiprocessing
    import threading
    
    import socket
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    
    def action(conn):
        while True:
            data=conn.recv(1024)
            print(data)
            conn.send(data.upper())
    
    if __name__ == '__main__':
    
        while True:
            conn,addr=s.accept()
    
    
            p=threading.Thread(target=action,args=(conn,))
            p.start()
    View Code
    服务器:
    import socket
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.connect(('127.0.0.1',8080))
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        s.send(msg.encode('utf-8'))
        data=s.recv(1024)
        print(data)
    View Code
    开启线程的第二种方式和查看线程id
    from threading import Thread,get_ident
    开启线程的第二种方式和查看线程id
    class MyThread(Thread):
        def __init__(self,args):
            super().__init__()   # Thread类的init,在这个方法中做了很多对self的赋值操作,都是给创建线程或者使用线程的时候用的
            self.args = args
    
        def run(self):
            print('in my thread : ',get_ident(),self.args)
    
    print('main',get_ident())
    t = MyThread('wahaha')
    t.start()
    View Code
    线程中的方法    
    import time
    from threading import Thread,get_ident,currentThread,enumerate,activeCount
    开启线程的第二种方式和查看线程id
    class MyThread(Thread):
        def __init__(self,args):
            super().__init__()   # Thread类的init,在这个方法中做了很多对self的赋值操作,都是给创建线程或者使用线程的时候用的
            self.args = args
    
        def run(self):
            time.sleep(0.1)
            # print(currentThread())
            print('in my thread : ',get_ident(),self.args)  #查看id
    
    print('main',get_ident())
    t = MyThread('wahaha')
    # print(t.is_alive())
    t.start()
    print(activeCount())  # 正在运行的线程的数量 len(enumerate())
    # print(enumerate())   #有相同的结果。
    # print('t : ',t)
    # print(t.is_alive())
    View Code
    Thread实例对象的方法:
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    
    thread对象的其他方法 : isAlive ,setname,getname
    threading模块的方法 : currentTread,activeCount,enumerate
    
    在多个进程线程同时访问一个数据的时候就会产生数据不安全的现象
    多进程 访问文件
    多线程
        同时去访问一个数据
    GIL 全局解释器锁           
        在同一个进程里的每一个线程同一时间只能有一个线程访问cpu
    尽量不要设置全局变量
    只要在多线程/进程之间用到全局变量 就加上锁
    from threading import Lock,Thread
    lock = Lock()
    lock.acquire()
      # lock.acquire()
    noodle = 100
    def func(name,lock):
        global noodle
        lock.acquire()
        noodle -= 1
        lock.release()
        print('%s吃到面了'%name)
    
    if __name__ == '__main__':
        lock = Lock()  # 线程锁 互斥锁
        t_lst = []
        for i in range(10):
            t = Thread(target=func,args=(i,lock))
            t.start()
            t_lst.append(t)
        for t in t_lst:
            t.join()
        print(noodle)
    View Code
    死锁:               
    科学家吃面问题
    import time
    from threading import Thread,Lock
    lock = Lock()
    noodle_lock = Lock()
    fork_lock = Lock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s拿到了面' % name)
        fork_lock.acquire()
        print('%s拿到了叉子' % name)
        print('%s在吃面'%name)
        time.sleep(0.5)
        fork_lock.release()  # 0.01
        noodle_lock.release() # 0.01
    
    def eat2(name):
        fork_lock.acquire()  # 0.01
        print('%s拿到了叉子' % name) # 0.01
        noodle_lock.acquire()
        print('%s拿到了面' % name)
        print('%s在吃面'%name)
        time.sleep(0.5)
        noodle_lock.release()
        fork_lock.release()
    
    eat_lst = ['alex','wusir','太白','yuan']
    for name in eat_lst:  # 8个子线程 7个线程 3个线程eat1,4个线程eat2
        Thread(target=eat1,args=(name,)).start()
        Thread(target=eat2,args=(name,)).start()
    View Code
    递归锁     (只拿不还)    
    from threading import RLock
    rlock = RLock()
    rlock.acquire()
    print(1)
    rlock.acquire()
    print(2)
    rlock.acquire()
    print(3)
    View Code
    递归锁解决死锁问题   
    import time
    from threading import Thread,RLock
    lock = RLock()
    def eat1(name):
        lock.acquire()
        print('%s拿到了面' % name)
        lock.acquire()
        print('%s拿到了叉子' % name)
        print('%s在吃面'%name)
        time.sleep(0.5)
        lock.release()  # 0.01
        lock.release() # 0.01
    
    def eat2(name):
        lock.acquire()  # 0.01
        print('%s拿到了叉子' % name) # 0.01
        lock.acquire()
        print('%s拿到了面' % name)
        print('%s在吃面'%name)
        time.sleep(0.5)
        lock.release()
        lock.release()
    
    eat_lst = ['alex','wusir','太白','yuan']
    for name in eat_lst:  # 8个子线程 7个线程 3个线程eat1,4个线程eat2
        Thread(target=eat1,args=(name,)).start()
        Thread(target=eat2,args=(name,)).start()
    View Code
    互斥锁解决死锁问题     
    import time
    from threading import Thread,Lock
    lock = Lock()
    def eat1(name):
        lock.acquire()
        print('%s拿到了面' % name)
        print('%s拿到了叉子' % name)
        print('%s在吃面'%name)
        time.sleep(0.5)
        lock.release() # 0.01
    
    def eat2(name):
        lock.acquire()  # 0.01
        print('%s拿到了叉子' % name) # 0.01
        print('%s拿到了面' % name)
        print('%s在吃面'%name)
        time.sleep(0.5)
        lock.release()
    
    eat_lst = ['alex','wusir','太白','yuan']
    for name in eat_lst:  # 8个子线程 7个线程 3个线程eat1,4个线程eat2
        Thread(target=eat1,args=(name,)).start()
        Thread(target=eat2,args=(name,)).start()
    View Code
    死锁
        多把锁同时应用在多个线程中
    互斥锁和递归锁哪个好
        递归锁 快速恢复服务
        死锁问题的出现 是程序的设计或者逻辑的问题
        还应该进一步的排除和重构逻辑来保证使用互斥锁也不会发生死锁
    互斥锁和递归锁的区别
        互斥锁 就是在一个线程中不能连续多次ACQUIRE
        递归锁 可以在同一个线程中acquire任意次,注意acquire多少次就需要release多少次
    信号量     + 计数器
    
    信号量与进程池的效率对比:
    import time
    from multiprocessing import Semaphore,Process,Pool
    def ktv1(sem,i):
        sem.acquire()
        i += 1
        sem.release()
    
    def ktv2(i):
        i += 1
    
    if __name__ == '__main__':
        sem = Semaphore(5)
        start = time.time()
        p_l = []
        for i in range(100):
            p = Process(target=ktv1,args=(sem,i))
            p.start()
            p_l.append(p)
        for p in p_l : p.join()
        print('###',time.time() - start)
    
        start = time.time()
        p = Pool(5)
        p_l = []
        for i in range(100):
            ret = p.apply_async(func=ktv2, args=(sem, i))
            p_l.append(ret)
        p.close()
        p.join()
        print('***',time.time() - start)
    View Code
    池 和 信号量
    池 效率高
        池子里有几个一共就起几个
        不管多少任务 池子的个数是固定的
        开启进程和关闭进程这些事都是需要固定的开销
        就不产生额外的时间开销
        且进程程池中的进程数控制的好,那么操作系统的压力也小
    信号量
        有多少个任务就起多少进程/线程
        可以帮助你减少操作系统切换的负担
        但是并不能帮助你减少进/线程开启和关闭的时间
    
    事件       
    wait
        等 到 事件内部的信号变成True就不阻塞了
    set
        设置信号变成True
    clear
        设置信号变成False
    is_set
        查看信号是否为True
    数据库连接   
    import time
    import random
    from threading import Event,Thread
    def check(e):
        '''检测一下数据库的网络和我的网络是否通'''
        print('正在检测两台机器之间的网络情况 ...')
        time.sleep(random.randint(1,3))
        e.set()
    
    def connet_db(e):
        e.wait()
        print('连接数据库 ... ')
        print('连接数据库成功~~~')
    
    e = Event()
    Thread(target=connet_db,args=(e,)).start()
    Thread(target=check,args=(e,)).start()
    
    import time
    import random
    from threading import Event,Thread
    def check(e):
        '''检测一下数据库的网络和我的网络是否通'''
        print('正在检测两台机器之间的网络情况 ...')
        time.sleep(random.randint(0,2))
        e.set()
    
    def connet_db(e):
        n = 0
        while n < 3:
            if e.is_set():
                break
            else:
                e.wait(0.5)
                n += 1
        if n == 3:
            raise TimeoutError
        print('连接数据库 ... ')
        print('连接数据库成功~~~')
    
    e = Event()
    Thread(target=connet_db,args=(e,)).start()
    Thread(target=check,args=(e,)).start()
    View Code
    条件:          
    from threading import Condition
    acquire
    release
    wait    阻塞
    notify  让wait解除阻塞的工具
    wait 或是 notify在执行这两个方法的前后 必须执行acquire和release
    from threading import Condition,Thread
    def func(con,i):
        con.acquire()
        # 判断某条件
        con.wait()
        print('threading : ',i)
        con.release()
    
    con = Condition()
    for i in range(20):
        Thread(target=func,args=(con,i)).start()
    con.acquire()
    # 帮助wait的子线程处理某个数据直到满足条件
    con.notify_all()
    con.release()
    while True:
        num = int(input('num >>>'))
        con.acquire()
        con.notify(num)
        con.release()
    View Code
    定时器:      
    from threading import Timer
    def func():
        print('执行我啦')
    
    interval 时间间隔
    Timer(0.2,func).start()  # 定时器
    创建线程的时候,就规定它多久之后去执行
    队列:                                                
    from multiprocessing import Queue,JoinableQueue  # 进程IPC队列
    from queue import Queue  # 线程队列  先进先出的
    from queue import LifoQueue  # 后进先出的
    put get put_nowait get_nowait full empty qsize
    队列Queue
        先进先出
        自带锁 数据安全
    栈 LifoQueue
        后进先出
        自带锁 数据安全
    lq = LifoQueue(5)
    lq.put(123)
    lq.put(456)
    lq.put('abc')
    lq.put('abc')
    lq.put('abc')
    lq.put('abc')
    lq.put('abc')
    print(lq)
    print(lq.get())
    print(lq.get())
    print(lq.get())
    print(lq.get())
    from queue import PriorityQueue # 优先级队列
    pq = PriorityQueue()
    pq.put((10,'aaa'))
    pq.put((5,'zzz'))
    pq.put((5,'bbb'))
    pq.put((20,'ccc'))
    
    print(pq.get())
    print(pq.get())
    print(pq.get())
    print(pq.get())
    View Code
    线程池:                                  
    Threading 没有线程池的
    Multiprocessing Pool
    concurrent.futures帮助你管理线程池和进程池
    import time
    from threading import currentThread,get_ident
    from concurrent.futures import ThreadPoolExecutor  # 帮助你启动线程池的类
    from concurrent.futures import ProcessPoolExecutor  # 帮助你启动线程池的类
    
    def func(i):
        time.sleep(1)
        print('in %s %s'%(i,currentThread()))
        return i**2
    
    def back(fn):
        print(fn.result(),currentThread())
    
    map启动多线程任务
    t = ThreadPoolExecutor(5)
    t.map(func,range(20))
    for i in range(20):
        t.submit(func,i)
    
    submit异步提交任务
    t = ThreadPoolExecutor(5)
    for i in range(20):
        t.submit(fn=func,)
    t.shutdown()
    print('main : ',currentThread())
    起多少个线程池
        5*CPU的个数
    
    获取任务结果
    t = ThreadPoolExecutor(20)
    ret_l = []
    for i in range(20):
        ret = t.submit(func,i)
        ret_l.append(ret)
    t.shutdown()
    for ret in ret_l:
        print(ret.result())
    print('main : ',currentThread())
    
    回调函数
    t = ThreadPoolExecutor(20)
    for i in range(100):
        t.submit(func,i).add_done_callback(back)
    回调函数(进程版)                   
    import os
    import time
    from concurrent.futures import ProcessPoolExecutor  # 帮助你启动线程池的类
    
    def func(i):
        time.sleep(1)
        print('in %s %s'%(i,os.getpid()))
        return i**2
    
    def back(fn):
        print(fn.result(),os.getpid())
    if __name__ == '__main__':
        print('main : ',os.getpid())
        t = ProcessPoolExecutor(20)
        for i in range(100):
            t.submit(func,i).add_done_callback(back)
    View Code
    multiprocessing模块自带进程池的
    threading模块是没有线程池的
    concurrent.futures 进程池 和 线程池
        高度封装
        进程池/线程池的统一的使用方式
    创建线程池/进程池  ProcessPoolExecutor  ThreadPoolExecutor
    ret = t.submit(func,arg1,arg2....)  异步提交任务
    ret.result() 获取结果,如果要想实现异步效果,应该是使用列表
    map(func,iterable)
    shutdown close+join 同步控制的
    add_done_callback 回调函数,在回调函数内接收的参数是一个对象,需要通过result来获取返回值
        回调函数仍然在主进程中执行
    线程总结:
    线程
        守护线程
            在主线程结束(包括所有的子线程)之后守护线程才结束
        面向对象开启线程
            原本写在func中的代码挪到run方法中
        同步的机制
            锁
                GIL锁  是全局解释器锁 线程没有并行
                    是Cpython解释器的
                    在开发python解释器的时候可以减少很多细粒度的锁
                为什么有了GIL锁还会产生数据安全问题呢?
                    因为GIL锁得是线程 而不是具体的内存
                互斥锁 不能连续acquire两次
                递归锁 可以在用一个线程/进程中被acquire多次
                死锁 : 是一种现象,而不是一个工具
                    为什么产生死锁 : 代码的逻辑有问题
                    如何解决死锁 :
                        如果在服务阶段 -> 递归锁 ->排查逻辑 -> 互斥锁
                        如果在测试阶段 -> ->排查逻辑 -> 互斥锁
            信号量  锁+计数器
                和池的区别 : 信号量是有几个任务起几个进程/线程 池是固定的线程/进程数,不限量的任务
                现象 : 信号量慢 且耗资源 池快
            事件 Event wait set clear is_set
            条件 condition wait notify acquire release
            定时器 Timer
    join  同步控制 用来获取结果
    锁    数据安全
    池    提高效率,解决并发
  • 相关阅读:
    学习笔记:字符串-Hash
    模板:高精度
    关于我自己
    学习笔记:数学-GCD与LCM-素数筛法
    学习笔记:数学-GCD与LCM-唯一分解定理(质因数分解)
    学习笔记:数学-GCD与LCM-整除的基础概念
    题解 洛谷P1990 覆盖墙壁
    学习笔记:平衡树-splay
    npm发布myself的插件
    javascript API文档
  • 原文地址:https://www.cnblogs.com/ls13691357174/p/9392675.html
Copyright © 2011-2022 走看看