zoukankan      html  css  js  c++  java
  • Python基础---线程

    线程概念

      之前提到的进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。但进程还是有一些缺陷存在:

      进程只能在一个时间干一件事情,如果想同时干两个或多个任务,进程就达不到了.而且进程在执行的过程中如果发生阻塞,整个进程就会被挂起.

      在实际的操作系统中,引入了类似的机制,就是线程,  线程是进程的一部分,一个线程必须依赖进程存在,一个进程内至少有一条线程,进程中默认的线程是主线程,也可以在一个进程中开启多个子线程来完成更多的事情.

    进程和线程的关系

      1. 进程间相互独立, 数据是隔离的, 同一进程的各线程之间数据共享; 进程是资源分配的最小单位,线程是CPU调度的最小单位,是计算机中最小的执行单位,是真正负责执行代码的

      2. 进程之间可以实现通信IPC, 线程之间可以直接读写进程数据段来进行通信---> 需要进程同步和互斥来辅助

      3. 进程之间开销大, 线程之间开销小

      4. 多个进程之间的多个线程可以利用多核, 一个进程之间的多个线程是可以利用多核的

    python中的线程

      GIL全局解释器锁

      在CPython解释器中,同一个进程中的多个线程不能利用多核,GIL锁能够让同一个进程中的多个线程在同一时刻只有一个线程被CPU调用,有了GIL锁,只要有socket通信,在高I/O操作时的多线程仍然比多进程要好用很多

    python线程模块的选择:

      python中提供了几个用于多线程编程的模块,包括thread,threading和Queue等, thread模块提供了基本的线程和锁的支持, threading提供了更为高级,功能更强的线程管理的功能, Queue模块允许用户创建一个可以用于多个线程之间的数据共享的队列数据结构

    threading模块

    线程的创建:

    from threading import Thread
    import time
    
    def func(name):
        time.sleep(1)
        print('%s say hello'% name)
    
    if __name__ == '__main__':
        t = Thread(target=func,args=('小胖',))
        t.start()
        print('主线程')
    创建方式1
    from threading import Thread
    import time
    
    class Say(Thread):
        def __init__(self,name):
            super().__init__()
            self.name = name
        def run(self):                     # 必须是固定写法,不能改变
            time.sleep(2)
            print('%s say hello'%self.name)
    
    if __name__ == '__main__':
        t = Say('小胖')
        t.start()
        print('主线程')
    创建方式2

    多线程和多进程

    from threading import Thread
    from multiprocessing import Process
    import os
    
    def work():
        print('hello',os.getpid())
    
    if __name__ == '__main__':
               #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
        t1=Thread(target=work)
        t2=Thread(target=work)
        t1.start()
        t2.start()
        print('主线程/主进程pid',os.getpid())
    
               #part2:开多个进程,每个进程都有不同的pid
        p1=Process(target=work)
        p2=Process(target=work)
        p1.start()
        p2.start()
        print('主线程/主进程pid',os.getpid())
    id的比较
    from multiprocessing import Process
    from threading import Thread
    import time
    
    def func(i):
        i += 1
    
    if __name__ == '__main__':
        start = time.time()
        for i in range(20):
            Thread(target=func, args=(i,)).start()       # 线程时间短,效率高
        print(time.time() - start)
        end = time.time()
        for i in range(20):
            Process(target=func, args=(i,)).start()      # 进程时间长,效率低
        print(time.time() - end)
    进程线程效率的对比
    from threading import Thread
    
    n = 100
    def func():
        global n
        n -= 1
    t_lst = []
    for i in range(100):
        t = Thread(target=func)
        t.start()
        t_lst.append(t)
    for t in t_lst:
        t.join()
    print(n)
    线程之间的数据共享

    多线程实现socket通信:

    from threading import Thread
    import socket
    
    s=socket.socket()
    s.bind(('127.0.0.1',8000))
    s.listen(5)
    
    def action(conn):
        while True:
            data=conn.recv(1024)
            conn.send(data.upper())
    
    if __name__ == '__main__':
        while True:
            conn,addr=s.accept()
            p=threading.Thread(target=action,args=(conn,))
            p.start()
    server端
    import socket
    
    client = socket.socket()
    client.connect(('127.0.0.1',8000))
    while 1:
        msg = input('>>>').strip()
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print(data)
    client端

    查看线程名和线程id

    from threading import Thread
    import time
    
    def func():                    # 在函数外部
        print('in func')
    t = Thread(target=func)
    t.start()
    print(t.ident)       # 线程id
    print(t.name)      # 线程名
    
    from threading import currentThread
    
    def func():          # 函数内部
        print('in func',currentThread().name,currentThread().ident)
    t = Thread(target = func)
    t.start()
    函数内和函数外

    Thread类的其他方法

    Thread实例对象的方法
        isAlive(): 返回线程是否活动的。
        getName(): 返回线程名。
        setName(): 设置线程名。
    
    threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括
    启动前和终止后的线程。
    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果

    实例:

    from threading import currentThread,active_count,enumerate
    import time
    
    def func():
        time.sleep(1)         # 方便系统检测线程个数
        print('in func',currentThread().name,currentThread().ident)
    t = Thread(target=func)
    t.start()
    print(active_count())   # 当前所有活着的线程对象的个数
    print(enumerate())     # 当前所有活着的线程对象组成的列表 
    其他方法

    守护线程

    守护线程和守护进程的区别:

      守护进程会等待主进程的代码执行完毕而结束

      守护线程会等待主线程的结束而结束

      主进程必须最后结束,回收子线程的资源

      线程是属于进程的,主线程如果结束了,那整个进程就结束了

      主线程的结束依赖两件事: 自身的代码执行完毕,非守护的子线程执行完毕

    from threading import Thread
    import time
    
    def func():
        while 1:
            time.sleep(1)
            print('in func')
    def son():
        print('son start')
        time.sleep(1)
        print('son end')
    t = Thread(target=func)
    t.setDaemon(True)
    t.start()
    Thread(target=son).start()
    time.sleep(5)   #  守护线程会等待主线程的结束而结束,如果主线程还开启了其他的子线程,守护线程会守护到最后
    守护线程
    from threading import Thread
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    
    t1.daemon=True          # 设置t1为守护线程
    t1.start()
    t2.start()
    print("main-------")
    守护线程2

    线程数据安全:

    1. GIL锁: 能保证在同一时刻不可能有两个线程同时执行CPU指令

    from threading import Lock 
    import dis
    lst = []
    def func1():
        lst.append(1)
    a = 0
    def func2():
        lock = Lock()
        global a
        with lock:           # 上锁,执行完一段代码之后再执行另外一段代码
            a += 1
    dis.dis(func2)

    2. 同步锁

    from threading import Thread
    import time
    def work():
        global n
        temp=n
        time.sleep(1)
        n=temp-1
    if __name__ == '__main__':
        n=100
        l=[]
        for i in range(100):
            p=Thread(target=work)
            l.append(p)
            p.start()
        for p in l:
            p.join()
    
        print(n)       #结果为99
    多个线程抢占资源
    from threading import Thread,Lock
    
    r = threading.Lock()
    r.acquire()
    ''' 对公共数据的操作 '''
    r.release()
    from threading import Thread,Lock
    import time
    def work():
        global n
        lock.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        lock.release()
    if __name__ == '__main__':
        lock=Lock()
        n=100
        l=[]
        for i in range(100):
            p=Thread(target=work)
            l.append(p)
            p.start()
        for p in l:
            p.join()
        print(n)                   # 结果为0, 由并发编程串行,保证了数据的安全
    同步锁

    互斥锁和join的区别:

    from threading import current_thread,Thread,Lock
    import time
    def task():
        global n
        print('%s is running' %current_thread().getName())
        temp=n
        time.sleep(0.5)
        n=temp-1
    
    if __name__ == '__main__':
        n=100
        lock=Lock()
        threads=[]
        start_time=time.time()
        for i in range(100):
            t=Thread(target=task)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        stop_time=time.time()
        print('主:%s n:%s' %(stop_time-start_time,n))
    
    ''' 
    结果是:
    Thread-1 is running
    Thread-2 is running
    ......
    Thread-100 is running
    主:0.5220310688018799 n:99
    
    '''
    不加锁并发执行
    from threading import current_thread,Thread,Lock
    import time
    def task():
        #未加锁的代码并发运行
        time.sleep(3)
        print('%s start to run' %current_thread().getName())
        global n
        #加锁的代码串行运行
        lock.acquire()
        temp=n
        time.sleep(0.5)
        n=temp-1
        lock.release()
    
    if __name__ == '__main__':
        n=100
        lock=Lock()
        t_lst=[]
        start=time.time()
        for i in range(100):
            t=Thread(target=task)
            t_lst.append(t)
            t.start()
        for t in t_lst:
            t.join()
        stop=time.time()
        print('主:%s n:%s' %(stop-start,n))
    '''
    Thread-1 is running
    Thread-2 is running
    ......
    Thread-100 is running
    主:53.037208557128906 n:0
    '''
    加锁串行
    from threading import current_thread,Thread,Lock
    import time
    def task():
        time.sleep(3)
        print('%s start to run' %current_thread().getName())
        global n
        temp=n
        time.sleep(0.5)
        n=temp-1
    
    if __name__ == '__main__':
        n=100
        lock=Lock()
        start_time=time.time()
        for i in range(100):
            t=Thread(target=task)
            t.start()
            t.join()
        stop_time=time.time()
        print('主:%s n:%s' %(stop_time-start_time,n))
    
    '''
    Thread-1 start to run
    Thread-2 start to run
    ......
    Thread-100 start to run
    主:350.1151704788208 n:0
    '''
    不加锁用join方法

      在start之后立即join,也能让任务中的所有代码变为串行执行,而加锁只是在修改公共数据的部分是串行的,在数据安全上,两种方法都能实现,但很明显的是加锁的效率更高,join方法耗时很长.

    3. 死锁和递归锁

      死锁是由于多个锁对多个变量管理不当而产生的一种现象,是指两个或两个以上的进程或线程在执行过程中,因争夺系统资源而造成的一种相互等待的现象.

    from threading import Thread, Lock
    noodle_lock = Lock()
    fork_lock = Lock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s 抢到了面条'%name)
        fork_lock.acquire()
        print('%s 抢到了叉子'%name)
        print('%s 吃到了面'%name)
        frok_lock.release()
        noodle_lock.release()
    def eat2(name):
        fork_lock.acquire()
        print('%s 抢到了叉子'%name)
        noodle_lock.acquire()
        print('%s 抢到了面条'%name)
        print('%s 吃到了面'%name)
        noodle_lock.release()
        fork_lock.release()
    t1 = Thread(target=eat1, args=(name,)).start()
    t2 = Thread(target=eat2, args=(name,)).start()
    ''' 会发生死锁现象 '''
    死锁现象

      互斥锁Lock,用互斥锁可以解决死锁现象,将两个资源都锁起来,

    from threading import Thread, Lock
    lock = Lock()
    def eat1(name):
        lock.acquire()
        print('%s 抢到了面条'%name)
        print('%s 抢到了叉子'%name)
        print('%s 吃到了面'%name)
        lock.release()
    def eat2(name):
        lock.acquire()
        print('%s 抢到了叉子'%name)
        print('%s 抢到了面条'%name)
        print('%s 吃到了面'%name)
        lock.release()
    Thread(target=eat1, args=('小一',)).start()
    Thread(target=eat2, args=('小二',)).start()
    互斥锁解决死锁

      递归锁Rlock,同一个线程可以被acquire多次,必须acquire几次就release几次,才能保证其他的线程可以访问被锁住的代码,而且可以快速的解决死锁现象

    from threading import Thread, RLock
    noodle_lock = fork_lock = RLock()
    def eat1(name):
        noodle_lock.acquire()
        print('%s 抢到了面条'%name)
        fork_lock.acquire()
        print('%s 抢到了叉子'%name)
        frok_lock.release()
        noodle_lock.release()
    def eat2(name):
        fork_lock.acquire()
        print('%s 抢到了叉子'%name)
        noodle_lock.acquire()
        print('%s 抢到了面条'%name)
        noodle_lock.release()
        fork_lock.release()
    Thread(target=eat1, args=('小一',)).start()
    Thread(target=eat2, args=('小二',)).start()
    递归锁解决死锁现象

    线程队列

      Queue队列, 使用import queue,用法和进程中的Queue一样

    FIFO(先进先出队列)

    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    (先进先出):
    first
    second
    third
    '''
    先进先出

    LifoQueue(后进先出队列),也称为栈

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    (后进先出):
    third
    second
    first
    '''
    后进先出

    优先级队列:

      queue.PriorityQueue(),数字越小优先级越高,且优先出队列,放进队列的是元组,根据第一个参数数字的大小,如果是同等优先的,就会根据第二个参数在ASCII码位表中的顺序来决定

    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put((3, '小三'))
    pq.put((2, '阿二'))
    pq.put((1, '大大'))
    print(pq.get())
    print(pq.get())
    print(pq.get())
    优先级队列

    concurrent模块

      concurrent.futures 提供了一个高度封装的异步调用接口

      ThreadPoolExecutor: 线程池, 提供异步调用

      ProcessPoolExecutor: 进程池, 提供异步调用

    基本方法:

      submit  --->  相当于进程中的apply_async方法

      shutdown  --->  相当于进程池中的 close + join

      result   --->   相当于 get

      add_done_callback  --->    相当于 callback

    from concurrent.futures import ThreadPoolExecutor
    import time
    def func(i):
        time.sleep(1)
        print('in son thread')
    if __name__ == '__main__':
        tp = ThreadPoolExecutor()
        for i in range(20):
            tp.submit(func,i)          # 在这儿可以直接传参
        tp.shutdown()
        print('所有任务执行完毕')    # 最后执行shutdown 
    线程池

    使用Executor达到无缝衔接

    from concurrent.futures import ThreadPoolExecutor as Executor, ProcessPoolExecutor as Executor
    import time
    
    def func(i):
        time.sleep(1)
        print('in son thread')
    if __name__ == '__main__':
        tp = Executor()                 #  可以通过改变上面的模块导入来切换进程池和线程池
        for i in range(20):
            tp.submit(func,i)           # 可以直接进行传参
    Executor无缝衔接

    map方法

    from concurrent.futures import ThreadPoolExecutor
    import time
    def func(i):
        time.sleep(1)
        print('in son thread%s' % i)
    if __name__ == '__main__'
        tp = ThreadPoolExecutor()
        tp.map(func, range(20))
    '''
    in son thread 0
    in son thread 1 
    ......
    in son thread 19
    '''
    map
    from concurrent.futures import ThreadPoolExecutor
    def func(i):
        return '*' * i
    if __name__ == '__main__':
        tp = ThreadPoolExecutor()
        ret_lst = tp.map(func, range(20))
        for ret in ret_lst:
            print(ret)
        
    # 用map方法就省去下面的这些步骤
        ret_l = []
        for i in range(20):
            ret = tp.submit(func,i)
            ret_l.append(ret)
        for ret in ret_l:
            print(ret.result())
    map方法获取返回值

    add_done_callback,回调函数

    from concurrent.futures import ThreadPoolExecutor
    def back(ret):
        print(ret)  
        print(ret.result())     # 拿到具体的结果
    def son_func(i):
        return i**i
    tp = ThreadPoolExecutor()
    for i in range(20):
        ret = tp.submit(son_func, i)
    ret.add_done_callback(back)
    '''
    <Future at 0x1ba6881da58 state=finished returned int>
    361
    '''
    回调函数
  • 相关阅读:
    实用的SpringBoot生成License方案
    实用的jar包加密方案
    整合Atomikos、Quartz、Postgresql的踩坑日记
    CentOS7使用NTP搭建时间同步服务器
    初探Mysql架构和InnoDB存储引擎
    postgresql常用命令
    闲聊CAP、BASE与XA
    还原面试现场-ACID与隔离级别
    图片拖动并交换图片-使用观察者模式
    图片拖动并交换图片
  • 原文地址:https://www.cnblogs.com/py8318/p/10549625.html
Copyright © 2011-2022 走看看