zoukankan      html  css  js  c++  java
  • Python多线程,线程死锁及解决,生产者与消费者问题

    1.Thread类

    普通调用

    t = Thread(target=test, args=(i,))	# test为目标函数名, 若函数需要参数将其以元组形									                # 式赋给args, 若无参数可不写	
    t.start()	# 用start()函数开启线程
    

    例子

    import time
    from threading import Thread
    
    # 目标函数
    def test(i):
        print("hello ", i)
        time.sleep(1)
    
    def main():
        # 循环5次,开起五个线程
        for i in range(5):
            t = Thread(target=test, args=(i,))	
            t.start()
    
    if __name__ == '__main__':
        main()
    

    继承Thread类

    定义一个自己的类继承自Thread,重写run()方法,即 将原本执行任务的函数内容移植到run()方法中.可通过类的属性传参.
    

    例子

    from threading import Thread
    import time
    
    class MyThread(Thread):
        def __init__(self, i):  
            Thread.__init__(self)   # 初始化父类"构造函数"
            self.i = i  # 初始化,目的将run函数参数作为类的属性
    
        def run(self):
            time.sleep(1)
            msg = "I'm " + self.name + " @ " + str(self.i)
            print(msg)
    
    def main():
        for i in range(3):	# 开启三个线程
            t = MyThread(i)	# 实例化自己的类
            t.start()
    
    if __name__ == '__main__':
        main()
    

    线程的执行顺序

    上面的例子中线程的执行顺序是随机的
    

    2.线程间共享全局变量

    下面例子中test1()和test2()共享g_num全局变量.希望test1()执行的结果是1000000,test2()执行的结果是2000000.但是time.sleep()函数会影响结果.

    from threading import Thread
    import time
    
    g_num = 0
    
    def test1():
        global g_num
        for i in range(1000000):
            g_num += 1
    
        print('---test1 g_num is %d---' % g_num)
    
    def test2():
        global g_num
        for i in range(1000000):
            g_num += 1
    
        print('---test2 g_num is %d---' % g_num)
    
    t1 = Thread(target=test1)
    t1.start()
    
    # time.sleep(3) # 运行这句话与不运行这句话结果不一样
    
    t2 = Thread(target=test2)
    t2.start()
    
    print('-----g_num: %d-----' % g_num)
    

    不执行sleep函数的结果(可能出现多种不同的运行结果)

    ---test1 g_num is 1312283---
    -----g_num: 1312283-----
    ---test2 g_num is 1341534---
    

    执行sleep()函数的结果

    ---test1 g_num is 1000000---
    -----g_num: 1079982-----
    ---test2 g_num is 2000000---
    

    其实这也不难理解,sleep之后test1中的任务肯定先完成,而不执行sleep两个函数同能对g_num同时操作

    通过轮询的方式解决线程间共享全局变量的问题

    from threading import Thread
    
    g_num = 0
    g_flag = 1  # 增加一个标识全局变量
    
    def test1():
        global g_num
        global g_flag
        if g_flag == 1:
            for i in range(1000000):
                g_num += 1
        g_flag = 0
        print('---test1 g_num is %d---' % g_num)
    
    def test2():
        global g_num
    
        # 轮询
        while True:
            if g_flag != 1: # 一旦test1()执行完,即g_flag = 0时,test2()开始执行累加g_num操作
                for i in range(1000000):
                    g_num += 1
                break
    
        print('---test2 g_num is %d---' % g_num)
    
    t1 = Thread(target=test1)
    t1.start()
    
    t2 = Thread(target=test2)
    t2.start()
    
    print('-----g_num: %d-----' % g_num)
    

    运行结果

    -----g_num: 303721-----
    ---test1 g_num is 1000000---
    ---test2 g_num is 2000000---
    

    第二个线程一开始并没有执行累加g_num的操作,而是先进行一个死循环,在这个循环中不断的"询问"g_flag的值是否不等于1.一但g_flag不等于1,即test1()结束后便开始干"正事".

    通过互斥锁解决线程间共享全局变量的问题

    from threading import Thread, Lock  # 导入互斥锁
    
    g_num = 0
    
    def test1():
        global g_num
    
        for i in range(1000000):
            mutex.acquire()  # 上锁,此时其他的锁会等待  上锁应该遵循最小原则
            g_num += 1
            mutex.release() # 开锁,此时其他的锁会抢着开锁
    
        print('---test1 g_num is %d---' % g_num)
    
    def test2():
        global g_num
    
        for i in range(1000000):
            mutex.acquire()
            g_num += 1
            mutex.release()
    
        print('---test2 g_num is %d---' % g_num)
    
    # 创建一把互斥锁,默认不上锁
    mutex = Lock()
    
    t1 = Thread(target=test1)
    t1.start()
    
    t2 = Thread(target=test2)
    t2.start()
    
    print('-----g_num: %d-----' % g_num)
    

    运行结果

    -----g_num: 45012-----
    ---test1 g_num is 1979942---
    ---test2 g_num is 2000000---
    

    从结果可以看出test2()的结果是正确的,而test1()的结果很接近test2.这也不难理解.互斥锁会把夹在中间的部分锁定,也就是说,在极短时间内只能有一个线程在执行该代码.一旦开锁了(release),所有线程开始抢这把锁,某个线程抢到之后会把自己的操作锁住,其他线程只能等待,一直反复直至全部任务完成.

    只有对上述代码稍微修改便可以实现我们想要的结果

    修改后的代码

    from threading import Thread, Lock  # 导入互斥锁
    
    g_num = 0
    
    def test1():
        global g_num
    
        mutex.acquire()  # 上锁,此时其他的锁会等待  上锁应该遵循最小原则
        for i in range(1000000):
            g_num += 1
        mutex.release() # 开锁,此时其他的锁会抢着开锁
    
        print('---test1 g_num is %d---' % g_num)
    
    def test2():
        global g_num
    
        mutex.acquire()
        for i in range(1000000):
            g_num += 1
        mutex.release()
    
        print('---test2 g_num is %d---' % g_num)
    
    # 创建一把互斥锁,默认不上锁
    mutex = Lock()
    
    t1 = Thread(target=test1)
    t1.start()
    
    t2 = Thread(target=test2)
    t2.start()
    
    print('-----g_num: %d-----' % g_num)
    

    结果

    -----g_num: 220254-----
    ---test1 g_num is 1000000---
    ---test2 g_num is 2000000---
    

    值得注意的是,互斥锁上的范围太大就失去了线程的意义,别的线程都把时间浪费在了等待上.轮询同理.

    3.线程间使用非全局变量

    from threading import Thread
    import threading
    import time
    
    def test1():
        name = threading.current_thread().name  # 获取当前线程名字
        print('----thread name is %s----' % name)
        g_num = 100
        if name == 'Thread-1':
            g_num += 1
        else:
            time.sleep(2)
        print('---thread is %s | g_num is %d---' % (name, g_num))
    
    t1 = Thread(target=test1)
    t1.start()
    
    t2 = Thread(target=test1)
    t2.start()
    

    运行结果

    ----thread name is Thread-1----
    ---thread is Thread-1 | g_num is 101---
    ----thread name is Thread-2----
    ---thread is Thread-2 | g_num is 100---
    

    非全局对于同一个函数来说.可以通过线程的名字来区分.

    4.线程死锁

    import threading
    import time
    
    class MyThread1(threading.Thread):
        def run(self):
            if mutexA.acquire():
                print(self.name + '---do1---up---')
                time.sleep(1)
    
                if mutexB.acquire():
                    print(self.name + '---do1---down---')
                    mutexB.release()
                mutexA.release()
    
    class MyThread2(threading.Thread):
        def run(self):
            if mutexB.acquire():
                print(self.name + '---do2---up---')
                time.sleep(1)
    
                if mutexA.acquire():
                    print(self.name + '---do2---down---')
                    mutexA.release()
                mutexB.release()
    
    if __name__ == '__main__':
        mutexA = threading.Lock()
        mutexB = threading.Lock()
        t1 = MyThread1()
        t2 = MyThread2()
        t1.start()
        t2.start()
    

    运行结果(卡在了这两句,未结束)

    Thread-1---do1---up---
    Thread-2---do2---up---
    
    

    分析代码,t1的代码在等待mutexB解锁的时候t2在等待mutexA解锁.而t1必须先执行完mutexB锁中的代码执行完才能释放mutexA,t2必须先执行完mutexA锁中的代码执行完才能释放mutexB,这就导致两个线程一直等待下去形成死锁,会浪费CPU资源.

    解决死锁的办法

    设置超时时间 mutexA.acquire(2)
    当然也可以从算法上避免死锁
    

    5.使用ThreadLocal

    import threading
    
    # 创建全局ThreadLocal对象
    local_school = threading.local()
    
    def process_student():
        # 获取当前线程相关联的student
        std = local_school.student
        print('Hello, %s in %s' % (std, threading.current_thread().name))
    
    def process_thread(name):
        # 绑定ThreadLocal的student
        local_school.student = name
        process_student()
    
    t1 = threading.Thread(target=process_thread, args=('kain',), name='Thread-A')
    t2 = threading.Thread(target=process_thread, args=('huck',), name='Thread-B')
    
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

    运行结果

    Hello, kain in Thread-A
    Hello, huck in Thread-B
    

    6.生产者与消费者问题

    import threading
    import time
    
    # Python2
    # from Queue import Queue
    
    # Python3
    from queue import Queue
    
    class Producer(threading.Thread):
        def run(self):
            global queue
            count = 0
            while True:
                if queue.qsize() < 1000:
                    for i in range(100):
                        count += 1
                        msg = '生成产品' + str(count)
                        queue.put(msg)
                        print(msg)
                time.sleep(0.5)
    
    class Consumer(threading.Thread):
        def run(self):
            global queue
            while True:
                if queue.qsize() > 100:
                    for i in range(3):
                        msg = self.name + '消费了' + queue.get()
                        print(msg)
                time.sleep(1)
    
    if __name__ == '__main__':
        queue = Queue()
    
        for i in range(500):
            queue.put('初始产品'+str(i))    # 向队列中塞内容
    
        for i in range(2):
            p = Producer()
            p.start()
    
        for i in range(5):
            c = Consumer()
            c.start()
    

    运行结果过长不予展示

  • 相关阅读:
    Delphi 10.1.2 berlin开发跨平台APP的几点经验
    (矫情)关于编程(能力第一,其它不要多想)
    Xdite:永葆热情的上瘾式学习法(套路王:每天总结自己,反省自己的作息规律,找到自己的幸运时间、幸运方法,倒霉时间、倒霉方法。幸运是与注意力挂钩的。重复才能让自己登峰造极,主动去掉运气部分来训练自己。游戏吸引自己的几个原因非常适合训练自己)good
    无锡美新赵阳:创业18年,一辈子做好一家企业(创业是一种生活方式;为了赚钱而创业,那是扯淡”。最重要的是做自己喜欢做的事情)
    Delphi中流对象的应用
    多样性的配置来源
    WeText项目的服务端
    集线器,交换机,路由器
    不知道的JavaScript
    IPerf网络测试工具
  • 原文地址:https://www.cnblogs.com/kainhuck/p/10580442.html
Copyright © 2011-2022 走看看