zoukankan      html  css  js  c++  java
  • 进程与线程通用补充与协程

    进程与线程通用补充与协程

    1 进程池与线程池(重点)

    1.1 回顾TCP服务端实现并发

    每来一个人就开设一个进程或者线程去处理

    """
    无论是开设进程也好还是开设线程也好 都需要消耗资源
    只不过开设线程的消耗比开设进程的稍微小一点而已
    
    我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
    硬件的开发速度远远赶不上软件
    
    我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
    """
    # 池的概念
    """
    什么是池?
    	池是用来保证计算机硬件安全的情况下最大限度的利用计算机
    	它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行
    """
    

    1.2 基本使用

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time
    import os
    
    
    # pool = ThreadPoolExecutor(5)  # 池子里面固定只有五个线程
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
    pool = ProcessPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    """
    池子造出来之后 里面会固定存在五个线程
    这个五个线程不会出现重复创建和销毁的过程
    池子造出来之后 里面会固定的几个进程
    这个几个进程不会出现重复创建和销毁的过程
    
    池子的使用非常的简单
    你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
    """
    
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    """
    任务的提交方式
        同步:提交任务之后原地等待任务的返回结果 期间不做任何事
        异步:提交任务之后不等待任务的返回结果 执行继续往下执行
            返回结果如何获取???
            异步提交任务的返回结果 应该通过回调机制来获取
            回调机制
                就相当于给每个异步任务绑定了一个定时炸弹
                一旦该任务有结果立刻触发爆炸
    """
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        # print('主')
        t_list = []
        for i in range(20):  # 朝池子中提交20个任务
            # res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
            res = pool.submit(task, i).add_done_callback(call_back)
            # print(res.result())  # result方法   同步提交
            # t_list.append(res)
        # 等待线程池中所有的任务执行完毕之后再继续往下执行
        # pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
        # for t in t_list:
        #     print('>>>:',t.result())  # 肯定是有序的
    """
    程序有并发变成了串行
    任务的为什么打印的是None
    res.result() 拿到的就是异步提交的任务的返回结果
    """
    

    1.3 总结

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    pool = ProcessPoolExecutor(5)
    pool.submit(task, i).add_done_callback(call_back)
    

    2 死锁与递归锁(了解)

    2.1 死锁

    虽然了解了锁的使用:抢锁必须要释放锁,但是其实在操作锁的时候也极其容易产生死锁现象

    (整个程序卡死 阻塞)

    如:每个线程都同时需要两个锁才能完全运行,但是出现了两个线程分别拿着其中一把锁,就产生了死锁现象

    from threading import Thread, Lock
    import time
    
    
    mutexA = Lock()
    mutexB = Lock()
    # 类只要加括号多次 产生的肯定是不同的对象
    # 如果想要实现多次加括号等到的是相同的对象 ---> 单例模式
    
    
    class MyThead(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            mutexB.release()
            mutexA.release()
            
        def func2(self):
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            time.sleep(2)
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexA.release()
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThead()
            t.start()
    

    2.2 递归锁

    递归锁:锁上加锁,当所有锁全部解除才能被其他线程抢到

    from threading import RLock
    """
    递归锁的特点	
    	可以被连续的acquire和release
    	但是只能被第一个抢到这把锁执行上述操作
    	它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
    	只要计数不为0 那么其他人都无法抢到该锁
    """
    # 将上述的
    mutexA = Lock()
    mutexB = Lock()
    # 类只要加括号多次 产生的肯定是不同的对象
    # 如果想要实现多次加括号等到的是相同的对象 ---> 单例模式
    
    # 换成
    mutexA = mutexB = RLock()
    

    3 信号量(了解)

    信号量在不同的阶段可能对应不同的技术点

    在并发编程中信号量指的是一种锁 !!!

    特点是这把锁可以分成多把锁

    """
    如果我们将互斥锁比喻成一个厕所的话
    那么信号量就相当于多个厕所
    """
    from threading import Thread, Semaphore
    import time
    import random
    
    
    
    sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位
    
    
    def task(name):
        sm.acquire()
        print('%s 正在蹲坑'% name)
        time.sleep(random.randint(1, 5))
        sm.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task, args=('伞兵%s号'%i, ))
            t.start()
    # 第一批能进入5个人,后面每走一个人就能重新进一个人
    

    4 Event事件(了解)

    一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,event可以实现类似于发射信号一样的效果,使执行到event.wait()的线程挂起,直到出现了event.set()才继续执行

    from threading import Thread, Event
    import time
    
    
    event = Event()  # 造了一个红绿灯
    
    
    def light():
        print('红灯亮着的')
        time.sleep(3)
        print('绿灯亮了')
        # 告诉等待红灯的人可以走了
        event.set()
    
    
    def car(name):
        print('%s 车正在灯红灯'%name)
        event.wait()  # 等待别人给你发信号
        print('%s 车加油门飙车走了'%name)
    
    
    if __name__ == '__main__':
        t = Thread(target=light)
        t.start()
    
        for i in range(20):
            t = Thread(target=car, args=('%s'%i, ))
            t.start()
    

    5 线程q(了解)

    """
    同一个进程下多个线程数据是共享的
    为什么先同一个进程下还会去使用队列呢
    因为队列是
        管道 + 锁
    所以用队列还是为了保证数据的安全
    """
    
    
    # 我们现在使用的队列都是只能在本地测试使用
    # 后期使用redis等封装完成的队列
    

    5.1 队列q 先进先出

    import queue
    
    q = queue.Queue(3)
    q.put(1)
    q.get()
    q.get_nowait()
    q.get(timeout=3)
    q.full()
    q.empty()
    
    

    5.2 后进先出q

    import queue
    q = queue.LifoQueue(3)  # last in first out
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())  # 3
    

    5.3 优先级q

    可以给放入队列中的数据设置进出的优先级

    lowest first 数字越小优先级越高

    import queue
    q = queue.PriorityQueue(4)
    q.put((10, '111'))
    q.put((100, '222'))
    q.put((0, '333'))
    q.put((-5, '444'))
    print(q.get())  # (-5, '444')
    # put括号内放一个元祖  第一个放数字表示优先级
    # 需要注意的是 数字越小优先级越高!!!
    

    6 协程

    """
    进程:资源单位
    线程:执行单位
    协程:这个概念完全是程序员自己意淫出来的 根本不存在
    		单线程下实现并发
    		我们程序员自己再代码层面上检测我们所有的IO操作
    		一旦遇到IO了 我们在代码级别完成切换
    		这样给CPU的感觉是你这个程序一直在运行 没有IO
    		从而提升程序的运行效率
    	
    多道技术
    	切换+保存状态
    	CPU两种切换
    		1.程序遇到IO
    		2.程序长时间占用
    
    TCP服务端 
    	accept
    	recv
    	
    代码如何做到
    	切换+保存状态
    
    切换
    	切换不一定是提升效率 也有可能是降低效率
    	IO切			提升
    	没有IO切 降低
    		
    保存状态
    	保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
    	yield
    """
    

    6.1 验证切换是否一定提升效率

    6.1.1 计算密集型

    import time
    
    # 串行执行计算密集型的任务   1.2372429370880127
    def func1():
        for i in range(10000000):
            i + 1
    
    def func2():
        for i in range(10000000):
            i + 1
    
    start_time = time.time()
    func1()
    func2()
    print(time.time() - start_time)
    
    # 切换 + yield 方法 	2.1247239112854004
    import time
    
    def func1():
        while True:
            10000000 + 1
            yield
    
    
    def func2():
        g = func1()  # 先初始化出生成器
        for i in range(10000000):
            i + 1
            next(g)
    
    start_time = time.time()
    func2()
    print(time.time() - start_time)
    # 串行执行计算密集型的任务   原本时间1.2372429370880127
    # 切换 + yield 后时间 2.1247239112854004
    
    

    6.1.2 IO密集型

    (可能需要安装gevent模块,cmd输入下面代码安装)
    python -m pip --default-timeout=100 install gevent -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
    
    from gevent import monkey
    monkey.patch_all()
    from gevent import spawn
    import time
    
    """
    gevent模块本身无法检测常见的一些io操作
    在使用的时候需要你额外的导入一句话
    from gevent import monkey
    monkey.patch_all()
    又由于上面的两句话在使用gevent模块的时候是肯定要导入的
    所以还支持简写
    from gevent import monkey;monkey.patch_all()
    """
    
    
    def heng():
        print('哼')
        time.sleep(2)
        print('哼')
    
    
    def ha():
        print('哈')
        time.sleep(3)
        print('哈')
    
    def heiheihei():
        print('heiheihei')
        time.sleep(5)
        print('heiheihei')
    
    
        
    start_time = time.time()
    heng()
    ha()
    heiheihei()
    print(time.time() - start_time)  # 10.007060527801514
    
        
    start_time = time.time()
    g1 = spawn(heng)
    g2 = spawn(ha)
    g3 = spawn(heiheihei)
    g1.join()
    g2.join()  # 等待被检测的任务执行完毕 再往后继续执行
    g3.join()
    
    print(time.time() - start_time)  # 5.005439043045044
    
    

    6.2 协程实现TCP服务端的并发

    # 服务端
    from gevent import monkey;monkey.patch_all()
    import socket
    from gevent import spawn
    
    
    def communication(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0: break
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
        conn.close()
    
    
    def server(ip, port):
        server = socket.socket()
        server.bind((ip, port))
        server.listen(5)
        while True:
            conn, addr = server.accept()
            spawn(communication, conn)
    
    
    if __name__ == '__main__':
        g1 = spawn(server, '127.0.0.1', 8080)
        g1.join()
    
        
    # 客户端
    from threading import Thread, current_thread
    import socket
    
    
    def x_client():
        client = socket.socket()
        client.connect(('127.0.0.1',8080))
        n = 0
        while True:
            msg = '%s say hello %s'%(current_thread().name,n)
            n += 1
            client.send(msg.encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=x_client)
            t.start()
    

    6.3 总结

    """
    理想状态:
    	我们可以通过
    	多进程下面开设多线程
    	多线程下面再开设协程
    	从而使我们的程序执行效率提升
    """
    
  • 相关阅读:
    Linux文件查询笔记
    C语言学习和回顾
    hive的数据压缩
    进程线程那些事儿
    hive的数据存储格式
    hive的内置函数
    Hive自定义函数
    spark编译
    Impala的安装和使用
    数据库的读写分离
  • 原文地址:https://www.cnblogs.com/achai222/p/12790906.html
Copyright © 2011-2022 走看看