zoukankan      html  css  js  c++  java
  • python并发编程之多线程

    python并发编程之多线程

     

    1、线程理论知识

       概念:指的是一条流水线的工作过程的总称,是一个抽象的概念,是CPU基本执行单位。

      进程和线程之间的区别:

        1. 进程仅仅是一个资源单位,其中包含程序运行所需的资源,而线程就相当于车间的流水线,负责执行具代码。

        2. 每个进程至少包含一个线程,由操作系统自动创建,称之为主线程

        3. 每个进程可以有任意数量的线程

        4.创建进程的开销要比创建进程小得多

        5. 同一进程的线程间数据是共享的

        6.线程之间是平等的,没有子父级关系,同一进程下的各线程的PID相同

        7. 创建线程的代码可以写在任意位置,不一定非要在main函数下。

      为什么使用线程:

        提高程序执行效率

    2、开启线程的两种方式

      和进程类似,但是开启方式不一定非要建在main函数下。

    复制代码
    # 第一种方式,实例化 Thread
    # from threading import Thread
    #
    # def task():
    #     print("subthread is running....")
    #
    # t = Thread(target=task)
    # t.start()
    # print('main is over....')
    
    # 第二种方式,继承Thread类
    
    from threading import Thread
    
    class MyThread(Thread):
        def run(self):
            print("subthread is running....")
    复制代码

    3、主线程和子线程之间的关系

      1. 主线程任务执行完毕后,主线程会等待所有子线程全部执行完毕后结束

      2. 在同一进程中,所有线程都是平等的,没有子父级关系

    复制代码
    # 验证主线程代码执行完后会不会立即结束,
    import random
    import time
    import threading
    from threading import Thread
    def task(name):
        print("%s is running..." % name)
        time.sleep(random.randint(1, 3))
        print(threading.enumerate())
        print("%s is over....." % name)
    
    
    t = Thread(target=task, args=('aaa',))
    t.start()
    
    print('main over....')
    复制代码

    4、验证线程和进程之间的区别

    复制代码
    from threading import Thread
    import time
    
    def task():
        global num
        time.sleep(1)
        num -= 1
    num = 10
    t = Thread(target=task,)
    t.start()
    t.join()
    print(num)
    复制代码
     创建线程的开销要比创建进程小的多

    5、线程的安全问题

     1.互斥锁

      数据共享必然会造成竞争,竞争就会造成数据错乱问题。

      解决办法:和进程一样,加互斥锁。

    复制代码
    from threading import Thread, Lock
    import time
    
    num = 10
    
    def task(lock):
        global num
        lock.acquire()
        a = num
        time.sleep(0.5)
        num = a-1
        lock.release()
    
    ts = []
    lock = Lock()
    for i in range(10):
        t = Thread(target=task,args=(lock,))
        t.start()
        ts.append(t)
    for t in ts:
        t.join()
    print(num)
    复制代码

      2.死锁

      死锁不是一种锁,而是一种锁的状态,

      一般出现死锁的情况有两种:

        1. 对同一把锁多次acquire.(使用RLOCK锁,代替LOCK)

        2. 两个或两个以上的进程或线程在执行过程中,因争夺资源造成的相互等待现象。(解决办法:能不加最好不加,要加就只加一把)

    复制代码
    from threading import Thread, Lock
    import time
    
    def task1(name, locka, lockb):
        locka.acquire()
        print("%s拿到a锁"%name)
        time.sleep(0.3)
        lockb.acquire()
        print('%s拿到b锁'%name)
        lockb.release()
        locka.release()
    def task2(name, locka, lockb):
        lockb.acquire()
        print("%s拿到b锁"%name)
        time.sleep(0.3)
        locka.acquire()
        print('%s拿到a锁'%name)
        locka.release()
        lockb.release()
    
    locka = Lock()
    lockb = Lock()
    t1 = Thread(target=task1, args=('t1', locka, lockb))
    t2 = Thread(target=task2, args=('t2', locka, lockb))
    t1.start()
    t2.start()
    复制代码

      3.可重入锁

      只能解决同一线程多次执行acquire情况。

      只有一个线程所有的acquire都被释放,其他线程才能拿到这个锁。

      也会发生死锁现象。

    复制代码
    from threading import Thread, RLock
    
    lock = RLock()
    lock.acquire()
    lock.acquire()
    lock.acquire()
    lock.acquire()
    
    print("over")
    lock = RLock()
    
    def task1():
        lock.acquire()
        print('task1')
    def task2():
        lock.acquire()
        print('task2')
    
    
    Thread(target=task1).start()
    Thread(target=task2).start()
    复制代码

      4. 信号量

      也是一种锁,用来控制同一时间,有多少线程可以提供并发访问,不是用来处理线程安全问题

    复制代码
    from threading import Semaphore, Thread
    import time
    s_lock = Semaphore(3)
    
    
    def task():
        s_lock.acquire()
        time.sleep(1)
        print("run.....")
        s_lock.release()
    
    
    for i in range(20):
        t = Thread(target=task)
        t.start()
    复制代码

    6、守护线程

      守护线程在所有非守护线程结束后结束。

    复制代码
    import threading
    from threading import Thread
    import time
    
    def task1():
        print('thread-1 is running...')
        time.sleep(3)
        print('thread-1 over....')
    
    def task2():
        print('thread-2 is running...')
        time.sleep(1)
        print('thread-2 over....')
    
    if __name__ == '__main__':
        t1 = Thread(target=task1,)
        t2 = Thread(target=task2,)
        t1.setDaemon(True)
        t1.start()
        t2.start()
        print(t1.ident)
        print(threading.enumerate())
        print("main over...")
    复制代码

    7、GIL

      全局解释器锁,是一互斥锁,只有在Cpython解释器存在。

      为什么需要:因为一个python.exe进行运行只有一份解释器,如果这个进程开启的多个线程都要执行代码,多线程之间就要竞争解释器,一旦竞争就有可能出现问题。

      带来的好处:保证了多线程同时访问解释器时的数据安全问题。

      带来的问题:同一时间只有一个线程访问解释器,使得多线程无法真正的并发

      出现的原因:默认情况下,一个进程只有一个线程不会是不会出问题,但不要忘了还有GC线程,一旦出现多个线程就可能出现问题,所以当初就简单粗暴的加上了GIL锁

      GIL加锁和解锁时机:

        加锁:在调用解释器时立即加锁

        解锁:当前线程遇到IO时释放,或者当前线程执行超过设定值释放(py2计算的是执行代码的行数,py3中计算的是时间)

      解决办法:使用多进程或使用其他的python解释器

    8、线程池和进程池

      一种容器,本质十一存储线程或进程的列表

      为什么使用? 因为服务器不能无限开启线程或进程,所以需要对线程数量加以控制,线程池就是帮我们完成线程/进程的创建、销毁以及任务分配

      特点:

        线程池在创建时不会开启线程,

        等到任务提交时,如果没有空闲线程,并且已存在的线程数量小于最大值,开启新线程,

        线程开启后不会关闭,直到进程全部结束为止

      (线程池的建立也要建在main函数下

    复制代码
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    pool= ProcessPoolExecutor(maxsize),创建进程池,maxsize为最大进程个数
    
    res = pool.submit(task, 'a'), 提交任务
    
    res.result(timeout),接收调用的返回值,timeout为超时时间,超时报错
    该函数是阻塞函数,会一直等待任务执行完毕
    pool.shutdown(wait),所有任务执行完毕,阻塞函数
    wait=True, 等待池内所有任务执行完毕后回收资源才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    复制代码
    复制代码
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def task(num):
        time.sleep(0.5)
        print("%s is running....."%num)
        return num**2
    
    pool = ThreadPoolExecutor()
    ress = []
    for i in range(10):
        res = pool.submit(task, i)
        ress.append(res)
    
    pool.shutdown(wait=False)
    
    for i in ress:
         print(i.result())
    
    print('over')
    复制代码

     9、同步异步阻塞非阻塞

      阻塞和非阻塞都是指程序的运行状态

        阻塞:当程序执行遇到IO操作,无法继续执行代码

        非阻塞:程序执行没有遇到IO操作,或通过某种方式,使程序遇到了也不会停在原地,还可以继续执行

      同步异步指的是提交任务的方式

        同步:发起任务后必须原地等待任务执行完成,才可以继续执行

        异步:发起任务后不用等待任务执行,可以立即执行其他操作

        异步效率高于同步,发起异步任务方式:就是多线程和多进程

      同步和阻塞的不同:阻塞一定使CPU已经切换,同步虽然在等待,但CPU没有切走,还在当前进程中执行其他任务

    10、异步回调

       其实说的是回调函数,给异步任务绑定一个函数,当任务完成时会自动调用该函数。

      优点:不用原地等待,任务结果立即获取

      线程池或进程池内内的调用回调函数方法add_done_back(), 且回调函数必须有且只有一个参数,就是调用对象本身。

      线程池的回调函数是在子线程内执行,

      进程池的回调函数是在主进程下执行

    复制代码
    import requests
    from concurrent.futures import ThreadPoolExecutor
    import threading
    
    
    def product_data(url):
        data = requests.get(url)
        return data.text,url
    
    
    def parser_data(f):
        res = f.result()
        print(len(res[0]), res[1], "当前线程", threading.current_thread())
    
    
    
    
    if __name__ == '__main__':
        urls = ['http://www.baidu.com','https://www.cnblogs.com/ywsun/', 'https://www.processon.com/']
        pool = ThreadPoolExecutor()
        for url in urls:
            f = pool.submit(product_data, url)
            f.add_done_callback(parser_data)
    复制代码
    复制代码
    import requests
    from concurrent.futures import  ProcessPoolExecutor
    import os
    def product_data(url):
        data = requests.get(url)
        return data.text,url
    
    
    def parser_data(f):
        res = f.result()
        print(len(res[0]), res[1], ", callback pid", os.getpid() )
    
    
    
    
    if __name__ == '__main__':
        urls = ['http://www.baidu.com','https://www.cnblogs.com/ywsun/', 'https://www.processon.com/']
        pool = ProcessPoolExecutor()
        print('main process', os.getpid())
        for url in urls:
            f = pool.submit(product_data, url)
            f.add_done_callback(parser_data)
    复制代码

    11、线程队列

      queue 该模块下提供了一些常见的数据容器,仅仅是容器,没有数据共享特点

      Queue,先进先出

      LifoQueue,后进先出

      PriorityQueue,可设置优先级的队列。插入元组,第一个元素是优先级,可是数字、字母,对应的数值越小优先级越高

    复制代码
    import queue
    
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((2,'a'))
    q.put((1,'b'))
    q.put((3,'c'))
    
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    # 如果是字符,按照ASCII表来排序
    q.put(('a', "sfsja"))
    q.put(('b', "sdfsdf"))
    q.put(('A', "sdfsdf"))
    q.put(('ae', "sdfsdf"))
    q.put(('ab', "sdfsdf"))
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    复制代码

    12、事件

      用于线程间通讯,线程间本就是数据共享,也就是即使没有事件,也没有问题

      线程之间,执行流程是完全独立的,一些时候可能需要知道另一个进程发生了什么,然后采取一些行动。
      方法:
        event.isSet(),返回event的状态值;
        event.wait(), 如果event.isSet()==False将阻塞进程。
        event.set(), 设置event的状态Ture,将所有阻塞池的线程激活,进入就绪状态。等待操作系统调度。
        even.clear(), 恢复event的状态位False。
    复制代码
    from threading import Thread, Event
    import time
    import random
    boot= Event()
    
    
    def server():
        print('启动服务器。。。。')
        time.sleep(random.randint(1,3))
        print('服务器运行。。。。。')
        boot.set()
    
    def connect():
        print('开始尝试连接')
        boot.wait()
        print('连接成功')
    
    t1 = Thread(target=server)
    t1.start()
    
    t2 = Thread(target=connect)
    t2.start()
    复制代码
     
  • 相关阅读:
    unity UGUI实现类似NGUI切换Sprite的方式
    商业智能系统在税务行业的应用
    MSRDS机器人仿真软件学习资源汇总
    Emotiv脑电设备与RDS机器人仿真初步测试
    unity使用UGUI创建摇杆
    如何利用FineBI做财务分析
    Android4.2.2源码目录结构分析
    一个前端妹子的悲欢编程之路
    推荐一款优雅高效的免费在线APP原型工具
    数据分析概述和理论基础
  • 原文地址:https://www.cnblogs.com/xinxihua/p/12791748.html
Copyright © 2011-2022 走看看