  • Python multithreading (threading)

    As usual, reference links first:

    https://www.cnblogs.com/jiangfan95/p/11439543.html

    https://www.liaoxuefeng.com/wiki/1016959663602400/1017629247922688

    https://blog.csdn.net/mr__l1u/article/details/81772073

    First, a copied summary of the characteristics of processes and threads.

    1> Understanding processes, threads and coroutines:

      A process is the independent unit by which the system allocates resources and schedules work;

      A thread is an entity within a process and the basic unit of CPU scheduling and dispatch;

      A coroutine runs within a thread and is called a micro-thread; it carries its own CPU context and is an even smaller unit of execution than a thread;

    2> Differences

      A program has at least one process, and a process has at least one thread;

      Threads are a finer-grained unit than processes (they own fewer resources), which gives multithreaded programs high concurrency;

      Each process runs with its own independent memory, while multiple threads share memory, which greatly improves the program's efficiency;

      A thread cannot run on its own; it must live inside a process;

    3> Pros and cons: threads and processes each have their trade-offs: threads are cheap to run but poor at resource management and protection; processes are the opposite.

    4> Coroutines: imagine a process as a real-life Lanzhou noodle shop; the waiters who keep the shop running are the threads, and each table is a task to be completed.

      When we use multithreading, the model is ==> every table of customers gets its own waiter, so however many tables there are, that's how many waiters you need;

      With coroutines the model changes ==> there is just one waiter. A meal involves ordering and then waiting for the food, so while table A is still ordering the waiter serves B; once B has ordered and is waiting, the waiter moves on to C; when C is also waiting and A has finished ordering, the waiter hurries back to A... and so on.

      The example shows that coroutines only pay off when the tasks involve waiting. For time-consuming, IO-bound work, running it on coroutines saves a lot of resources (one waiter instead of many; ## give me a dozen of those supermen who can run a whole shop alone (-..-)) and makes much better use of the system.

    I think the introduction above is pretty good; click through the third link for the original article.
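
    The post itself only contains threading code, so here is a minimal coroutine sketch of the "one waiter, many tables" idea, offered as a rough aside (it assumes asyncio on Python 3.7+; the waiter/table framing is only for illustration):

    import asyncio
    import random


    async def serve_table(table):
        # "taking the order" and "waiting for the kitchen" are both waits,
        # so the single event loop (the lone waiter) switches to other tables
        await asyncio.sleep(random.random())   # the customer is ordering
        await asyncio.sleep(random.random())   # the kitchen is cooking
        print(f'table {table} served')


    async def main():
        # one thread, one event loop, several concurrent "tables"
        await asyncio.gather(*(serve_table(i) for i in range(5)))

    asyncio.run(main())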

    On to the code: two different ways to start threads:

    import threading
    import time
    import logging
    import random
    
    
    
    class MyThreading(threading.Thread):  # subclassing threading.Thread
        def __init__(self, num):
            super(MyThreading, self).__init__()
            self.num = num
    
        def run(self) -> None:
            time.sleep(random.random())
            logging.debug(str(self.__class__.__name__) + '======>' + 'Work: %s' % self.num)
    
    
    def worker(num):
        """thread worker function"""
        time.sleep(random.random())
        # get the thread's name via threading.current_thread().getName()
        logging.debug(threading.current_thread().getName() + ':' + 'Worker: %s' % num)
    
    
    # use logging to print the details
    logging.basicConfig(level=logging.DEBUG, format="[%(levelname)s] (%(threadName)-10s) %(message)s")
    
    threads = []
    
    for i in range(5):
        my = MyThreading(i)
        my.start()
        t = threading.Thread(target=worker, args=(i,), name='sidian')   # passing a target callable directly
        t.start()
        threads.append(t)
        threads.append(my)
    
    for i in threads:            # join so the new threads finish before the main thread moves on.
        i.join()
    
    
    logging.info('__main__:' + threading.current_thread().getName())
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t1.py
    [DEBUG] (Thread-5  ) MyThreading======>Work: 4
    [DEBUG] (sidian    ) sidian:Worker: 0
    [DEBUG] (Thread-1  ) MyThreading======>Work: 0
    [DEBUG] (Thread-4  ) MyThreading======>Work: 3
    [DEBUG] (sidian    ) sidian:Worker: 4
    [DEBUG] (sidian    ) sidian:Worker: 2
    [DEBUG] (sidian    ) sidian:Worker: 1
    [DEBUG] (Thread-2  ) MyThreading======>Work: 1
    [DEBUG] (sidian    ) sidian:Worker: 3
    [DEBUG] (Thread-3  ) MyThreading======>Work: 2
    [INFO] (MainThread) __main__:MainThread
    
    Process finished with exit code 0
    

    As the code shows, writing multithreaded code looks very similar to multiprocessing.

    Next I'll write a small example to look at some differences between threads and processes.

    import multiprocessing
    import threading
    import time
    
    n = 0
    def run():
        global n
        for i in range(10000):
            n += i
        return n
    
    
    def s_time(func):
        def wrap():
            t1 = time.perf_counter()
            res = func()
            print(f'({func.__name__})cost_time:{time.perf_counter()-t1:0.5f}')
            return res
        return wrap
    
    @s_time
    def run_threads():
        threads = []
        for i in range(10000):
            t = threading.Thread(target=run)
            t.start()
            threads.append(t)
        for i in threads:
            i.join()
    
    @s_time
    def run_process():
        threads = []
        for i in range(10000):
            t = multiprocessing.Process(target=run)
            t.start()
            threads.append(t)
        for i in threads:
            i.join()
    
    run_threads()
    run_process()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
    (run_threads)cost_time:8.54433
    (run_process)cost_time:20.15610
    
    Process finished with exit code 0
    

    The run above compares the time taken to start 10,000 processes versus 10,000 threads; the threads are clearly much faster.

    Below are the results when starting 1,000 and then 100 of each.

    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
    (run_threads)cost_time:0.86798
    (run_process)cost_time:1.20561
    
    Process finished with exit code 0
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t2.py
    (run_threads)cost_time:0.10313
    (run_process)cost_time:0.12918
    
    Process finished with exit code 0
    

    Overall, for IO-style tasks, threads are clearly much faster to spin up than processes.

    Now for daemon threads. Like daemon processes, a daemon thread doesn't block the main thread from running, but it ends as soon as the main thread finishes. By default, threads are not daemons.

    import threading
    import time
    import logging
    
    
    def daemon():
        logging.debug('Starting')
        time.sleep(0.2)
        logging.debug('Exiting')
    
    def non_daemon():
        logging.debug('Starting')
        logging.debug('Exiting')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="(%(threadName)-10s) %(message)s",
    )
    
    d = threading.Thread(target=daemon, name='daemon', daemon=True)   # mark the thread as a daemon
    # d.setDaemon(True)                  # the second way to set it
    t = threading.Thread(target=non_daemon, name='non_daemon')
    d.start()
    t.start()
    d.join(0.1)         # wait at most 0.1s for the daemon; without a timeout this would block until it finishes
    logging.info('d.isAlive' + str(d.is_alive()))  # check whether the thread is still alive
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t3.py
    (daemon    ) Starting
    (non_daemon) Starting
    (non_daemon) Exiting
    (MainThread) d.isAliveTrue
    
    Process finished with exit code 0
    

    Next, a way of enumerating all threads that doesn't feel very practical to me.

    import random
    import threading
    import time
    import logging
    
    
    def worker():
        """thread worker function"""
        pause = random.randint(1, 5) / 10
        logging.debug('sleeping %0.2f', pause)
        time.sleep(pause)
        logging.debug('ending')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(levelname)s] (%(threadName)-10s) %(message)s",
    )
    
    for i in range(3):
        t = threading.Thread(target=worker)
        t.setDaemon(True)
        t.start()
    
    main_thread = threading.main_thread()
    logging.info(main_thread)                # the main thread
    logging.info(threading.enumerate())
    for t in threading.enumerate():       # enumerate all live threads
        if t is main_thread:           # skip the main thread
            continue
        logging.debug(f'joining {t.getName()}')    # join every other thread
        t.join()
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/t4.py
    [DEBUG] (Thread-1  ) sleeping 0.20
    [DEBUG] (Thread-2  ) sleeping 0.50
    [DEBUG] (Thread-3  ) sleeping 0.30
    [INFO] (MainThread) <_MainThread(MainThread, started 4757224896)>
    [INFO] (MainThread) [<_MainThread(MainThread, started 4757224896)>, <Thread(Thread-1, started daemon 123145473859584)>, <Thread(Thread-2, started daemon 123145479114752)>, <Thread(Thread-3, started daemon 123145484369920)>]
    [DEBUG] (MainThread) joining Thread-1
    [DEBUG] (Thread-1  ) ending
    [DEBUG] (MainThread) joining Thread-2
    [DEBUG] (Thread-3  ) ending
    [DEBUG] (Thread-2  ) ending
    [DEBUG] (MainThread) joining Thread-3
    
    Process finished with exit code 0
    

    Honestly, why go to that trouble? Isn't it just the same to make an empty list, append the threads you need to join, and loop over it?

    For example:

    import random
    import threading
    import time
    import logging
    
    
    def worker():
        """thread worker function"""
        pause = random.randint(1, 5) / 10
        logging.debug('sleeping %0.2f', pause)
        time.sleep(pause)
        logging.debug('ending')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(levelname)s] (%(threadName)-10s) %(message)s",
    )
    
    threads = []
    for i in range(3):
        t = threading.Thread(target=worker)
        t.setDaemon(True)
        t.start()
        threads.append(t)
    
    main_thread = threading.main_thread()
    logging.info(main_thread)                # the main thread
    logging.info(threading.enumerate())
    
    # for t in threading.enumerate():       # enumerate all live threads
    #     if t is main_thread:           # skip the main thread
    #         continue
    #     logging.debug(f'joining {t.getName()}')    # join every other thread
    #     t.join()
    
    for t in threads:
        t.join()
    

    Isn't that simpler?

    Next up, the even less exciting timer thread; I honestly can't work out where this kind of timer thread is normally used.

    import threading
    import time
    import logging
    
    def delayed():
        logging.debug('worker running')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(levelname)s] (%(threadName)-10s) %(message)s",
    )
    
    t1 = threading.Timer(0.3, delayed)   # run delayed() after 0.3 seconds
    t1.setName('t1')
    t2 = threading.Timer(0.3, delayed)
    t2.setName('t2')
    
    logging.debug('starting timers')
    t1.start()
    t2.start()
    
    logging.debug(f'waiting before canceling {t2.getName()}')
    time.sleep(0.2)
    logging.debug(f'canceling {t2.getName()}')
    t2.cancel()             # cancel the timer before it fires
    logging.debug('done')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_timer.py
    [DEBUG] (MainThread) starting timers
    [DEBUG] (MainThread) waiting before canceling t2
    [DEBUG] (MainThread) canceling t2
    [DEBUG] (MainThread) done
    [DEBUG] (t1        ) worker running
    
    Process finished with exit code 0
    

    From the code, all I can see in Timer is delayed start of a thread task plus the ability to cancel it midway; it's still hard to picture the use case.
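
    That said, one plausible (hypothetical) scenario is a cancelable watchdog: start a Timer that logs a warning if an operation takes too long, and cancel it when the operation finishes in time. A rough sketch of my own, not from the original post:

    import threading
    import time
    import logging

    def watchdog_alert():
        logging.warning('operation is taking longer than expected')

    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(levelname)s] (%(threadName)-10s) %(message)s",
    )

    watchdog = threading.Timer(5.0, watchdog_alert)   # fires after 5 seconds unless cancelled
    watchdog.start()

    time.sleep(1)        # stand-in for the real operation
    watchdog.cancel()    # finished in time, so the alert never fires
    logging.debug('operation finished, watchdog cancelled')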

    Next, Event, one of the threading module's tools for communication between threads. Since threads share data anyway, I think a global flag variable would also be a decent way to coordinate.

    import logging
    import threading
    import time
    
    
    def wait_for_event(e):
        """wait for the event to be set before doing anything"""
        logging.debug('wait for event starting')   # runs as soon as the thread starts
        event_is_set = e.wait()      # interestingly, wait() even has a return value
        logging.debug(f'event set: {event_is_set}')
        logging.debug(str(e.is_set()))
    
    def wait_for_event_timeout(e, t):
        """Wait t seconds and then timeout"""
        while not e.is_set():        # the event starts out unset
            logging.debug('wait_for_event_timeout starting')
            event_is_set = e.wait(t)      # wait with a timeout; the return value says whether the event was set before the timeout
            logging.debug('event set : %s', event_is_set)
            if event_is_set:
                logging.debug('processing event')
            else:
                logging.debug('doing other work')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="[%(levelname)s] (%(threadName)-10s) %(message)s",
    )
    
    e = threading.Event()
    
    t1 = threading.Thread(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    t1.start()
    t2 = threading.Thread(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    t2.start()
    
    logging.debug('Waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    logging.debug('Event is set')
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_event.py
    [DEBUG] (block     ) wait for event starting
    [DEBUG] (nonblock  ) wait_for_event_timeout starting
    [DEBUG] (MainThread) Waiting before calling Event.set()
    [DEBUG] (nonblock  ) event set : False
    [DEBUG] (nonblock  ) doing other work
    [DEBUG] (nonblock  ) wait_for_event_timeout starting
    [DEBUG] (MainThread) Event is set
    [DEBUG] (block     ) event set: True
    [DEBUG] (block     ) True
    [DEBUG] (nonblock  ) event set : True
    [DEBUG] (nonblock  ) processing event
    
    Process finished with exit code 0
    

    As the code shows, e.wait() is a really handy method.

    Next, locks in multithreaded code. This part matters: the sections above were about synchronizing threads; here the goal is to control access to shared resources so data doesn't get corrupted or lost.

    Python's built-in data structures (lists, dicts, and so on) are thread-safe, a side effect of Python using atomic bytecodes to manage them (one benefit of the global interpreter lock).

    Other data structures implemented in Python, and simpler types such as integers and floats, do not have that protection. To guarantee safe concurrent access to an object, use a Lock.

    Prefer using a lock with `with`, so you never forget to release it and risk a deadlock.

    import logging
    import threading, time

    logging.basicConfig(level=logging.INFO)

    # 10 threads -> 100 cups
    cups = []
    lock = threading.Lock()


    def worker(lock=None, task=100):
        while True:
            count = len(cups)     # if several threads all read count == 99 at the same time, the cup count goes wrong
            if count >= task:
                break
            logging.info(count)
            cups.append(1)
            logging.info("{} make 1........ ".format(threading.current_thread().getName()))
        logging.info("{} ending=======>".format(len(cups)))


    for x in range(10):
        threading.Thread(target=worker, args=(lock, 100)).start()
    INFO:root:104 ending=======>
    INFO:root:104 ending=======>
    INFO:root:Thread-6 make 1........ 
    INFO:root:105 ending=======>
    INFO:root:105 ending=======>
    INFO:root:105 ending=======>
    
    import logging
    import threading, time
    
    logging.basicConfig(level=logging.INFO)
    
    # 10 -> 100cups
    cups = []
    lock = threading.Lock()
    
    
    def worker(lock = None, task=100):
        with lock:           # with the lock held
            while True:
                count = len(cups)     # without the lock, several threads could all see count == 99 and overshoot
                if count >= task:
                    break
                logging.info(count)
                cups.append(1)
                logging.info("{} make 1........ ".format(threading.current_thread().getName()))
            logging.info("{} ending=======>".format(len(cups)))
    
    
    for x in range(10):
        threading.Thread(target=worker, args=(lock, 100)).start()
    
    INFO:root:Thread-1 make 1........ 
    INFO:root:96
    INFO:root:Thread-1 make 1........ 
    INFO:root:97
    INFO:root:Thread-1 make 1........ 
    INFO:root:98
    INFO:root:Thread-1 make 1........ 
    INFO:root:99
    INFO:root:Thread-1 make 1........ 
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    INFO:root:100 ending=======>
    

    With the lock the numbers come out right, but locking is a real pain: used like this the benefit of multithreading is basically zero, because the lock turns parallel threads into a serial queue and efficiency drops a lot.
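
    Note that the locked version above holds the lock around the entire while loop, which is exactly why the threads run one after another. A small variant of my own (not from the original post) locks only the check-and-append, so the count stays correct while the lock is released between iterations:

    import threading

    cups = []
    lock = threading.Lock()

    def worker(task=100):
        while True:
            with lock:                 # hold the lock only for the critical section
                if len(cups) >= task:
                    break
                cups.append(1)
            # slower work that does not touch shared data would go here, outside the lock

    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(len(cups))    # always 100, because the check and the append happen atomically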

    Lock methods:
    acquire(blocking=True, timeout=-1) acquires the lock. It blocks by default, and a blocking call can be given a timeout. In non-blocking mode it returns True if the lock was acquired and False otherwise.

    This is quite interesting: when you pass 0 to acquire, every caller appears to take the lock, but if the return value is False you only got a fake one.

    l = threading.Lock()
    
    r1 = l.acquire(0)
    r2 = l.acquire(0)
    l.release()
    r3 = l.acquire(0)
    r4 = l.acquire(0)
    logging.info(f'r1>{r1};r2>{r2};r3>{r3};r4>{r4}')
    
    INFO:root:r1>True;r2>False;r3>True;r4>False
    

    Quite interesting: the return value tells you who really holds the lock, callers can keep retrying, and once the lock is released the first one to acquire it again gets True.
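
    A common way to use the non-blocking form is a try-lock pattern: attempt to take the lock, do the work only if you actually got it, and always release in a finally block. A small sketch of my own, not from the original post:

    import threading
    import time
    import logging

    logging.basicConfig(level=logging.INFO)
    lock = threading.Lock()

    def try_to_work():
        name = threading.current_thread().getName()
        if not lock.acquire(blocking=False):     # don't wait; just report that the lock was busy
            logging.info('%s: lock busy, skipping', name)
            return
        try:
            logging.info('%s: got the lock, working', name)
            time.sleep(0.5)                      # simulate work while holding the lock
        finally:
            lock.release()                       # always release what was acquired

    for i in range(3):
        threading.Thread(target=try_to_work).start()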

    A quick aside: Liao Xuefeng's threading.local example:

    import threading
    
    # create a global ThreadLocal object (in practice, each thread that uses it effectively gets its own copy of the data):
    local_school = threading.local()
    
    
    def process_student():
        # get the student bound to the current thread:
        std = local_school.student
        print('Hello, %s (in %s)' % (std, threading.current_thread().getName()))
    
    
    def process_thread(name):
        # bind student to the ThreadLocal for this thread:
        local_school.student = name
        process_student()
    
    
    t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
    t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

    This threading.local really means every thread sees its own independent object. Honestly, if it were me I'd rather pass the value down through the function calls: more direct and cleaner.

    And I suspect creating these objects costs a fair amount of memory.

    For the features of the module I use less often, I'll copy the examples from the Python 3 standard library book and add my own explanations.

    Also, since threads share memory, you can use the queue module to avoid the efficiency hit of explicit locks.

    queue: import queue; the usage is the same as the multiprocessing Queue.

    class queue.Queue(maxsize=0)  # FIFO (first in, first out)

    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    Result (FIFO):
    first
    second
    third
    '''

    class queue.LifoQueue(maxsize=0)  # LIFO (last in, first out)

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    Result (LIFO):
    third
    second
    first
    '''

    class queue.PriorityQueue(maxsize=0)  # a queue whose items carry a priority

    import queue
    
    q=queue.PriorityQueue()
    # put() a tuple; the first element is the priority (usually a number, but anything comparable works); the smaller the number, the higher the priority
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    '''
    Result (smaller numbers mean higher priority; higher-priority items are dequeued first):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''

    ok
    
    
    

    You can also use queue to build a producer/consumer model; let me insert one here first.

    import threading
    import queue
    import logging
    import time
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='[%(levelname)s] (%(threadName)s) => %(message)s'
    )
    
    def consumer(q):
        while True:
            res = q.get()
            if res is None:
                logging.debug('nothing left for me to eat, I am leaving')
                break
            logging.debug(f'now eating {res}')
    
    
    def product(q):
        for i in ('pork', 'milk', 'egg'):
            logging.debug(f'put in {i}')
            q.put(i)
            time.sleep(2)
        q.put(None)     # send a stop signal
    
    if __name__ == '__main__':
        q = queue.Queue()
        c = threading.Thread(target=consumer, args=(q,))
        p = threading.Thread(target=product, args=(q,))
        c.start()
        p.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_queue.py
    [DEBUG] (Thread-2) => put in pork
    [DEBUG] (Thread-1) => now eating pork
    [DEBUG] (Thread-2) => put in milk
    [DEBUG] (Thread-1) => now eating milk
    [DEBUG] (Thread-2) => put in egg
    [DEBUG] (Thread-1) => now eating egg
    [DEBUG] (Thread-1) => nothing left for me to eat, I am leaving
    
    Process finished with exit code 0
    

    Next, I'll start transcribing and annotating code from the book.

    First: besides Event, threads can also be synchronized with a Condition object.

    import logging
    import threading
    import time
    
    def consumer(cond):
        """wait for the condition and use the resource"""
        logging.debug('Starting consumer thread')
        with cond:           # must hold the condition before waiting on it
            cond.wait()      # start waiting
            logging.debug('Resource is available to consumer')
    
    
    def producer(cond):
        """set up the resource to be used by the consumer"""
        logging.debug('Starting producer thread')
        with cond:       # must hold the condition before notifying
            logging.debug('Making resource available')
            cond.notifyAll()    # notify all waiting consumers
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s (%(threadName)-2s) %(message)s',
    )
    
    condition = threading.Condition()
    # print(dir(condition))
    c1 = threading.Thread(name='c1', target=consumer,
                          args=(condition,))
    c2 = threading.Thread(name='c2', target=consumer,
                          args=(condition,))
    p = threading.Thread(name='p', target=producer,
                         args=(condition,))
    c1.start()
    time.sleep(0.2)
    c2.start()
    time.sleep(0.2)
    p.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threding_condition.py
    2019-12-12 02:16:24,131 (c1) Starting consumer thread
    2019-12-12 02:16:24,334 (c2) Starting consumer thread
    2019-12-12 02:16:24,535 (p ) Starting producer thread
    2019-12-12 02:16:24,536 (p ) Making resource available
    2019-12-12 02:16:24,536 (c1) Resource is available to consumer
    2019-12-12 02:16:24,536 (c2) Resource is available to consumer
    
    Process finished with exit code 0
    

    A barrier is another thread-synchronization mechanism. A Barrier establishes a control point where all participating threads block until every participant has reached it.

    Once the number of waiting threads equals the configured count, they all start working. A rather interesting gadget.

    import threading
    import time
    
    
    def work(barrier):
        print(threading.current_thread().getName(),
              'waiting for barrier with {} others'.format(barrier.n_waiting))
        worker_id = barrier.wait()      # block until the configured number of threads arrive; the return value is an index from 0 to parties-1, so a 2 means three threads were released together
        # print('worker_id_num====>',worker_id)
        print(threading.current_thread().getName(), 'after barrier', worker_id)
    
    NUM_THREADS = 3
    
    barrier = threading.Barrier(NUM_THREADS)           # define the barrier outside and pass it into the worker function.
    
    threads = [
        threading.Thread(
            name=f'worker-{i}',
            target=work,
            args=(barrier,),
        )
        for i in range(6)           # create 6 threads; if the count is not a multiple of the barrier size, the leftover threads block forever.
    ]
    
    for t in threads:
        print(t.getName(),'starting')
        t.start()
        time.sleep(0.1)
    
    for t in threads:
        t.join()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_barrier.py
    worker-0 starting
    worker-0 waiting for barrier with 0 others
    worker-1 starting
    worker-1 waiting for barrier with 1 others
    worker-2 starting
    worker-2 waiting for barrier with 2 others
    worker-2 after barrier 2
    worker-0 after barrier 0
    worker-1 after barrier 1
    worker-3 starting
    worker-3 waiting for barrier with 0 others
    worker-4 starting
    worker-4 waiting for barrier with 1 others
    worker-5 starting
    worker-5 waiting for barrier with 2 others
    worker-5 after barrier 2
    worker-4 after barrier 1
    worker-3 after barrier 0
    
    Process finished with exit code 0
    

    Next, using abort() to avoid the hang caused by a mismatched thread count.

    import threading
    import time
    
    
    def work(barrier):
        print(threading.current_thread().getName(),
              'waiting for barrier with {} others'.format(barrier.n_waiting))
        try:
            worker_id = barrier.wait()      # block until the configured number of threads arrive; the return value is an index from 0 to parties-1
        # print('worker_id_num====>',worker_id)
        except threading.BrokenBarrierError:
            print(threading.current_thread().getName(), 'aborting')
        else:
            print(threading.current_thread().getName(), 'after barrier', worker_id)
    
    NUM_THREADS = 3
    
    barrier = threading.Barrier(NUM_THREADS)           # define the barrier outside and pass it into the worker function.
    
    threads = [
        threading.Thread(
            name=f'worker-{i}',
            target=work,
            args=(barrier,),
        )
        for i in range(4)           # create 4 threads; the last one can never satisfy the 3-thread barrier on its own.
    ]
    
    for t in threads:
        print(t.getName(),'starting')
        t.start()
        time.sleep(0.1)
    
    barrier.abort()             # abort() is very handy: the leftover thread that can never fill the barrier gets a BrokenBarrierError, caught in the except clause.
    
    for t in threads:
        t.join()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/threading_barrier_abort.py
    worker-0 starting
    worker-0 waiting for barrier with 0 others
    worker-1 starting
    worker-1 waiting for barrier with 1 others
    worker-2 starting
    worker-2 waiting for barrier with 2 others
    worker-2 after barrier 2
    worker-0 after barrier 0
    worker-1 after barrier 1
    worker-3 starting
    worker-3 waiting for barrier with 0 others
    worker-3 aborting
    
    Process finished with exit code 0
    

    Two more to go. Next: limiting concurrent access to a resource. Sometimes you want several worker threads to access a resource at the same time, but cap the total. For example, a connection pool may allow simultaneous connections but only a fixed number of them, or a network application may support a fixed number of concurrent downloads. A Semaphore manages this.

    import logging
    import random
    import threading
    import time
    
    class ActivePool:
    
        def __init__(self):
            self.active = []
            self.lock = threading.Lock()
    
        def makeActive(self,name):    # without taking the lock
            # with self.lock:
            self.active.append(name)
            time.sleep(1)
            logging.debug(f'A{threading.current_thread().getName()}-Running: {self.active}')
    
        def makeInactive(self,name):
            # with self.lock:
            self.active.remove(name)
            # time.sleep(0.1)
            logging.debug(f'I{threading.current_thread().getName()}-Running: {self.active}')
    
    def worker(s, pool):
        logging.debug('Waiting to join the pool')
        with s:                                          # acquire() and release() could be used instead
            # time.sleep(0.1)
            name = threading.current_thread().getName()
            print(f'{name} is coming.')
            pool.makeActive(name)
            # time.sleep(0.2)
            pool.makeInactive(name)
            # time.sleep(0.1)
            print(f'{name} is out.')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s (%(threadName)-2s) %(message)s',
    )
    
    pool = ActivePool()
    s = threading.Semaphore(2)       # at most two threads in the pool at once
    
    def spend_time(func):
        def warp():
            t1 = time.perf_counter()
            func()
            t2 = time.perf_counter()
            print(f'speng_time is :{t2 -t1:0.5f}')
        return warp
    
    @spend_time
    def run():
        threads = []
        for i in range(4):
            t = threading.Thread(
                target=worker,
                name=str(i),
                args=(s, pool),
            )
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
    
    
    run()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/therading_semaphonre.py
    0 is coming.
    1 is coming.
    2019-12-13 03:22:01,429 (0 ) Waiting to join the pool
    2019-12-13 03:22:01,429 (1 ) Waiting to join the pool
    2019-12-13 03:22:01,430 (2 ) Waiting to join the pool
    2019-12-13 03:22:01,430 (3 ) Waiting to join the pool
    0 is out.
    2 is coming.1 is out.
    
    3 is coming.
    2019-12-13 03:22:02,429 (0 ) A0-Running: ['0', '1']
    2019-12-13 03:22:02,429 (0 ) I0-Running: ['1']
    2019-12-13 03:22:02,430 (1 ) A1-Running: ['1']
    2019-12-13 03:22:02,430 (1 ) I1-Running: []
    2019-12-13 03:22:03,430 (3 ) A3-Running: ['2', '3']
    2019-12-13 03:22:03,431 (3 ) I3-Running: ['2']
    2019-12-13 03:22:03,431 (2 ) A2-Running: ['2']
    2019-12-13 03:22:03,431 (2 ) I2-Running: []
    3 is out.
    2 is out.
    speng_time is :2.00197
    
    Process finished with exit code 0
    

    With the lock:

    import logging
    import random
    import threading
    import time
    
    class ActivePool:
    
        def __init__(self):
            self.active = []
            self.lock = threading.Lock()
    
        def makeActive(self,name):    # with the lock held
            with self.lock:
                self.active.append(name)
                time.sleep(1)
                logging.debug(f'A{threading.current_thread().getName()}-Running: {self.active}')
    
        def makeInactive(self,name):
            with self.lock:
                self.active.remove(name)
                # time.sleep(0.1)
                logging.debug(f'I{threading.current_thread().getName()}-Running: {self.active}')
    
    def worker(s, pool):
        logging.debug('Waiting to join the pool')
        with s:                                          # acquire() and release() could be used instead
            # time.sleep(0.1)
            name = threading.current_thread().getName()
            print(f'{name} is coming.')
            pool.makeActive(name)
            # time.sleep(0.2)
            pool.makeInactive(name)
            # time.sleep(0.1)
            print(f'{name} is out.')
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s (%(threadName)-2s) %(message)s',
    )
    
    pool = ActivePool()
    s = threading.Semaphore(2)       # at most two threads in the pool at once
    
    def spend_time(func):
        def warp():
            t1 = time.perf_counter()
            func()
            t2 = time.perf_counter()
            print(f'speng_time is :{t2 -t1:0.5f}')
        return warp
    
    @spend_time
    def run():
        threads = []
        for i in range(4):
            t = threading.Thread(
                target=worker,
                name=str(i),
                args=(s, pool),
            )
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
    
    
    run()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/therading_semaphonre.py
    2019-12-13 03:24:15,506 (0 ) Waiting to join the pool
    2019-12-13 03:24:15,506 (1 ) Waiting to join the pool
    2019-12-13 03:24:15,506 (2 ) Waiting to join the pool
    2019-12-13 03:24:15,507 (3 ) Waiting to join the pool
    0 is coming.1 is coming.
    
    2019-12-13 03:24:16,507 (1 ) A1-Running: ['1']
    2019-12-13 03:24:16,507 (1 ) I1-Running: []
    1 is out.
    2 is coming.
    2019-12-13 03:24:17,509 (0 ) A0-Running: ['0']
    2019-12-13 03:24:17,510 (0 ) I0-Running: []
    0 is out.
    3 is coming.
    2019-12-13 03:24:18,512 (2 ) A2-Running: ['2']
    2019-12-13 03:24:18,512 (2 ) I2-Running: []
    2 is out.
    2019-12-13 03:24:19,514 (3 ) A3-Running: ['3']
    2019-12-13 03:24:19,515 (3 ) I3-Running: []
    3 is out.
    speng_time is :4.00896
    
    Process finished with exit code 0
    

    From my own experiments, whether you need a lock depends on the situation: for the types Python already protects, such as list and dict operations, there is no need, and adding one just makes things very slow.

    Testing also shows that the arguments passed into each thread's target function belong to that particular call: they are local variables, so the threads do not interfere with each other.
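
    A quick check of that claim (my own sketch, not from the original post): each thread gets its own binding of the argument and its own local variables, so the results never get mixed up:

    import threading

    results = {}

    def worker(num):
        # num and total are local to this call, so every thread has its own copies
        total = 0
        for i in range(num):
            total += i
        results[num] = total

    threads = [threading.Thread(target=worker, args=(n,)) for n in (10, 100, 1000)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(results)   # e.g. {10: 45, 100: 4950, 1000: 499500} -- each thread kept its own num and total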

    Finally, a more detailed note on the local class mentioned earlier, following the Python 3 standard library book. (It is mainly useful when functions call other functions and each thread works on its own copy of some attribute; think of each person operating on their own bank balance.)

    Some resources need to be locked so that multiple threads can use them; others need to be protected so that they are hidden from threads that are not their "owner".

    The local() class creates an object that hides values so they cannot be seen from other threads. (Typically used across several layers of function calls: the first function sets the attribute on the object, and every later function can read it directly.)

    Compared with Liao Xuefeng's explanation, I find this translated textbook explanation easier to understand.

    import random
    import threading
    import logging
    
    def show_value(data):
        try:
            val = data.value
        except AttributeError:
            logging.debug('No value yet')
        else:
            logging.debug(f'value={val}')
    
    def worker(data):
        show_value(data)
        data.value = random.randint(1, 100)        # assign a value to the object's attribute
        show_value(data)
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='(%(threadName)-10s) %(message)s',
    )
    local_data = threading.local()
    show_value(local_data)            # the main thread runs first
    local_data.value = 1000
    show_value(local_data)
    
    for i in range(2):
        t = threading.Thread(target=worker, args=(local_data,))   # start two threads, passing in the local object.
        t.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_local_book.py
    (MainThread) No value yet
    (MainThread) value=1000
    (Thread-1  ) No value yet
    (Thread-1  ) value=90
    (Thread-2  ) No value yet
    (Thread-2  ) value=62
    
    Process finished with exit code 0
    

    The code above shows that different threads each get their own values for the object's attributes. But if you want the object to come with a default value in every thread,

    you can subclass local and set the attribute during __init__. (For something like per-user bank operations this works quite nicely; mixing the subclass into your own classes makes multithreaded code more convenient to use.)

    import random
    import threading
    import logging
    
    def show_value(data):
        try:
            val = data.value
        except AttributeError:
            logging.debug('No value yet')
        else:
            logging.debug(f'value={val}')
    
    def worker(data):
        show_value(data)
        data.value = random.randint(1, 100)        # assign a value to the object's attribute
        show_value(data)
    
    class MyLocal(threading.local):    # subclassing threading.local
    
        def __init__(self,value):
            super(MyLocal, self).__init__()
            logging.debug('Initializing %r', self)
            self.value = value    # give the object a default value
    
    logging.basicConfig(
        level=logging.DEBUG,
        format='(%(threadName)-10s) %(message)s',
    )
    
    local_data = MyLocal(1000)
    show_value(local_data)
    for i in range(2):
        t = threading.Thread(target=worker, args=(local_data,))   # start two threads, passing in the local object.
        t.start()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/t_threading/thread_loacl_defaults.py
    (MainThread) Initializing <__main__.MyLocal object at 0x1060acc20>
    (MainThread) value=1000
    (Thread-1  ) Initializing <__main__.MyLocal object at 0x1060acc20>
    (Thread-1  ) value=1000
    (Thread-1  ) value=53
    (Thread-2  ) Initializing <__main__.MyLocal object at 0x1060acc20>
    (Thread-2  ) value=1000
    (Thread-2  ) value=99
    
    Process finished with exit code 0
    

    A final reminder to myself: whether it's an Event, a Condition, a Semaphore, a local, and so on, the object has to be created and configured outside, passed into the worker function, and its methods called inside that function.

     

    That's it for this first pass. Overall, threads and processes feel very similar to work with; for everyday concurrency I'd generally prefer threads, since they use less memory and start faster.

    The threading module also offers more synchronization helpers, and since memory is shared there's no data-passing problem.
