zoukankan      html  css  js  c++  java
  • Python 线程(threading)

    • Python 的thread模块是比较底层的模块,Python的threading模块是对thread做了一些包装,可以更加方便的
      被使用;

    1. 使用threading 模块

    # 示例一: 单线程执行
    import time
    
    def say_happy():
        print("Happy Birthday!")
        time.sleep(2)
    
    if __name__ == "__main__":
        for i in range(5):
            say_happy()
    
    
    # 示例二: 多线程执行
    import threading
    import time
    
    
    def say_happy():
        print("Happy Birthday!")
        time.sleep(2)
    
    
    if __name__ == "__main__":
        for i in range(5):
            t = threading.Thread(target=say_happy)
            t.start()   # 启动线程,即让线程开始执行
    
    
    # 示例三: 第二种创建多线程的方式
    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
        
        def run(self):            # 定义每个线程要运行的函数
                print('running on number: %s' % self.num)
                
                time.sleep(3)
    
    
    if __name__ == '__main__':
        t1 = MyThread(1)
        t2 = MyThread(2)
        t3 = MyThread(3)
        t1.start()
        t2.start()
        t3.start()
    

    2. 多线程之间共享全局变量

    # 示例:
    from threading import Thread
    import time
    
    
    g_num = 100
    
    def work1():
        global g_num
        for i in range(3):
            g_num += 1
    
        print("=== in work1, g_num is %d ===" % g_num)
    
    
    def work2():
        global g_num
        print("=== in work2, g_num is %d ===" % g_num)
    
    
    print("=== 线程创建之前g_num is %d ===" % g_num)
    
    t1 = Thread(target=work1)
    t1.start()
    
    # 延时一会儿, 保证t1线程中的事情做完
    time.sleep(1)
    
    t2 = Thread(target=work2)
    t2.start()
    
    # 输出:
    # === 线程创建之前g_num is 100 ===
    # === in work1, g_num is 103 ===
    # === in work2, g_num is 103 ===
    
    
    # 示例二: 多线程共享变量的问题
    from threading import Thread
    import time
    
    g_num = 0
    
    def test1():
        global g_num
        for i in range(1000000):
            g_num += 1
    
        print("=== test1 === g_num=%d" % g_num)
    
    def test2():
        global g_num
        for i in range(1000000):
            g_num += 1
    
        print("=== test2 === g_num=%d" % g_num)
    
    p1 = Thread(target=test1)
    p1.start()
    
    # time.sleep(3)
    
    p2 = Thread(target=test2)
    p2.start()
    
    print("=== g_num=%d" % g_num)
    
    # 输出:
    #   === g_num=281358
    #   === test1 === g_num=1477304
    #   === test2 === g_num=1531631
    
    
    # 示例三: 列表当作实参传递到线程中
    from threading import Thread
    import time
    
    def work1(nums):
        nums.append(44)
        print("=== in work1 ===", nums)
    
    def work2(nums):
        # 延时一会儿,保证t1线程中的事情做完
        # 这里的参数,不需要使用 global 关键字
        time.sleep(1)
        print("=== in work2 ===", nums)
    
    g_nums = [11, 22, 33]
    
    t1 = Thread(target=work1, args=(g_nums,))
    t1.start()
    
    t2 = Thread(target=work2, args=(g_nums,))
    t2.start()
    

    3. 互斥锁

    # 示例:
    from threading import Thread, Lock
    import time
    
    g_num = 0
    
    def test1():
        global g_num
        # 上锁
        mutex.acquire()
        for i in range(1000000):
            g_num += 1
    
        # 释放锁
        mutex.release()
    
        print("=== test1 === g_num=%d" % g_num)
    
    def test2():
        global g_num
        mutex.acquire()
        for i in range(1000000):
            g_num += 1
    
        # 释放锁
        mutex.release()
    
        print("=== test2 === g_num=%d" % g_num)
    
    # 创建一把互斥锁
    mutex = Lock()
    
    p1 = Thread(target=test1)
    p1.start()
    
    
    p2 = Thread(target=test2)
    p2.start()
    
    # 输出:
    #     === test1 === g_num=1000000
    #     === test2 === g_num=2000000
    

    4. 死锁

    # 示例
    import threading
    import time
    
    class MyThread1(threading.Thread):
        def run(self):
            if mutexA.acquire():
                print(self.name + "=== do1 === up ===")
                time.sleep(1)
    
                if mutexB.acquire():
                    print(self.name + "=== do1 === down ===")
                    mutexB.release()
                mutexA.release()
    
    class MyThread2(threading.Thread):
        def run(self):
            if mutexB.acquire():
                print(self.name + "=== do2 === up ===")
                time.sleep(1)
    
                if mutexA.acquire():
                    print(self.name + "=== do2 === down ===")
                    mutexA.release()
                mutexB.release()
    
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    # lock = threading.RLock()        # 递归锁(可重用锁)
    
    
    if __name__ == '__main__':
        t1 = MyThread1()
        t2 = MyThread2()
        t1.start()
        t2.start()
    

    5. 信号量(Semaphore)

    • 信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当调用acquire()
      时-1,调用release()时,+1;
    • 计数器不能小于0,当计数器为0时,acquire()将阻塞线程至同步锁定状态,直至其他线程调用release();
    • BoundedSemaphoreSemaphore的唯一区别在于前者将在调用release时,检查计数器的值是否超过了计
      数器的初始值,如果超过了将抛出一个异常;
    # 示例:
    import threading
    import time
    
    class myThread(threading.Thread):
        def run(self):
            if semaphore.acquire():
                print(self.name)
                time.sleep(5)
                semaphore.release()
    
    if __name__ == '__main__':
        semaphore = threading.BoundedSemaphore(5)     # 创建信号量锁
        thrs = []
        for i in range(100):
            thrs.append(myThread())
        for t in thrs:
            t.start()
    

    6. 条件变量同步(Condition)

    • 有一类线程需要满足条件之后,才能够继续执行,Python提供了threading.Condition对象用于条件变量线程的支
      持,它除了能提供RLock()Lock()的方法外,还提供了wait(),notify(),notifyAll()方法;
    • wait(): 条件不满足时调用,线程会释放锁并进入等待阻塞;
    • notify(): 条件创造后调用,通知等待池激活一个线程;
    • notifyAll(): 条件创造后调用,通知等待池激活所有线程;
    # 示例一:
    lock_con = threading.Condition([Lock/Rlock]): 锁参数是可选选项,不传入锁,对象自动创建一个 RLock()
    
    # 示例二:
    import threading
    import time
    from random import randint
    
    class Producer(threading.Thread):
        def run(self):
            global L
            while True:
                val = randint(0, 100)
                print('生产者', self.name, ':Append' + str(val), L)
                if lock_con.acquire():
                    L.append(val)
                    lock_con.notify()
                    lock_con.release()
                time.sleep(3)
    class Customer(threading.Thread):
        def run(self):
            global L
            while True:
                lock_con.acquire()
                if len(L) == 0:
                    lock_con.wait()
                print('消费者', self.name,':Delete'+str(L[0]), L)
                del L[0]
                lock_con.release()
                time.sleep(0.25)
    
    if __name__ == '__main__':
        L = []
        lock_con = threading.Condition()
        threads = []
        for i in range(5):
            threads.append(Producer())
        threads.append(Customer())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    

    7. 同步条件(Event)

    • 条件同步和条件变量同步差不多意思,只是少了锁功能,因为条件同步设计于不访问共享资源的条件环境。
    • event = threading.Event(): 条件环境对象,初始值为False;
    • event.isSet(): 返回event的状态值;
    • event.wait(): 如果 event.isSet() == False将阻塞线程;
    • event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度;
    • event.clear(): 恢复event的状态值为False;
    # 示例一:
    import threading
    import time
    class Boss(threading.Thread):
        def run(self):
            print('Boss: 今晚大家都要加班到22:00')
            event.isSet() or event.set()
            time.sleep(5)
            print('Boss: <22:00>可以下班了')
            event.isSet() or event.set()
    
    class Worker(threading.Thread):
        def run(self):
            event.wait()
            print('Worker: 哎.....命苦啊!')
            time.sleep(0.25)
            event.clear()
            event.wait()
            print('Worker: Oh Yeah!')
    
    if __name__ == '__main__':
        event = threading.Event()
        threads = []
        for i in range(5):
            threads.append(Worker())
        threads.append(Boss())
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    
    
    # 示例二: 红绿灯
    import threading
    import time
    import random
    
    def light():
        if not event.isSet():
            event.set()     # 绿灯状态
        count = 0
        while True:
            if count < 10:
                print('33[42;1m--green light on---33[0m')
            elif count < 13:
                print('33[43;1m--yellow light on---33[0m')
            elif count < 20:
                if event.isSet():
                    event.clear()
                print('33[41;1m--red light on---33[0m')
            else:
                count = 0
                event.set()     #打开绿灯
            time.sleep(1)
            count += 1
    
    def car(n):
        while 1:
            time.sleep(random.randrange(10))
            if event.isSet():   # 绿灯
                print('car [%s] is running...' % n)
            else:
                print('car [%s] is waiting for the red light...' %n)
    
    if __name__ == '__main__':
        event = threading.Event()
        Light = threading.Thread(target = light)
        Light.start()
        for i in range(3):
            t = threading.Thread(target = car, args = (i,))
            t.start()
    

    8. 生产者与消费者模式

    # 示例:
    from queue import Queue
    
    class Producer(threading.Thread):
        def run(self):
            global queue
            count = 0
            while True:
                if queue.qsize() < 1000:
                    for i in range(100):
                        count = count + 1
                        msg = "生成产品" + str(count)
                        queue.put(msg)
                time.sleep(0.5)
    
    class Consumer(threading.Thread):
        def run(self):
            global queue
            while True:
                if queue.qsize() > 100:
                    for i in range(3):
                        msg = self.name + "消费了" + queue.get()
                        print(msg)
                time.sleep(1)
    
    if __name__ == '__main__':
        queue = Queue()
    
        for i in range(500):
            queue.put("初始产品"+str(i))
        for i in range(2):
            p = Producer()
            p.start()
        for i in range(5):
            c = Consumer()
            c.start()
    

    9. ThreadLocal 对象在线程中的使用

    # 示例:
    import threading
    
    # 创建全局 ThreadLocal 对象
    local_school = threading.local()
    
    def process_student():
        # 获取当前线程关联的student
        std = local_school.student
        print("Hello, %s (in %s)" % (std, threading.current_thread().name))
    
    def process_thread(name):
        # 绑定 ThreadLocal 的 student
        local_school.student = name
        process_student()
    
    t1 = threading.Thread(target=process_thread, args=("zhangsan",), name="Thread-A")
    t2 = threading.Thread(target=process_thread, args=("lisi",), name="Thread-B")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    
    # 输出:
    #   Hello, zhangsan (in Thread-A)
    #   Hello, lisi (in Thread-B)
    

    10. 异步

    # 示例:
    from multiprocessing import Pool
    import time
    import os
    
    def test():
        print("=== 进程池中的进程 === pid=%d, ppid=%d==" % (os.getpid(), os.getppid()))
        for i in range(3):
            print("=== %d ===" % i)
            time.sleep(1)
        return "hehe"
    
    def test2(args):
        print("=== callback func == pid=%d" % os.getpid())
        print("=== callback func == args=%s" % args)
    
    pool = Pool(3)
    pool.apply_async(func=test, callback=test2)
    
    time.sleep(5)
    
    print("=== 主进程 = pid=%d ===" % os.getpid())
    

    11. Join 和 Daemon

    • join:
      • join完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程才终止;
    • setDaemon(True):
      • 将线程声明为守护进程,必须在start()方法调用之前设置,这个方法基本和join是相反的;
      • 当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就兵分两路,分别运行,那么当
        主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。
      • 但是,有时候我们需要的是只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时,就可以使用setDaemon
        方法;
    # 示例:
    import threading
    from time import ctime, sleep
    import time
    
    def music(func):
        for i in range(2):
            print('Begin listening to %s. %s' % (func, ctime()))
            sleep(2)
            print('end listening %s' % ctime())
    
    def movie(func):
        for i in range(2):
            print('Begin watching at the %s! %s' % (func, ctime()))
            sleep(3)
            print('end watching %s'% ctime())
    
    threads = []
    t1 = threading.Thread(target = music, args = ('七里香',))
    threads.append(t1)
    t2 = threading.Thread(target = movie, args = ('阿甘正传',))
    threads.append(t2)
    
    if __name__ == '__main__':
        for t in threads:
            t.setDaemon(True)
            t.start()
    
        print(threading.current_thread())    # 打印当前线程名称
        print(threading.active_count())      # 
        print('all over %s', % ctime)
    

    12. 线程中的队列(Queue)

    • queue定义了三种信息队列模式类:
      • Queue([maxsize]):FIFO队列模式,[maxsize]定义队列容量; 缺省,即无穷大;
      • LifoQueue([maxsize]):LIFO队列模式;
      • PriorityQueue([maxsize]):优先级队列模式,使用此队列时,项目应该是(priority,data)的形式;
    # 示例一:
    import queue
    q = queue.Queue(maxsize = 10)
    
    # 存入一个值
    q.put(item, block=True, timeout=None)
    # block 为可选参数,如果队列满了的话,put(item),调用者将被阻塞, 而put(item,0)会报Full异常;
    
    # 取出一个值
    q.get(item, block=False, timeout=None)
    # block 为可选参数,如果队列为空, get(item,0)会报Empty异常;
    
    # 常用方法:
    #   q.qsize(): 返回队列大小;
    #   q.full(): 如果队列满了,返回True,否则返回False;
    #   q.empty(): 如果队列为空,返回True,否则返回False;
    
    
    # 示例二:
    import threading
    import queue
    from time import sleep
    from random import randint
    class Production(threading.Thread):
        def run(self):
            while True:
                r = randint(0, 100)
                q.put(r)
                print('生产出来%s号包子' % r)
                sleep(1)
    class Process(threading.Thread):
        def run(self):
            while True:
                re = q.get()
                print('吃掉%s号包子' % re)
    
    if __name__ == '__main__':
        q = queue.Queue(10)
        threads = [Production(), Production(), Production(), Process()]
        for t in threads:
            t.start()
    

    参考资料:

  • 相关阅读:
    (转)Maven实战(三)Eclipse构建Maven项目
    刷欢乐豆的方法
    R-pie()
    R-plot()
    10只老鼠与1000瓶药水
    资源共享
    第一只python小爬虫
    正则表达式之python实现
    马踏棋盘之贪心算法优化
    八皇后之回溯法解决
  • 原文地址:https://www.cnblogs.com/linkworld/p/8552875.html
Copyright © 2011-2022 走看看