zoukankan      html  css  js  c++  java
  • python 进程,线程,协程

    内容概要

    • 死锁与递归锁
    • 信号量
    • Event事件
    • 线程q
    • 进程池与线程池
    • 协程
    • 协程实现TCP服务端的并发效果

    死锁与递归锁

    死锁: 两个进程相互抢到了对方的关键锁,导致双方都无法抢到该锁而卡死的现象
    递归锁:
    可以被连续的acquire和release
    但是只能被第一个抢到这把锁执行上述操作
    它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
    只要计数不为0 那么其他人都无法抢到该锁
    用法: mutexA = mutexB = RLock()
    实例:

    from threading import Thread, Lock, RLock
    import time
    
    
    # mutexA = Lock()
    # mutexB = Lock()
    mutexA = mutexB = RLock()
    # 类只要加括号多次 产生的肯定是不同的对象
    # 如果你想要实现多次加括号等到的是相同的对象 单例模式
    
    
    class MyThead(Thread):
        def run(self):
            self.func1()
            self.func2()
    
        def func1(self):
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            mutexB.release()
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('%s 抢到B锁'% self.name)
            time.sleep(2)
            mutexA.acquire()
            print('%s 抢到A锁'% self.name)  # 获取当前线程名
            mutexA.release()
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThead()
            t.start()
    

    信号量

    相当于一次产生多个锁,由多个人去抢
    使用方法:
    from threading import Thread, Semaphore
    sm=Semaphore(5) # 生成锁的数量
    实例:

    from threading import Thread, Semaphore
    import time
    import random
    
    sm = Semaphore(5)  # 括号内写数字 写几就表示开设几个坑位
    
    
    def task(name):
        sm.acquire()
        print('%s 正在蹲坑'% name)
        time.sleep(random.randint(1, 5))
        sm.release()
    
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=task, args=('伞兵%s号'%i, ))
            t.start()
    

    Event事件

    一个进程/线程等待其他进程/线程,区别join,daemon
    方法:
    from threading import Thread, Event
    event = Event()
    event.set()
    event.wait()

    实例:

    from threading import Thread, Event
    import time
    
    
    event = Event()  # 造了一个红绿灯
    
    
    def light():
        print('红灯亮着的')
        time.sleep(3)
        print('绿灯亮了')
        # 告诉等待红灯的人可以走了
        event.set()
    
    
    def car(name):
        print('%s 车正在灯红灯'%name)
        event.wait()  # 等待别人给你发信号
        print('%s 车加油门飙车走了'%name)
    
    
    if __name__ == '__main__':
        t = Thread(target=light)
        t.start()
    
        for i in range(20):
            t = Thread(target=car, args=('%s'%i, ))
            t.start()
    

    线程q

    同一个进程下多个线程数据是共享的,为什么先同一个进程下还会去使用队列呢?
    因为队列是
    管道 + 锁
    所以用队列还是为了保证数据的安全
    后进先出:queue.LifoQueue(3)

    有优先级:queue.PriorityQueue(4)

    用法:

    import queue
    
    # 我们现在使用的队列都是只能在本地测试使用
    
    # 1 队列q  先进先出
    # q = queue.Queue(3)
    # q.put(1)
    # q.get()
    # q.get_nowait()
    # q.get(timeout=3)
    # q.full()
    # q.empty()
    
    
    # 后进先出q
    # q = queue.LifoQueue(3)  # last in first out
    # q.put(1)
    # q.put(2)
    # q.put(3)
    # print(q.get())  # 3
    
    # 优先级q   你可以给放入队列中的数据设置进出的优先级
    q = queue.PriorityQueue(4)
    q.put((10, '111'))
    q.put((100, '222'))
    q.put((0, '333'))
    q.put((-5, '444'))
    print(q.get())  # (-5, '444')
    # put括号内放一个元祖  第一个放数字表示优先级
    # 需要注意的是 数字越小优先级越高!!!
    

    进程池与线程池

    什么是池?
    池是用来保证计算机硬件安全的情况下最大限度的利用计算机
    它降低了程序的运行效率但是保证了计算机硬件的安全 从而让程序能够正常运行

    任务的提交方式
    同步:提交任务之后原地等待任务的返回结果 期间不做任何事
    异步:提交任务之后不等待任务的返回结果 执行继续往下执行
    返回结果如何获取???
    异步提交任务的返回结果 应该通过回调机制来获取
    回调机制
    就相当于给每个异步任务绑定了一个定时炸弹
    一旦该任务有结果立刻触发爆炸使用方法:
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    pool = ProcessPoolExecutor(5)
    pool.submit(task, i).add_done_callback(call_back)
    pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕

    实例:

    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time
    import os
    
    
    # pool = ThreadPoolExecutor(5)  # 池子里面固定只有五个线程
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
    pool = ProcessPoolExecutor(5)
    # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
    """
    池子造出来之后 里面会固定存在五个线程
    这个五个线程不会出现重复创建和销毁的过程
    池子造出来之后 里面会固定的几个进程
    这个几个进程不会出现重复创建和销毁的过程
    """
    
    
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**n
    
    def call_back(n):
        print('call_back>>>:',n.result())
    
    if __name__ == '__main__':
        # pool.submit(task, 1)  # 朝池子中提交任务  异步提交
        # print('主')
        # t_list = []
        for i in range(20):  # 朝池子中提交20个任务
            # res = pool.submit(task, i)  # <Future at 0x100f97b38 state=running>
            res = pool.submit(task, i).add_done_callback(call_back)
            # print(res.result())  # result方法   同步提交
            # t_list.append(res)
        # 等待线程池中所有的任务执行完毕之后再继续往下执行
        # pool.shutdown()  # 关闭线程池  等待线程池中所有的任务运行完毕
        # for t in t_list:
        #     print('>>>:',t.result())  # 肯定是有序的
    """
    程序有并发变成了串行
    任务的为什么打印的是None
    res.result() 拿到的就是异步提交的任务的返回结果
    """
    

    协程

    实现原理:调用gevent模块
    一旦遇到IO了 我们在代码级别完成切换
    这样给CPU的感觉是你这个程序一直在运行 没有IO
    从而提升程序的运行效率

    gevent模块本身无法检测常见的一些io操作
    在使用的时候需要你额外的导入一个模块
    from gevent import monkey
    monkey.patch_all()

    实例:

    from gevent import monkey;monkey.patch_all()
    ef heng():
        print('哼')
        time.sleep(2)
        print('哼')
    
    
    def ha():
        print('哈')
        time.sleep(3)
        print('哈')
    
    def heiheihei():
        print('heiheihei')
        time.sleep(5)
        print('heiheihei')
    
    
    start_time = time.time()
    g1 = spawn(heng)
    g2 = spawn(ha)
    g3 = spawn(heiheihei)
    g1.join()
    g2.join()  # 等待被检测的任务执行完毕 再往后继续执行
    g3.join()
    # heng()
    # ha()
    # print(time.time() - start_time)  # 5.005702018737793
    print(time.time() - start_time)  # 3.004199981689453   5.005439043045044
    

    协程实现TCP服务端的并发效果

    实例:

    # 服务端
    from gevent import monkey;monkey.patch_all()
    import socket
    from gevent import spawn
    
    
    def communication(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0: break
                conn.send(data.upper())
            except ConnectionResetError as e:
                print(e)
                break
        conn.close()
    
    
    def server(ip, port):
        server = socket.socket()
        server.bind((ip, port))
        server.listen(5)
        while True:
            conn, addr = server.accept()
            spawn(communication, conn)
    
    
    if __name__ == '__main__':
        g1 = spawn(server, '127.0.0.1', 8080)
        g1.join()
    
        
    # 客户端
    from threading import Thread, current_thread
    import socket
    
    
    def x_client():
        client = socket.socket()
        client.connect(('127.0.0.1',8080))
        n = 0
        while True:
            msg = '%s say hello %s'%(current_thread().name,n)
            n += 1
            client.send(msg.encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=x_client)
            t.start()
    
  • 相关阅读:
    Hadoop集群(三) Hbase搭建
    Hadoop集群(二) HDFS搭建
    Hadoop集群(一) Zookeeper搭建
    Redis Cluster 添加/删除 完整折腾步骤
    Redis Cluster在线迁移
    Hadoop分布式HA的安装部署
    Describe the difference between repeater, bridge and router.
    what is the “handover” and "soft handover" in mobile communication system?
    The main roles of LTE eNodeB.
    The architecture of LTE network.
  • 原文地址:https://www.cnblogs.com/Franciszw/p/12789222.html
Copyright © 2011-2022 走看看