zoukankan      html  css  js  c++  java
  • python-线程

    ★概念

    进程是由若干线程组成的,一个进程至少有一个线程;

    线程是CPU调度的最小单位;

    线程之间资源共享。

    ★全局解释器锁(GIL)

    cpython的特性

    同一时刻只能有一个线程访问CPU;

    锁的是线程

    在多线程环境中,Python 虚拟机按以下方式执行:

    1,设置GIL;  

    2,切换到一个线程去执行;  

    3,运行指定数量字节码指令或线程主动让出控制;  

    4,把线程设置为睡眠状态;  

    5,解锁 GIL;  

    6,再次重复以上所有步骤。      

    ★threading模块

    ◇线程的创建-threading.Thread

     1 import time
     2 import os
     3 from threading import Thread
     4 
     5 """多线程并发"""
     6 
     7 
     8 def func(n):
     9     time.sleep(1)
    10     print('in func%s' % n)
    11     print('子进程PID:%s' % os.getpid())
    12 
    13 
    14 for i in range(10):
    15     t = Thread(target=func, args=(i+1,))    # 并发10个线程
    16     t.start()
    17 
    18 print('主进程PID:%s' % os.getpid())
    多线程的创建.py
     1 import time
     2 from threading import Thread
     3 
     4 
     5 class MyThread(Thread):     # 继承线程类
     6     def __init__(self, arg):    # 初始化MyThread的参数
     7         super().__init__()
     8         self.arg = arg
     9 
    10     def run(self):      # 重写run方法
    11         time.sleep(1)
    12         print('in func%s' % self.arg)
    13 
    14 
    15 for i in range(10):
    16     t = MyThread(i+1)
    17     t.start()
    通过面向对象实现多线程.py
     1 import os
     2 from threading import Thread
     3 
     4 
     5 def func(a, b):
     6     global g    # 声明全局变量,主线程和子线程共享g
     7     c = a + b
     8     g = 0       # 这个g同时对主线程的g有作用
     9     print('in func的结果:%s' % c)
    10 
    11 
    12 g = 100
    13 t_lis = []
    14 for i in range(10):
    15     t = Thread(target=func, args=(i+1, 5))
    16     t.start()
    17     t_lis.append(t)
    18 
    19 for t in t_lis:
    20     t.join()
    21 
    22 print('主线程g的值:', g)        # g的值已被子线程修改
    23 print('主进程PID:%s' % os.getpid())
    多线程之间的数据共享.py

    守护线程

    守护线程会在主线程代码结束之后等待【其他子线程】的结束而结束

    与进程相同:

      △主线程会等待子线程结束才结束

      △主进程会等待子进程结束才结束

    与进程不同:  

      ▽守护线程在【其他线程结束】才结束  -- >这时候主线程已经结束

      ▽守护进程在【代码结束】而结束     --> 这时候主进程还没结束

     1 from threading import Thread
     2 import time
     3 
     4 
     5 def func1():
     6     while True:
     7         print('**********')
     8         time.sleep(1)
     9 
    10 
    11 def func2():
    12     time.sleep(10)
    13     print('in func2')
    14 
    15 
    16 t = Thread(target=func1,)
    17 t.daemon = True     # 设置守护线程, 会一直等所有子线程执行结束,守护线程才结束
    18 t.start()
    19 t1 = Thread(target=func2,)
    20 t1.start()
    21 # t1.join()
    22 print('in main thread')
    守护线程的简单例子.py

    ◇线程的锁-threading.Lock

    方法

    acquire()  上锁

    release()  解锁

    Lock()  互斥锁

    from threading import Lock
    
    lock = Lock()
    
    lock.acquire()
    # lock.release()    
    lock.acquire()
    print('ok')

    互斥锁(英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。

      有两个互斥锁时,会有可能产生死锁问题

     1 from threading import Thread, Lock
     2 import time
     3 
     4 """互斥锁"""
     5 
     6 
     7 def func(lock):
     8     lock.acquire()  # 一个线程执行完了,下一个线程才能进去执行
     9     global n
    10     temp = n
    11     time.sleep(0.2)
    12     n = temp - 1
    13     lock.release()
    14 
    15 
    16 n = 10
    17 lock = Lock()
    18 t_lst = []
    19 for i in range(10):
    20     t = Thread(target=func, args=(lock,))
    21     t_lst.append(t)
    22     t.start()
    23 
    24 for t in t_lst:
    25     t.join()
    26 print(n)
    加锁保证多线程的数据安全.py

      ps:如果加锁,1,第一个线程执行,遇到time.sleep阻塞,时间片不切换,等待阻塞完成,执行完这个线程,2,(这时候第一个执行的线程执行完)CPU执行下一个线程,3,再下一个线程重复以上操作

      ps:如果没有锁,1,第一个线程执行,遇到time.sleep阻塞;2,时间片切换,CPU执行下一个线程(这时候第一个执行的线程还没有执行完),3,再下一个线程重复以上操作  -->大家共享阻塞的时间(相当于同时等待)

     1 from threading import Thread, Lock
     2 import time
     3 
     4 
     5 def eat1(name):
     6     noodle_lock.acquire()
     7     print('%s拿到了面条' % name)
     8     fork_lock.acquire()
     9     print('%s拿到了叉子' % name)
    10     print('%s 吃面了' % name)
    11     noodle_lock.release()
    12     fork_lock.release()
    13 
    14 
    15 def eat2(name):
    16     fork_lock.acquire()
    17     print('%s拿到了叉子' % name)
    18     time.sleep(0.1)     # 先阻塞一下(迫使时间片切换),让另一个线程去拿了面条的锁,然后就会产生死锁的问题
    19     noodle_lock.acquire()   # 其他线程拿了锁没有解锁,导致我这里无法拿到而一直等待,
    20     print('%s拿到了面条' % name)
    21     print('%s 吃面了' % name)
    22     noodle_lock.release()
    23     fork_lock.release()
    24 
    25 
    26 noodle_lock = Lock()
    27 fork_lock = Lock()
    28 Thread(target=eat1, args=('alex', )).start()
    29 Thread(target=eat2, args=('jin', )).start()
    30 Thread(target=eat1, args=('boss', )).start()
    31 Thread(target=eat2, args=('egg', )).start()
    死锁问题.py

    RLock()  递归锁

      1,在一个线程能有多个锁

      2,为了解决死锁问题

    1 from threading import RLock
    2 
    3 rlock = RLock()
    4 
    5 rlock.acquire()
    6 rlock.acquire()
    7 rlock.acquire()
    8 print('hello')
    简单递归锁.py
     1 from threading import Thread, RLock
     2 import time
     3 
     4 
     5 def eat1(name):
     6     noodle_lock.acquire()
     7     print('%s拿到了面条' % name)
     8     fork_lock.acquire()
     9     print('%s拿到了叉子' % name)
    10     print('%s 吃面了' % name)
    11     noodle_lock.release()
    12     fork_lock.release()
    13 
    14 
    15 def eat2(name):
    16     fork_lock.acquire()     # 上锁了
    17     print('%s拿到了叉子' % name)
    18     time.sleep(2)           # 阻塞,就算切换到其他线程也无法执行,因为是同一把锁,另外的线程外面也有锁
    19     noodle_lock.acquire()
    20     print('%s拿到了面条' % name)
    21     print('%s 吃面了' % name)
    22     noodle_lock.release()
    23     fork_lock.release()     # 已经完全解锁了,其他线程能去上锁了然后去执行
    24 
    25 
    26 noodle_lock = fork_lock = RLock()
    27 Thread(target=eat1, args=('alex', )).start()
    28 Thread(target=eat2, args=('jin', )).start()
    29 Thread(target=eat1, args=('boss', )).start()
    30 Thread(target=eat2, args=('egg', )).start()
    通过递归锁解决死锁问题.py

    ◇信号量-threading.Semaphore

    方法

    acquire()  上锁

    release()  解锁

    同一时间只有n个线程可以执行上锁代码

     1 from threading import Semaphore, Thread
     2 import time
     3 
     4 
     5 def func(n):
     6     sem.acquire()
     7     time.sleep(1)
     8     print(n)
     9     sem.release()
    10 
    11 
    12 sem = Semaphore(3)      # 设置一个信号量, 只能有3个线程可以并发执行
    13 for i in range(10):
    14     t = Thread(target=func, args=(i,))      # 在线程里sem不用传进来,子线程共享主线程的变量
    15     t.start()
    线程信号量的实例.py

    ◇事件-threading.Event

    状态

      False  (默认)  wait()阻塞

      True        wait()非阻塞

    修改状态

      set  设置为True

      clear  设置为False

    wait方法

      wait(1)  状态为False的时候,只等待1s就往下执行

     1 from threading import Event, Thread
     2 import time
     3 import random
     4 
     5 
     6 def connect_db():
     7     count = 0
     8     while count < 3:
     9         e.wait(1)   # 只阻塞1S,往下执行
    10         if e.is_set():
    11             print('连接数据库成功!!!')
    12             break
    13         else:
    14             count += 1
    15             print('第%s次连接失败' % count)
    16     else:
    17         raise TimeoutError('连接超时!!!')
    18 
    19 
    20 def check_web():
    21     time.sleep(random.randint(0, 3))    # 模拟网络延迟
    22     e.set()
    23 
    24 
    25 e = Event()
    26 t1 = Thread(target=connect_db)      # 直到事件状态被设置为True,这个线程才能连接上数据库
    27 t2 = Thread(target=check_web)      # 这个线程负责修改事件状态
    28 t1.start()
    29 t2.start()
    模拟数据库登录.py

    ◇条件-threading.Condition

    使得线程等待,只有满足某条件时,才释放n个线程

    方法

      acquire()  

      release()      

      notify() 造钥匙(一次性钥匙)    

      wait() 等钥匙  

     1 from threading import Condition, Thread
     2 
     3 
     4 def func(i):
     5     con.acquire()
     6     con.wait()          # 等造钥匙出来
     7     print('在%s循环里' % i)
     8     con.release()
     9 
    10 
    11 con = Condition()
    12 for i in range(10):
    13     t = Thread(target=func, args=(i, ))
    14     t.start()
    15 while True:
    16     num = int(input('>>>'))
    17     con.acquire()
    18     con.notify(num)     # 造钥匙(一次性钥匙)
    19     con.release()
    条件的简单例子.py

    ◇定时器-threading.Timer

    指定n秒后执行某个线程

     1 from threading import Timer
     2 import time
     3 
     4 
     5 def func():
     6     print('时间同步')
     7 
     8 
     9 while True:
    10     Timer(2, func).start()      # 实例化一个定时器
    11     time.sleep(2)               # 等待2S,再重新进入循环
    定时器的简单例子.py 

    ★队列【数据安全】-queue

    ◇先进先出-queue.Queue

    方法

    • put()  
    • get()
    • put_nowait() 满值不阻塞,但报错
    • get_nowait() 没值不阻塞,但报错
    1 q = queue.Queue()
    2 q.put('a')
    3 q.put('b')
    4 q.put('c')
    5 print(q.get())
    6 print('ttt:%s' % q.get_nowait())
    7 print('ttt:%s' % q.get_nowait())
    8 print('ttt:%s' % q.get_nowait())    # 队列已空,不阻塞,但报错
    先进先出队列.py

    ◇先进后出-queue.LifoQueue(栈)

    方法

    • put()
    • get()
    • put_nowait() 满值不阻塞,但报错
    • get_nowait() 没值不阻塞,但报错
    1 q = queue.LifoQueue()
    2 q.put('a')
    3 q.put('b')
    4 q.put('c')
    5 print(q.get())
    6 print('ttt:%s' % q.get_nowait())
    7 print('ttt:%s' % q.get_nowait())
    8 print('ttt:%s' % q.get_nowait())    # 队列已空,不阻塞,但报错
    先进后出队列.py

    ◇优先级队列-queue.PriorityQueue

    1, 数字越低,优先级越高;
    2, 数字相同,ASCII越小,优先级越高
    3, 返回的是一个元祖

    方法

    • put()
    • get()
    • put_nowait()  满值不阻塞,但报错
    • get_nowait()  没值不阻塞,但报错
    from queue import PriorityQueue
    
    q = PriorityQueue()
    
    q.put(30)
    q.put(40)
    q.put(2)
    q.put(2)
    q.put(3)
    q.put(1)
    
    print(q.get())  # 数字越小,优先级越高
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get_nowait())

    ★标准模块-concurrent.futures

    提供了高度封装的异步调用接口

    子模块

    ThreadPoolExecutor:线程池,提供异步调用

    ProcessPoolExecutor: 进程池,提供异步调用

    方法

    submit  异步提交任务

    map   for循环submit操作

    shutdown  相当于进程池pool.close() + pool.join()

    result  取得结果

    add_done_callbak  回调函数

    """
    map: 使用线程池快速实现并发
    """
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def fun(n):
        print('in fun %s' % n)
        time.sleep(2)
    
    
    if __name__ == '__main__':
        thread_pool = ThreadPoolExecutor(max_workers=5)  # 实例化一个线程池,可以同时并发5个线程
        thread_pool.map(fun, range(20))
    """
    获取线程池的返回值
    """
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def fun(n):
        print('in fun %s' % n)
        time.sleep(2)
        return n * n
    
    
    if __name__ == '__main__':
        thread_pool = ThreadPoolExecutor(max_workers=5)  # 实例化一个线程池,可以同时并发5个线程
        ret_list = []
        for n in range(20):
            ret = thread_pool.submit(fun, n)
            ret_list.append(ret)
        thread_pool.shutdown()
        print('主线程,打印一下所有线程的返回值', [ret.result() for ret in ret_list])
    """
    回调函数的例子
    """
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    
    def fun(n):
        print('in fun %s' % n)
        time.sleep(0.3)
        return n * n
    
    
    def fun_callback(m):
        """
        回调函数
        :param m: 在此示例中为:fun函数的 `return 结果对象`
        :return:
        """
        print('回调函数参数:', m.result())
    
    
    if __name__ == '__main__':
        thread_pool = ThreadPoolExecutor(max_workers=3)  # 实例化一个线程池,可以同时并发5个线程
        for n in range(20):
            thread_pool.submit(fun, n).add_done_callback(fun_callback)
    进程的用法相同,只需要把子模块名字改一下即可
  • 相关阅读:
    一些经验
    倍增(在线)求LCA
    IDA*算法——骑士精神
    A*算法——第K短路
    (持续更新)一些黑科技和技巧
    逆元
    方便人类——信息学训练专用库
    PHP单点登陆
    PHP 中运用 elasticsearch
    PHP斐波那契数列
  • 原文地址:https://www.cnblogs.com/sunch/p/9576545.html
Copyright © 2011-2022 走看看