zoukankan      html  css  js  c++  java
  • python 36 锁、进程池、线程池

    1. 死锁与递归锁

    死锁:两个或者两个以上的进程或者线程在执行过程中,因争夺资源而造成的一种等待现象,称为死锁现象。

    递归锁可以解决死锁现象。

    递归锁有一个计数的功能,原数字为0,锁一次计数+1,释放一次,计数-1;只要数字不为0,其他线程就不能枪锁。

    from threading import RLock
    from threading import Thread
    import time
    
    lock_A = lock_B = RLock()		# 必须这样写
    
    class MyThread(Thread):
    
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            lock_A.acquire()
            print(f'{self.name}抢到了A锁!')
            lock_B.acquire()
            print(f'{self.name}抢到了B锁!')
    
            lock_B.release()
            lock_A.release()
    
        def f2(self):
            lock_B.acquire()
            print(f'{self.name}抢到了B锁!')
            time.sleep(0.1)
            lock_A.acquire()
            print(f'{self.name}抢到了A锁!')
    
            lock_A.release()
            lock_B.release()
    
    if __name__ == '__main__':
    
        for i in range(3):
            t = MyThread()
            t.start()
    

    2. 信号量Semaphor

    Semaphore管理一个内置的计数器,
    每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;
    计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    能够控制同一时刻的线程数。

    from threading import Thread, Semaphore, current_thread
    import time
    import random
    
    sem = Semaphore(5)		# 设置同一时刻只能有5个线程并发执行
    
    def task():
        sem.acquire()
        print(f'{current_thread().name}进程在运行')
        time.sleep(random.random())
        sem.release()
    
    if __name__ == '__main__':
        for i in range(30):
            t = Thread(target=task)
            t.start()
    
    

    3. GIL全局解释器锁:(Cpython)

    ​ Cpython规定,同一时刻只允许一个线程进入解释器,因为加了GIL。

    ​ 如果不加全局解释器锁,会出现死锁现象,开发此的程序员为了方便,在进入解释器时给线程加了一个全局解释器锁,这样保证了解释器内的数据安全

    优点: 保证了Cpython解释器的数据资源的安全;

    缺点: 单个进程的多线程不能利用多核cpu(缺陷)。

    Jython、pypy解释器没有GIL全局解释器锁。

    ​ 流程:当线程遇到IO阻塞,cpu就会无情的被操作系统切走,GIL全局解释器锁被释放,线程挂起,另一个线程进入,这样可以实现并发。

    总结

    单个进程的多线程可以并发执行,但是不能利用cpu多核并行执行;

    多个进程可以并发、并行执行。

    4. IO、计算密集型对比

    4.1 计算密集型:

    ​ 单个进程的多线程 VS 多个进程的并发、并行

    from threading import Thread
    from multiprocessing import Process
    import time
    
    def task():
        count = 0
        for i in range(10000000):
            count += 1
    
    # 多线程并发
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Thread(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'执行效率:{time.time()- start_time}')  # 1.9125
    
        
    # 多进程并发、并行
    if __name__ == '__main__':
        start_time = time.time()
        l1 = []
        for i in range(4):
            p = Process(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'执行效率:{time.time()- start_time}') # 0.86031
    
    # 总结:
    计算密集型:多进程并发、并行效率高。
    

    4.2 IO密集型

    单个进程的多线程 VS 多个进程的并发并行

    from threading import Thread
    from multiprocessing import Process
    import time
    import random
    
    def task():
        count = 0
        time.sleep(random.randint(1,3))
        count += 1
    
    
    if __name__ == '__main__':
    
    # 多进程并发、并行
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Process(target=task,)
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'执行效率:{time.time()- start_time}')    # 4.52878
        
        
    # 多线程并发
        start_time = time.time()
        l1 = []
        for i in range(50):
            p = Thread(target=task, )
            l1.append(p)
            p.start()
        for p in l1:
            p.join()
        print(f'执行效率:{time.time() - start_time}')  # 3.00845
        
    # 总结:
    对于IO密集型:单个进程的多线程的并发效率高。
    

    5. GIL与Lock锁的区别

    相同点:都是互斥锁;

    GIL: 保护解释器内部的资源数据的安全,释放无需手动操作;

    Lock:自定义锁,保护进程中的数据资源安全。需要手动操作。

    6. 多线程实现socket通信

    服务端每连接到一个客户端时,都会开启一个线程进行通信。

    # server端
    
    from threading import Thread
    import socket
    
    def accept():
        server = socket.socket()
        server.bind(("127.0.0.1", 8888))
        server.listen(5)
    
        while 1:
            conn, addr = server.accept()
             # 连接一个开启一个线程
            t = Thread(target=communication, args=(conn, addr))	
            t.start()
    
    def communication(conn, addr):
    
        while 1:
            try:
                from_client_data = conn.recv(1024)
                print(f"来自客户端{addr}的消息:{from_client_data.decode('utf-8')}")
                to_client_data = input(">>>").strip().encode("utf-8")
                conn.send(to_client_data)
            except Exception:
                break
        conn.close()
    
    if __name__ == '__main__':
        accept()
    
    # client端
    
    import socket
    
    client = socket.socket()
    client.connect(('127.0.0.1', 8888))
    
    while 1:
        try:
            to_server_data = input(">>>").strip().encode("utf-8")
            client.send(to_server_data)
            from_server_data = client.recv(1024)
            print(f"来自客户端的消息:{from_server_data.decode('utf-8')}")
        except Exception:
            break
    client.close()
    

    7. 进程池、线程池

    以时间换取空间。

    ​ 概念:定义一个池子,在里面放上固定数量的进程(线程),有需求来了,就拿一个池中的进程(线程)来处理任务,等到处理完毕,进程(线程)并不关闭,而是将进程(线程)再放回池中继续等待任务。如果有很多任务需要执行,池中的进程(线程)数量不够,任务就要等待之前的进程(线程)执行任务完毕归来,拿到空闲的进程(线程)才能继续执行。也就是说,池中进程(线程)的数量是固定的,那么同一时间最多有固定数量的进程(线程)在运行。这样不会增加操作系统的调度难度,还节省了开闭进程(线程)的时间,也一定程度上能够实现并发效果。

    官网:https://docs.python.org/dev/library/concurrent.futures.html

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import os
    import time
    import random
    
    def task(i):
        print(f"第{i}:{os.getpid()}开始执行!")
        time.sleep(random.random())
    
    if __name__ == '__main__':
    
    # 线程池
        t = ThreadPoolExecutor()    # 开启线程池 不写默认最大处理20个线程
        for i in range(40):
            t.submit(task, i+1)		# 开启线程
            
    # 进程池        
        p = ProcessPoolExecutor()   # 开启进程池 不写默认处理cpu个数的进程
        for i in range(20):
            p.submit(task, i+1)
    

    示例:使用线程池实现socket通信

    # server端
    from concurrent.futures import ThreadPoolExecutor
    import socket
    
    
    def accept():
    
        server = socket.socket()
        server.bind(("127.0.0.1", 8888))
        server.listen(5)
        t = ThreadPoolExecutor(2)       # 设置最大2个线程的线程池
        while 1:
            conn, addr = server.accept()
            t.submit(communication, conn, addr)
    
    def communication(conn, addr):
    
        while 1:
            try:
                from_client_data = conn.recv(1024)
                print(f"来自客户端{addr}的消息:{from_client_data.decode('utf-8')}")
    
                to_client_data = input(">>>").strip().encode("utf-8")
                conn.send(to_client_data)
            except Exception:
                break
        conn.close()
    
    if __name__ == '__main__':
        accept()
    
    # client端
    import socket
    
    client = socket.socket()
    client.connect(('127.0.0.1', 8888))
    
    while 1:
        try:
            to_server_data = input(">>>").strip().encode("utf-8")
            client.send(to_server_data)
            from_server_data = client.recv(1024)
            print(f"来自客户端的消息:{from_server_data.decode('utf-8')}")
        except Exception:
            break
    client.close()
    
    
  • 相关阅读:
    鲜牛奶与纯牛奶的区别 All In One
    Rough Notation Animation All In One
    java基础编程String及相关
    JDBC1
    java基础数据类型
    java基础编程
    JDBC3
    MYSQL1
    JDBC2
    JSP和Servlet的相同点和不同点,他们之间的联系
  • 原文地址:https://www.cnblogs.com/yzm1017/p/11401462.html
Copyright © 2011-2022 走看看