zoukankan      html  css  js  c++  java
  • 线程池进程池

    验证GIL锁的存在方式

    from threading import Thread
    from multiprocessing import Process
    
    def task():
        while True:
            pass
    if __name__ == '__main__':
        for i in range(6):
    		# t=Thread(target=task)  # 因为有GIL锁,同一时刻,只有一条线程执行,所以cpu不会满
            t=Process(target=task)   # 由于是多进程,进程中的线程会被cpu调度执行,6个cpu全在工作,就会跑满
            t.start()
    

    GIL与普通互斥锁的区别

    GIL锁是不能保证数据的安全,普通互斥锁来保证数据安全
    from threading import Thread,Lock
    import time
    
    mutex = Lock()
    money = 100
    
    def task():
        global money
        mutex.acquire()
        temp = money
        time.sleep(1)
        money = temp-1
        mutex.release()
        
    
    if __name__ == '__main__':
        ll=[]
        for i in range(10):
            t = Thread(target=task)
            t.start()
            # t.join()  # 会怎么样?变成了串行,不能这么做
            ll.append(t)
        for t in ll:
            t.join()
        print(money)
     
    

    io密集型和计算密集型

    cpython解释器
    在单核情况下:
        开多线程还是多进程,不管干什么都是开多线程
    在多核情况下:
        如果是计算密集型,需要开进程,能被多个cpu调度执行
        如果是io密集型,需要开线程,cpu遇到io会切换到其他线程执行
    
    from threading import Thread
    from multiprocessing import Process
    import time
    
    计算密集型
    def task():
        count = 0
        for i in range(100000000):
            count += i
    
    
    if __name__ == '__main__':
        ctime = time.time()
        ll = []
        for i in range(10):
            t = Thread(target=task)  # 开线程:42.68658709526062
            # t = Process(target=task)   # 开进程:9.04949426651001
            t.start()
            ll.append(t)
    
        for t in ll:
            t.join()
        print(time.time()-ctime)
    
    # io密集型
    def task():
        time.sleep(2)
    
    
    if __name__ == '__main__':
        ctime = time.time()
        ll = []
        for i in range(400):
            t = Thread(target=task)  # 开线程:2.0559656620025635
            # t = Process(target=task)   # 开进程:9.506720781326294
            t.start()
            ll.append(t)
    
        for t in ll:
            t.join()
        print(time.time()-ctime)
    

    死锁现象(哲学家就餐问题)

    是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,他们都将无法推进下去.此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁:
        死锁现象,张三拿到了A锁,等B锁,李四拿到了B锁,等A锁
        
    from threading import Thread,Lock
    import time
    mutexA = Lock()
    mutexB = Lock()
    
    def eat_apple(name):
        mutexA.acquire()
        print('%s 获取到了a锁'%name)
        mutexB.acquire()
        print('%s 获取到了b锁' % name)
        print('开始吃苹果,并且吃完了')
        mutexB.release()
        print('%s 释放了b锁' % name)
        mutexA.release()
        print('%s 释放了a锁' % name)
        
    def eat_egg(name):
        mutexB.acquire()
        print('%s 获取到了b锁' % name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 获取到了a锁' % name)
        print('开始吃鸡蛋,并且吃完了')
        mutexA.release()
        print('%s 释放了a锁' % name)
        mutexB.release()
        print('%s 释放了b锁' % name)
    
    
    if __name__ == '__main__':
        ll = ['egon', 'alex', '铁蛋']
        for name in ll:
            t1 = Thread(target=eat_apple, args=(name,))
            t2 = Thread(target=eat_egg, args=(name,))
            t1.start()
            t2.start()        
    
    递归锁(可重入),同一个人可以多次acquire,每acquire一次,内部计数器加1,每relaese一次
    内部计数器减1
    只有计数器不为0,其他人都不获得这把锁
    
    from threading import Thread,Lock,RLock
    import time
    # 同一把锁
    # mutexA = Lock()
    # mutexB = mutexA
    
    # 使用可重入锁解决(同一把锁)
    # mutexA = RLock()
    # mutexB = mutexA
    mutexA = mutexB =RLock()
    
    def eat_apple(name):
        mutexA.acquire()
        print('%s 获取到了a锁' % name)
        mutexB.acquire()
        print('%s 获取到了b锁' % name)
        print('开始吃苹果,并且吃完了')
        mutexB.release()
        print('%s 释放了b锁' % name)
        mutexA.release()
        print('%s 释放了a锁' % name)
    
    
    def eat_egg(name):
        mutexB.acquire()
        print('%s 获取到了b锁' % name)
        time.sleep(2)
        mutexA.acquire()
        print('%s 获取到了a锁' % name)
        print('开始吃鸡蛋,并且吃完了')
        mutexA.release()
        print('%s 释放了a锁' % name)
        mutexB.release()
        print('%s 释放了b锁' % name)
    
    
    if __name__ == '__main__':
        ll = ['egon', 'alex', '铁蛋']
        for name in ll:
            t1 = Thread(target=eat_apple, args=(name,))
            t2 = Thread(target=eat_egg, args=(name,))
            t1.start()
            t2.start()
    
    

    Semaphore信号量

    Semaphore:信号量可以立即为多把锁,同时允许多个线程来更改数据
     
    from threading import Thread,Semaphore
    import timme
    import random
    sm = Semaphore(3) # 数字表示可以同时有多少个线程操作
    
    def task(name):
        sm.acquire()
        print("%s 正在蹲坑"%name)
        time.sleep(random.randint(1,5))
        sm.release()
        
    

    Event事件

    一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
    比如一个线程等待另一个线程执行结束在继续执行
    一些线程需要等到其他线程执行完成之后才能执行,类似于发射信号
    比如一个线程等待另一个线程执行结束在继续执行
     from threading import Thread, Event
     import time
    
     event = Event()
    
    
     def girl(name):
         print('%s 现在不单身,正在谈恋爱'%name)
         time.sleep(10)
         print('%s 分手了,给屌丝男发了信号'%name)
         event.set()
    
    
     def boy(name):
         print('%s 在等着女孩分手'%name)
         event.wait()  # 只要没来信号,就卡在者
         print('女孩分手了,机会来了,冲啊')
    
    
     if __name__ == '__main__':
         lyf = Thread(target=girl, args=('刘亦菲',))
         lyf.start()
    
         for i in range(10):
             b = Thread(target=boy, args=('屌丝男%s号' % i,))
             b.start()
    

    起两个线程,第一个线程读文件的前半部分,读完发一个信号,另一个进程读后半部分,并打印

    from threading import Thread,Event
    import time
    import os
    
    event = Event()
    # 获取文件的总大小
    size = os.path.getsize('a.txt')
    
    
    def read_first():
        with open('a.txt','r',encoding='utf-8')as f:
            n = size // 2 # 取文件的一半,整除
            data = f.read(n)
            print(data)
            print('我一半读完了,发个信号')
            event.set()
    def read_last():
        event.wait()  # 等着发信号
        with open('a.txt', 'r', encoding='utf-8') as f:
            n = size // 2  # 取文件一半,整除
            # 光标从文件开头开始,移动了n个字节,移动到文件一半
            f.seek(n, 0)
            data = f.read()
            print(data)
    
    
    if __name__ == '__main__':
        t1=Thread(target=read_first)
        t1.start()
        t2=Thread(target=read_last)
        t2.start()     
    

    线程queue

    进程queue和线程不是一个
    
    from multiprocessing import	Queue
    
    线程queue
    from queue import Queue,LifoQueue,PriorityQueue
    
    # 线程间通信,因为共享变量会出现数据不安全问题,用线程queue通信,不需要加锁,内部自带
    # queue是线程安全的
    
    
    '''
    三种线程Queue
        -Queue:队列,先进先出
        -PriorityQueue:优先级队列,谁小谁先出
        -LifoQueue:栈,后进先出
    '''
    # 如何使用
    # q=Queue(5)
    # q.put("lqz")
    # q.put("egon")
    # q.put("铁蛋")
    # q.put("钢弹")
    # q.put("金蛋")
    #
    #
    # # q.put("银蛋")
    # # q.put_nowait("银蛋")
    # # 取值
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # # 卡住
    # # print(q.get())
    # # q.get_nowait()
    # # 是否满,是否空
    # print(q.full())
    # print(q.empty())
    
    # LifoQueue
    
    # q=LifoQueue(5)
    # q.put("lqz")
    # q.put("egon")
    # q.put("铁蛋")
    # q.put("钢弹")
    # q.put("金蛋")
    # #
    # # q.put("ddd蛋")
    # print(q.get())
    
    
    #PriorityQueue:数字越小,级别越高
    
    # q=PriorityQueue(3)
    # q.put((-10,'金蛋'))
    # q.put((100,'银蛋'))
    # q.put((101,'铁蛋'))
    # # q.put((1010,'铁dd蛋'))  # 不能再放了
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    
    
    

    线程池

    为什么会出现池:
        不管是开进程还是开线程,不能无限制开,通过池,假设池子里就有10个,不管再怎么开,永远就是这10个
     如何使用
    
    from concurrent.futures import ThreadPoolExecutor
    from threading import Thread
    import time,random
    
    pool = ThreadPoolExecutor(5) # 数字是池的大小
    
    def task(name):
    	print('%s任务开始'%name)
    	time.sleep(random.randint(1,4))
    	print('任务结束')
    	return '%s 返回了'%name
    
    
    def call_back(f):
    	print(f.result())
    
    
    if __name__ == '__main__':
        for i in range(10): # 起了10个线程
    	    #向线程池中提交了一个任务,等待任务执行完成,自动回到call_back函数执行
    	pool.submit(task,'屌丝男%s号'%i).add_done_callback(call_back)
    

    进程池

    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import Process
    import time, random
    
    pool = ProcessPoolExecutor(5)  # 数字是池的大小
    
    
    def task(name):
    	print('%s任务开始' % name)
    	time.sleep(random.randint(1, 4))
    	print('任务结束')
    	return '%s 返回了' % name
    
    
    def call_back(f):
    	print(f.result())
    
    
    if __name__ == '__main__':
    	for i in range(10):  # 起了10个线程
    		# 向进程池中提交了一个任务,等待任务执行完成,自动回到call_back函数执行
    		pool.submit(task, '屌丝男%s号' % i).add_done_callback(call_back)
    
    

    multiprocess.Pool模块

    1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
    2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
    3 initargs:是要传给initializer的参数组
    
    主要方法
    1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    2 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
    3 
    4 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
    5 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
    6    
    7 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    8 
    9 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
    
    其他方法
    1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
    2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    3 obj.ready():如果调用完成,返回True
    4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
    5 obj.wait([timeout]):等待结果变为可用。
    6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
    
  • 相关阅读:
    程序员偷偷深爱的9个不良编程习惯
    JQuery实现放大镜
    ACM1995
    liubo.im
    Linux中的一些点
    EPOLL使用详解
    Elays'Blog
    c#数据库解析
    codeforces #332 div 2 D. Spongebob and Squares
    类型
  • 原文地址:https://www.cnblogs.com/lgh8023/p/13567315.html
Copyright © 2011-2022 走看看