zoukankan      html  css  js  c++  java
  • 并发函数--线程

    计算机的执行单位以线程为单位.计算机的最小可执行单位是线程.

    进程是资源分配的基本单位,线程是可执行的基本单位.是可被调度的基本单位.

    线程不可以自己独立拥有资源,线程的执行必须依赖于所属进程的资源.

    线程被称作轻量级的进程.线程的切换速度不进程块.

    进程中必须至少要有一个线程

    GIL:全局解释锁,用来锁解释器(只有CPython解释器才有),限制只能有一个线程来访问CPU.对于线程来说,因为有了GIL所以只有并发没有并行,CPU允许一个线程执行5毫秒

    线程又分为:

         用户级线程:对于程序员来说的.这样的线程完全被程序员控制执行调度

         内核级线程:对于计算机内核来说的,这样的线程完全被内核控制调度

    线程的模块:threading

    线程是由:代码块,数据段,TCB(线程控制块)组成.

    实现线程的两种方法:

    1.直接实现

    例:

    from threading import Thread

    def func(i):
    print(i+1)

    for i in range(100):
    t = Thread(target = func,args = (i,)) #开启100个线程
    t.start()

    2.继承Thread类

    例:

    from threading import Thread
    class Mythread(Thread): #继承Thread类来开启线程
    def __init__(self):
    super(Mythread, self).__init__()
    def run(self):
    print(1)
    t = Mythread()
    t.start()

    进程和线程的区别:

    1.CPU的切换进程要比CPU切换线程慢很多 所有在python中,如果I/O操作密集时,最好使用线程;计算密集情况下,最好使用进程

    2.在同一进程内,所有线程共享这个进程的pid,也就是说这个线程共享这个进程的所有资源和内存地址.

    3.在同一进程内,所有进程共享这个进程中的全局变量

    例: 

    from threading import Thread,Lock
    import time
    def func():
    global num
    l.acquire()
    tem = num
    time.sleep(0.01)
    num = tem - 1
    l.release()

    num = 100
    lis = []
    l = Lock()
    for i in range(100):
    t = Thread(target = func, )
    t.start()
    lis.append(t)
    [i.join() for i in lis]
    print(num)

    4.因为GIL锁的存在,在CPython中,没有真正的线程并行,但是有真正的多进程并行.

    5.守护线程是根据主线程执行结束才结束;守护进程是根据主进程代码执行结束而结束

    主线程会等待普通线程执行而结束

    守护线程会等待主线程结束而结束.一般把不重要的事情设置为守护线程

    线程受被强制放弃CPU的原因:1.线程会受到时间片影响

                   2.GIL会限制每个线程的执行时间一般都是5毫秒左右

                   3.限制执行固定的bytecode(字节码)

    线程的使用方法:

    1.锁

      Lock是互斥锁,一把钥匙配一把锁

      Rlock是递归锁,是一个无止尽的锁,但是所有锁都有一个公用的钥匙

            在同一个线程内,递归锁可以无止尽的acquire,但互斥锁不行

            在不同的线程内,递归锁是保证只能被一个线程拿到钥匙.其他线程等待

      GIL是全局解释器锁,锁的是线程,是解释器上的锁,锁的是线程.意思是在同一时间只允许一个线程访问CPU

    例:

    from threading import Thread,RLock,Lock

    def func1():
    r.acquire()
    print(456)
    r.acquire()
    print(123)
    r.release()
    r.release()

    def func2():
    r.acquire()
    print("ABC")
    r.acquire()
    print("abc")
    r.release()
    r.release()

    r = RLock() #实例化一个万能锁
    t1 = Thread(target = func1)
    t2 = Thread(target = func2)
    t1.start()
    t2.start()

    2.信号量

    例:

    from threading import Semaphore,Thread
    import time,random
    def func(sem,i):
    sem.acquire()
    print("33[32m 第%s个人进入 33[0m" % i)
    time.sleep(random.randint(1,3))
    print("33[35m 第%s个人出去了 33[31m" % i)
    sem.release()

    sem = Semaphore(5)
    for i in range(20):
    t = Thread(target = func,args = (sem,i))
    t.start()

    3.事件

    例:

    from threading import Thread,Event
    import time
    def Traffic_lights(e,):
    while 1:
    if e.is_set():
    time.sleep(5)
    print("33[35m 红灯亮了 33[0m")
    e.clear()
    else:
    time.sleep(5)
    print("33[36m 绿灯亮了 33[0m")
    e.set()

    def car(e,i):
    e.wait()
    print("第%s辆车过去了" % i)

    e = Event()
    t_l = Thread(target = Traffic_lights,args = (e,))
    t_l.start()
    for i in range(20):
    c = Thread(target = car,args = (e,i+1))
    c.start()

    4.条件

    条件是让程序员自行去调度线程的一个机制

    Condition的四个方法:acquire,release,wait,notify

    wait是指让线程阻塞住

    notify(n)是给wait发一个信号,让wait变不阻塞,n是指要给多少wait发信号

    例:

    from threading import Condition,Thread
    def func(con,i):
    con.acquire()       #主线程和100个子线程都在抢递归锁的一把钥匙
    con.wait() #阻塞
    con.release()
    print("第%s个线程开始执行了" % i)
    con = Condition()
    for i in range(10):
    t = Thread(target = func,args = (con,i))
    t.start()
    while 1:
    num = int(input(">>>"))
    con.acquire()
    con.notify(num) #发送一个标识让线程不阻塞num次
    con.release()
    结果:
    >>>5
    >>>第0个线程开始执行了
    第1个线程开始执行了
    第3个线程开始执行了
    第4个线程开始执行了
    第2个线程开始执行了
    4
    >>>第7个线程开始执行了
    第6个线程开始执行了
    第5个线程开始执行了
    第8个线程开始执行了
    2
    >>>第9个线程开始执行了

    注意:如果主线程执行顺序是拿到钥匙->input->notify发送信号->换钥匙.如果主线程执行特别快,接下来极有可能主线程又拿到钥匙.
    那么此时即使有十个子线程的wait接收到信号,也拿不到钥匙执行不了程序.

    5.定时器

    语法:Timer(time,func).star()

    time:睡眠时间

    func:要执行的函数名

    例:

    from threading import Timer
    def func():
    print("hello world")
    Timer(0.1, func).start()

    线程的队列:

    同一进程(多线程)的队列:不能做多进程的通信

    例:先进先出队列

    import queue
    q = queue.Queue()
    q.put("1")
    q.put("2")
    q.put("3")
    print(q.get())
    print(q.get())
    结果:
    1
    2

    例:后进先出队列

    import queue
    q = queue.LifoQueue()
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    结果:
    3
    2

    例:优先级队列

    import queue
    q = queue.PriorityQueue()
    q.put((1,"a"))
    q.put((0,"b"))
    q.put((9,"c"))
    print(q.get())
    print(q.get())
    结果:
    (0, 'b')
    (1, 'a')

    例:

    import queue
    q = queue.Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put_nowait("a")
    while 1:
    try:
    print(q.get_nowait())
    except:
    print("队列空了")
    break

    for i in range(0):
    print(i)

    优先级队列的put()接收的是元组(,):第一个元素是优先级,第二个位置是存入队列的数据

    首先保证整个队列中,所有优先级的类型一致

    优先级是数字(int)类型的直接比较大小,数字越小优先级越高

    优先级是字符串类型的按照ASCII码比较的,比较字符串的第一个位置的ASCII,如果字符串的ASCII码相同会按照先进先出原则

    线程池:并发

    定义:在一个池子里放固定数量的线程,这些线程处于等待任务状态,一旦有任务来,就有线程自动执行

    concurrent.futures这个模块是异步的调用机制

    导入进程池和线程池:from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor  

    ==> 提交任务都用submit,多个任务用for循环+submit   shutdown()==pool进程池中的close+join

    例:线程,进程和Pool进程异步的效率对比

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time
    from multiprocessing import Pool
    def func(num):
    sum = 0
    for i in range(num):
    sum += i**2
    # print(sum)

    if __name__ == '__main__':
    t_start = time.time()
    t = ThreadPoolExecutor(20)
    for i in range(10000):
    t.submit(func,i)
    t.shutdown()
    t_time = time.time() - t_start

    p_start = time.time()
    p = ProcessPoolExecutor(5)
    for i in range(10000):
    p.submit(func,i)
    p.shutdown()
    p_time = time.time() - p_start

    po_start = time.time()
    po = Pool(5)
    for i in range(10000):
    po.apply_async(func,i)
    po.close()
    po.join()
    po_time = time.time() - po_start

    print("进程执行的时间:%s,Pool进程中异步执行的时间:%s,线程执行的时间:%s" % (p_time,po_time,t_time))

    针对计算密集时:

    不管是Pool进程池的异步还是ProcessPoolExecutor的进程池执行效率相当

    ThreadPoolExecutor的效率差很多,所以计算密集时使用进程池

    shutdown等效于Pool进程池中异步处理进程中的close+join 是指不允许向线程池中添加任务,然后让父进程(线程)等待池中所以进程(线程)执行任务

    提交多个任务:1.使用for循环+submit

          2.使用map

    例:for循环+submit方法拿到返回结果用result

    from concurrent.futures import ThreadPoolExecutor

    def func(i):
    return i+1

    t = ThreadPoolExecutor(20)
    lis = []
    for i in range(1000):
    res = t.submit(func,i)
    lis.append(res)
    t.shutdown()
    [print(i.result()) for i in lis]

    例:map方法拿到返回结果用__next__()   返回值是一个生成器可以直接用for循环

    from concurrent.futures import ThreadPoolExecutor

    def func(i):
    return i+1

    t = ThreadPoolExecutor(20)
    obj = t.map(func,[i for i in range(1000)])
    t.shutdown()

    while 1 :
    try :
    print(obj.__next__())
    except StopIteration:
    break

    回调函数:

    例:

    from concurrent.futures import ThreadPoolExecutor

    def func(num):
    sum = 0
    for i in range(num):
    sum += i**2
    return sum

    def call_back(ret):
    print(ret.result())

    t = ThreadPoolExecutor(20)

    for i in range(1000):
    t.submit(func,i).add_done_callback(call_back)
    t.shutdown()
  • 相关阅读:
    HTML
    MySQL 表操作
    MySQL 库操作
    MySQL
    python 客户端的安全性验证和服务端对客户端的多端连接
    python 黏包
    python 通信
    SpringData —— HelloWorld
    JPA
    Hibernate ——二级缓存
  • 原文地址:https://www.cnblogs.com/gxj742/p/9526664.html
Copyright © 2011-2022 走看看