zoukankan      html  css  js  c++  java
  • 多线程

    模块:threading

      Thread

    开启方式,join()方法、互斥锁、守护线程与多进程相同

    进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

    与多进程的区别:

      1、开进程的开销远大于开线程

      2、同一进程内的多个线程共享该进程的地址空间

    # mutex
    # 互斥锁:牺牲效率,保证数据安全
    from threading import Thread, Lock
    import time
    
    n = 100
    
    
    def task():  # 多进程互斥锁必须传参?
        global n
        mutex.acquire()
        temp = n
        time.sleep(0.01)
        n = temp - 1
        mutex.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()
        t_l = []
        for i in range(100):
            t = Thread(target=task)
            t_l.append(t)
            t.start()
    
        for t in t_l:
            t.join()
        # time.sleep(0.1)
        print('', n)
    View Code

    GIL全局解释器锁

    GIL是cpython解释器的特性,不是python的特性
    GIL保证同一时间只能有一个线程被执行
    当多线程的python代码执行时,所有线程都去抢GIL锁,谁抢到执行谁
      当该线程执行完或者该线程内遇到IO操作时,释放GIL,其他线程去抢GIL
      如果这多个线程操作同一个内存数据,就会导致错误
      所以要保证数据安全,就要在多线程任务中自定义一把互斥锁,
      即使第一次抢到GIL的线程遇到IO时,由于互斥锁未释放,其他线程也不会被执行
    # 计算密集型:用多进程
    # from multiprocessing import Process
    # from threading import Thread
    # import os,time
    # def work():
    #     res=0
    #     for i in range(100000000):
    #         res*=i
    #
    #
    # if __name__ == '__main__':
    #     l=[]
    #     # print(os.cpu_count()) #本机为8核
    #     start=time.time()
    #     for i in range(8):
    #         # p=Process(target=work) #耗时8s多
    #         p=Thread(target=work) #耗时37s多
    #         l.append(p)
    #         p.start()
    #     for p in l:
    #         p.join()
    #     stop=time.time()
    #     print('run time is %s' %(stop-start))
    
    
    # IO密集型:用多线程
    from multiprocessing import Process
    from threading import Thread
    import threading
    import os,time
    
    def work():
        time.sleep(2)
    
    if __name__ == '__main__':
        l=[]
        # print(os.cpu_count()) #本机为4核
        start=time.time()
        for i in range(400):
            # p=Process(target=work) #耗时2.697多,大部分时间耗费在创建进程上
            p=Thread(target=work) #耗时2.02多
            l.append(p)
            p.start()
        for p in l:
            p.join()
        stop=time.time()
        print('run time is %s' %(stop-start))
    View Code

    死锁与递归锁:

    互斥锁只能acquire一次,释放之后才能被再次获取
    递归锁:可以连续acquire多次,每acquire一次计数器+1,只有计数为0时,才能被抢到acquire
    from threading import Thread,Lock
    import time
    
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' %self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' %self.name)
            mutexB.release()
    
            mutexA.release()
    
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(0.1)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()
    死锁
    from threading import Thread, RLock
    import time
    
    mutexB = mutexA = RLock()
    
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
    
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            mutexB.release()
    
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s 拿到了B锁' % self.name)
            time.sleep(7)
    
            mutexA.acquire()
            print('%s 拿到了A锁' % self.name)
            mutexA.release()
    
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start()
    递归锁
    
    

    信号量

    也是锁,互斥锁可比喻为一件房子只有一把锁,一次只有一个人能进入
    信号量可比喻为一间房安装了多把锁,抢到任何一把锁都能进入
    from threading import Thread, Semaphore, currentThread
    import time, random
    
    sm = Semaphore(3)
    
    
    def task():
        # sm.acquire()
        # print('%s in' %currentThread().getName())
        # sm.release()
        with sm:
            print('%s in' % currentThread().getName())
            time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = Thread(target=task)
            t.start()
    View Code

    Event事件

      Event对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行

    from threading import Event
    
    event.isSet():返回event的状态值;
    
    event.wait():如果 event.isSet()==False将阻塞线程;
    
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    
    event.clear():恢复event的状态值为False。
    from threading import Thread, Event
    import time
    
    event = Event()
    # event.wait()
    # event.set()
    
    
    def student(name):
        print('学生%s 正在听课' % name)
        event.wait(3)  # 超时时间,到时间后自动往下执行
        print('学生%s 课间活动' % name)
    
    
    def teacher(name):
        print('老师%s 正在授课' % name)
        time.sleep(5)
        print('下课')
        event.set()
    
    
    if __name__ == '__main__':
        stu1 = Thread(target=student, args=('alex',))
        stu2 = Thread(target=student, args=('wxx',))
        stu3 = Thread(target=student, args=('yxx',))
        t1 = Thread(target=teacher, args=('egon',))
    
        stu1.start()
        stu2.start()
        stu3.start()
        t1.start()
    View Code

    定时器:指定n秒后执行某操作

    from threading import Timer
    
    
    def task(name):
        print('hello %s' % name)
    
    
    t = Timer(5, task, args=('egon',))
    t.start()
    View Code
    from threading import Timer
    import random
    
    
    class Code:
        def __init__(self):
            self.make_cache()
    
        def make_cache(self, interval=5):
            self.cache = self.make_code()
            print(self.cache)
            self.t = Timer(interval, self.make_cache)
            self.t.start()
    
        def make_code(self, n=4):
            res = ''
            for i in range(n):
                s1 = str(random.randint(0, 9))
                s2 = chr(random.randint(65, 90))
                res += random.choice([s1, s2])
            return res
    
        def check(self):
            while True:
                code = input('请输入你的验证码>>: ').strip()
                if code.upper() == self.cache:
                    print('验证码输入正确')
                    self.t.cancel()
                    break
    
    
    obj = Code()
    obj.check()
    View Code

    队列

    import queue
    
    q = queue.Queue(3)  # 先进先出->队列
    
    q.put('first')
    q.put(2)
    q.put('third')
    print(3)
    q.put(4)
    print(4)
    # q.put(4,block=False) #q.put_nowait(4)
    # q.put(4,block=True,timeout=3)
    #
    #
    # #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    # # print(q.get(block=False)) #q.get_nowait()
    # # print(q.get_nowait())
    #
    # # print(q.get(block=True,timeout=3))
    
    
    # q=queue.LifoQueue(3) #后进先出->堆栈
    # q.put('first')
    # q.put(2)
    # q.put('third')
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    
    #
    # q=queue.PriorityQueue(3) #优先级队列
    #
    # q.put((10,'one'))
    # q.put((40,'two'))
    # q.put((30,'three'))
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    View Code

    进程池、线程池

    基本方法
    1、submit(fn, *args, **kwargs)
    异步提交任务
    
    2、map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    3、shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    4、result(timeout=None)
    取得结果
    
    5、add_done_callback(fn)
    回调函数
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import os,time,random
    #
    # def task(name):
    #     print('name:%s pid:%s run' %(name,os.getpid()))
    #     time.sleep(random.randint(1,3))
    #
    #
    # if __name__ == '__main__':
    #     # pool=ProcessPoolExecutor(4)  # 不指定则默认使用cpu核数
    #     pool=ThreadPoolExecutor(5)
    #
    #     for i in range(10):
    #         pool.submit(task,'egon%s' %i)
    #
    #     pool.shutdown(wait=True)  # 相当于join()
    #
    #
    #     print('主')
    
    
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from threading import currentThread
    import os, time, random
    
    
    def task():
        print('name:%s pid:%s run' % (currentThread().getName(), os.getpid()))
        time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(5)
    
        for i in range(10):
            pool.submit(task, )
    
        pool.shutdown(wait=True)
    
        print('')
    View Code

    提交任务的两种方式

    1、同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

      同步 != 阻塞

    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    def la(name):
        print('%s is laing' %name)
        time.sleep(random.randint(3,5))
        res=random.randint(7,13)*'#'
        return {'name':name,'res':res}
    
    def weigh(shit):
        name=shit['name']
        size=len(shit['res'])
        print('%s 拉了 《%s》kg' %(name,size))
    
    
    if __name__ == '__main__':
        pool=ThreadPoolExecutor(13)
    
        shit1=pool.submit(la,'alex').result()
        weigh(shit1)
    
        shit2=pool.submit(la,'wupeiqi').result()
        weigh(shit2)
    
        shit3=pool.submit(la,'yuanhao').result()
        weigh(shit3)
    View Code

    2、异步调用:提交完任务后,不地等待任务执行完毕,异步通常伴随着回调

      举个例子,某部门经理X手下原本只有一个员工A,X每天早上给A安排了任务(先干这再干那再再干那)后就回去喝茶了,后来老板又给X安排了两个员工B和C,以后X就要给三个人安排任务了,A去干这个,B去干那个,C再去干什么什么,X怎么知道他们三个有没有完成任务呢?他们主动给经理报告呗。

    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    
    def la(name):
        print('%s is laing' % name)
        time.sleep(random.randint(3, 5))
        res = random.randint(7, 13) * '#'
        return {'name': name, 'res': res}
    
    
    def weigh(shit):
        shit = shit.result()
        name = shit['name']
        size = len(shit['res'])
        print('%s 拉了 《%s》kg' % (name, size))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(13)
    
        pool.submit(la, 'alex').add_done_callback(weigh)
    
        pool.submit(la, 'wupeiqi').add_done_callback(weigh)
    
        pool.submit(la, 'yuanhao').add_done_callback(weigh)
    View Code
  • 相关阅读:
    设计模式 享元模式(池化技术)
    设计模式 混合模式(整体部分模式)
    设计模式 适配器模式
    Flex3示例、 安装 、注册码
    VS2010错误
    转载:glut.h 与 stdlib.h中 的exit()重定义问题的解决
    宿迁软件QQ群(109233721)
    百度地图 开发API接口啦
    Sublime Text 插件个人使用总结&推荐
    sublime text2 使用安装插件中文乱码问题解决
  • 原文地址:https://www.cnblogs.com/webc/p/9168480.html
Copyright © 2011-2022 走看看