zoukankan      html  css  js  c++  java
  • 41、进程池与线程池

    一、死锁与递归锁

    1.1、死锁

      死锁:锁的使用,包括抢锁以及释放锁,当两个人分别各自抢到一把锁,有需要对方的锁时就会造成死锁,即程序阻塞

      Thread:操作线程模块

      Lock:互斥锁模块

      用户1抢到了A锁,接着抢到了B锁,释放了b锁,又释放了a锁,在抢了B锁,睡眠2s,

      用户2抢到了a锁,接着要去抢B锁,但是B锁在1手上,而1需要A锁,就会造成两个人都拿不到锁。

    1.2、递归锁

      需要将两个锁指向同一个目标,即同一把锁,就不会造成死锁现象

      内部有一个计数器,可以连续的加锁和释放锁:每次acquire就会加一,每次release就会减一,

      只要计数器不为零,其他人就抢不到锁

    from threading import Thread, Lock
    import time
    
    
    mutexA = Lock()
    mutexB = Lock()
    #mutexA = mutexB = RLock()   #递归锁,将两把锁指向同一把锁
    # 类只要加括号多次 产生的肯定是不同的对象
    # 如果你想要实现多次加括号等到的是相同的对象 单例模式
    
    
    class MyThead(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            mutexB.release()
            mutexA.release()
            
        def func2(self):
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            time.sleep(2)
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexA.release()
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThead()
            t.start()

    二、信号量:Semaphore模块

      信号量在并发编程中指的是锁,即可以多个对象同时进行加锁

      使用sm = Semaphore(n),填写的数量为可以有几个对象加锁

    from threading import Thread, Semaphore
    import time
    import random
    
    
    """
    利用random模块实现打印随机验证码(搜狗的一道笔试题)
    """
    sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位
    
    
    def task(name):
        sm.acquire()
        print('%s 正在蹲坑'% name)
        time.sleep(random.randint(1, 5))
        sm.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task, args=('伞兵%s号'%i, ))
            t.start()

    三、Event事件

      Event模块:一些线程需要等待另一个线程执行完了之后才能执行

      event = Event()

      event.set()通知event.wait()可以执行了

    from threading import Thread, Event
    import time
    
    
    event = Event()  # 造了一个红绿灯
    
    
    def light():
        print('红灯亮着的')
        time.sleep(3)
        print('绿灯亮了')
        # 告诉等待红灯的人可以走了
        event.set()
    
    
    def car(name):
        print('%s 车正在灯红灯'%name)
        event.wait()  # 等待别人给你发信号
        print('%s 车加油门飙车走了'%name)
    
    
    if __name__ == '__main__':
        t = Thread(target=light)
        t.start()
    
        for i in range(20):
            t = Thread(target=car, args=('%s'%i, ))
            t.start()

    四、线程q

    4.1、进程下为什么使用队列

      线程的资源是共享的,队列 = 管道 + 锁

      所以队列使用来保护数据的安全

    4.2、队列q,先进先出  Queue()

     q = queue.Queue(3)
     q.put(1)     加入
     q.get()    输出
     q.get_nowait()   
     q.get(timeout=3)   等待3秒后执行
     q.full()     判断队列中是否饱和
     q.empty()    判断队列中是否为空

    4.3、堆栈q,先进后出    LifoQueue()

    # q = queue.LifoQueue(3)  # last in first out
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # print(q.get())  # 3

    4.4、优先级q    PriorityQueue(n)

      n代表可以入队的数量,入队时需要填元组,元组内第一个数字为优先级,第二个位置为内容,优先级的数字越低,级别越高

    q = queue.PriorityQueue(4)
    q.put((10, '111'))
    q.put((100, '222'))
    q.put((0, '333'))
    q.put((-5, '444'))
    print(q.get())  # (-5, '444')
    # put括号内放一个元祖  第一个放数字表示优先级
    # 需要注意的是 数字越小优先级越高!!!

    五、进程池与线程池

    5.1、池的作用

      池就是在计算机保证计算机安全的前提下,最大程度的使用计算机

      由于软件的发展速度远快于硬件,因此需要在通过限制程序的运行效率来保证计算机硬件的安全,使程序能够正常运行

    5.2、进程池 ProcesspoolExecutor(进程池执行者)

      需要从concurrent里面导出ProcesspoolExecotor模块,设置进程池 pool = processpoolexecotor(),不传参则默认当前计算机CPU数,在异步提交的情况下,需要有个反馈机制来提醒有返回值了,需要输入res= pool.submit(task.i),add_done_callback(callback),接着继续执行程序

    5.3、线程池 ThreadpoolExecutor(线程池执行者)

      需要从concurrent里面导出ThreadpoolExecotor模块,设置线程池 pool = ThreadpoolExecotor(),不传参则默认当前计算机CPU个数*5,在异步提交的情况下,需要有个反馈机制来提醒有返回值了,需要输入res = pool.submit(task,i).add_done_callback(callback)

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time
    import os
    
    
    # pool = ThreadPoolExecutor(5)  # 池子里面固定只有五个线程
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
    pool = ProcessPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    """
    池子造出来之后 里面会固定存在五个线程
    这个五个线程不会出现重复创建和销毁的过程
    池子造出来之后 里面会固定的几个进程
    这个几个进程不会出现重复创建和销毁的过程
    
    池子的使用非常的简单
    你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
    """
    
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    """
    任务的提交方式
        同步:提交任务之后原地等待任务的返回结果 期间不做任何事
        异步:提交任务之后不等待任务的返回结果 执行继续往下执行
            返回结果如何获取???
            异步提交任务的返回结果 应该通过回调机制来获取
            回调机制
                就相当于给每个异步任务绑定了一个定时炸弹
                一旦该任务有结果立刻触发爆炸
    """
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        # print('主')
        t_list = []
        for i in range(20):  # 朝池子中提交20个任务
            # res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
            res = pool.submit(task, i).add_done_callback(call_back)
            # print(res.result())  # result方法   同步提交
            # t_list.append(res)
        # 等待线程池中所有的任务执行完毕之后再继续往下执行
        # pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
        # for t in t_list:
        #     print('>>>:',t.result())  # 肯定是有序的
    """
    程序有并发变成了串行
    任务的为什么打印的是None
    res.result() 拿到的就是异步提交的任务的返回结果

    六、协程

      协程并不存在与计算机内,而是程序员总结出来的。

      通过编写程序,要求IO操作在代码级别就进行切换,是CPU认为一直在执行程序,从而提高效率

    6.1、协程的使用

      多道技术:保存和切换

        保存:保存上一步执行的状态,下一次接着执行之前的状态,使用yield

        切换:程序遇到IO以及程序长时间占用CPU

          切换不一定会提高效率,也有可能会降低效率:在没有等待时间的情况下,串行执行密集型的任务,频繁的切换就会降低效率,切换也是需要时间的

      TCP服务端:accept和recv

     import time
    
    # 串行执行计算密集型的任务   1.2372429370880127
     def func1():
         for i in range(10000000):
             i + 1
    
     def func2():
         for i in range(10000000):
             i + 1
    
     start_time = time.time()
     func1()
     func2()
     print(time.time() - start_time)
    
     #切换 + yield  2.1247239112854004
     import time
    
    
     def func1():
         while True:
             10000000 + 1
            yield
    
    
     def func2():
         g = func1()  # 先初始化出生成器
         for i in range(10000000):
             i + 1
             next(g)
    
     start_time = time.time()
     func2()
     print(time.time() - start_time)

    七、gevent模块

      gevent模块可以通过导入一句话,从而可以在程序中具备检测IO操作的能力,from gevent import monkey;monkey.path_all()

      通过g1 = pawn(ha)执行函数,g1.join()加入gevent模块中。

    from gevent import monkey;monkey.patch_all()
    """
    
    
    def heng():
        print('哼')
        time.sleep(2)
        print('哼')
    
    
    def ha():
        print('哈')
        time.sleep(3)
        print('哈')
    
    def heiheihei():
        print('heiheihei')
        time.sleep(5)
        print('heiheihei')
    
    
    start_time = time.time()
    g1 = spawn(heng)
    g2 = spawn(ha)
    g3 = spawn(heiheihei)
    g1.join()
    g2.join()  # 等待被检测的任务执行完毕 再往后继续执行
    g3.join()
    # heng()
    # ha()
    # print(time.time() - start_time)  # 5.005702018737793
    print(time.time() - start_time)  # 3.004199981689453 

    八、协程实现tcp服务端的并发

    from gevent import monkey;monkey.patch_all()
    import socket
    from gevent import spawn
    
    
    def communication(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0: break
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
        conn.close()
    
    
    def server(ip, port):
        server = socket.socket()
        server.bind((ip, port))
        server.listen(5)
        while True:
            conn, addr = server.accept()
            spawn(communication, conn)
    
    
    if __name__ == '__main__':
        g1 = spawn(server, '127.0.0.1', 8080)
        g1.join()
    
        
    # 客户端
    from threading import Thread, current_thread
    import socket
    
    
    def x_client():
        client = socket.socket()
        client.connect(('127.0.0.1',8080))
        n = 0
        while True:
            msg = '%s say hello %s'%(current_thread().name,n)
            n += 1
            client.send(msg.encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=x_client)
            t.start()

    九、总结

      理想状态:

        多进程下面使用多线程

        多线程下面使用协程

        从而提高程序的执行效率

      

      

  • 相关阅读:
    子库存安全性控制
    检查装配件属性
    检查加工费是否有父件
    检查委外货位
    只允许操作外协任务
    检查是否存在工艺路线
    不能取组织ID
    加宽任务号宽度
    采购订单供应商地点必输
    只显示标准采购订单
  • 原文地址:https://www.cnblogs.com/jingpeng/p/12789137.html
Copyright © 2011-2022 走看看