zoukankan      html  css  js  c++  java
  • 线程同步

    在多线程编程中,有两个必须要解决的问题,
    一个是线程通信,另一个是线程同步问题。

    from threading import Thread
    from multiprocessing import Process
    a = 0
    def add():
        global a
        b = 0
        for i in range(1000000):
            a += 1
    
    def desc():
        global a
        for i in range(1000000):
            a -= 1
    
    def task():
        t1 = Thread(target=add)
        t2 = Thread(target=desc)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(a)
    
    if __name__ == "__main__":
        for i in range(4):
            p = Process(target=task)
            p.start()
            p.join()

    执行结果:
      -557629
      478789
      444414
      -12991

    原因分析:
      关键在于赋值上面,当某个时间段,t1说a = 5000,t2说a=3000,那么到低是5000还是3000了,
      谁先执行听谁的,如果a=3000,对于t1来说,问题就出现了。

    解决方法一:
      t1.start()
      t1.join()
      t2.start()
      t2.join()
    如果让现场顺序执行,那就没有问题了,但是这样多线程就没有意义。

    1.Lock

    既然赋值的时候可能出现竞争,那么就给他上一把锁。
    让它同一时刻,只有一个代码段可以运行。

    from threading import Thread,Lock
    from multiprocessing import Process
    
    a = 0
    def add(lock):
        global a
        b = 0
        for i in range(1000000):
            lock.acquire()
            a += 1
            lock.release()
    
    def desc(lock):
        global a
        for i in range(1000000):
            lock.acquire()
            a -= 1
            lock.release()  #一定要释放掉
    
    def task():
        lock = Lock()
        t1 = Thread(target=add,args=(lock,))
        t2 = Thread(target=desc,args=(lock,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(a)
    
    if __name__ == "__main__":
        for i in range(4):
            p = Process(target=task)
            p.start()

    执行结果:
      0
      0
      0
      0

    但是用锁会影响性能,同时可能会引起死锁。
    由于资源竞争最容易出现死锁的状况。

    2.RLook:可重用锁

    在一个线程里面,可以连续调用多次acquire,但是release的次数一定要相等。

    lock.acquire()
    lock.acquire()
    a
    += 1
    lock.release()
    lock.release()
    对于lock来说,使用锁之前所以一定要释放掉,上述代码肯定出错。
    只要使用可重用锁就行:
    lock = RLook()

    3.condition的使用以及源码分析

    condition(条件变量)是python中提供了另一个用于复杂的线程间同步的锁。
    如果我们要实现以下对话:
    天猫精灵:小艾
    小艾:在

    天猫精灵:我们来对古诗吧
    小艾:好

    天猫精灵:君住长江头
    小艾:我住长江尾

    天猫精灵:日日思君不见君
    小艾:共饮长江水

    天猫精灵:此水几时休
    小艾:此恨何时已

    天猫精灵:只愿君心似我心
    小艾:定不负相思意

    前面两句还好:

    from threading import Thread
    
    class XiaoAi(Thread):
        def __init__(self):
            super().__init__(name="小艾")
    
    
        def run(self):
            print("{}:{}".format(self.name,""))
    
    
    class TianMao(Thread):
        def __init__(self):
            super().__init__(name="天猫精灵")
    
        def run(self):
            print("{}:{}".format(self.name,"小艾同学"))
    
    
    if __name__ == "__main__":
        tianmao = TianMao()
        xiaoai = XiaoAi()
        tianmao.start()
        xiaoai.start()
    执行结果:
    天猫精灵:小艾同学
    小艾:在

    但是说话的时候只能允许一个人说,因此上个锁。

    from threading import Thread,Lock
    
    class XiaoAi(Thread):
        def __init__(self,lock):
            super().__init__(name="小艾")
            self.lock = lock
    
    
        def run(self):
            self.lock.acquire()
            print("{}:{}".format(self.name,""))
            self.lock.release()
    
    
    class TianMao(Thread):
        def __init__(self,lock):
            super().__init__(name="天猫精灵")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{}:{}".format(self.name,"小艾同学"))
            self.lock.release()
    
    
    if __name__ == "__main__":
        lock = Lock()
        tianmao = TianMao(lock)
        xiaoai = XiaoAi(lock)
        tianmao.start()
        xiaoai.start()  
    执行结果:
    天猫精灵:小艾同学
    小艾:在

    但是如果要全部打印

    from threading import Thread,Lock
    
    class XiaoAi(Thread):
        def __init__(self,lock):
            super().__init__(name="小艾")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{}:{}".format(self.name,""))
            self.lock.release()
    
            self.lock.acquire()
            print("{}:{}".format(self.name, ""))
            self.lock.release()
    
    
    class TianMao(Thread):
        def __init__(self,lock):
            super().__init__(name="天猫精灵")
            self.lock = lock
    
        def run(self):
            self.lock.acquire()
            print("{}:{}".format(self.name,"小艾同学"))
            self.lock.release()
    
            self.lock.acquire()
            print("{}:{}".format(self.name, "我们来对古诗吧"))
            self.lock.release()
    
    
    if __name__ == "__main__":
        lock = Lock()
        tianmao = TianMao(lock)
        xiaoai = XiaoAi(lock)
        tianmao.start()
        xiaoai.start()
    执行结果:
    天猫精灵:小艾同学
    天猫精灵:我们来对古诗吧
    小艾:在
    小艾:好

    我们发现并不是我们期待的顺序,那么如何控制线程的切换顺序了。

    from threading import Thread,Condition
    
    class XiaoAi(Thread):
        def __init__(self,cond):
            super().__init__(name="小艾")
            self.cond = cond
    
        def run(self):
            with self.cond:
                self.cond.wait()  #先等待通知
                print("{}:{}".format(self.name,""))
                self.cond.notify()  #执行完代码之后,通知对方我执行完了你可以接着执行
    
                self.cond.wait()
                print("{}:{}".format(self.name, ""))
                self.cond.notify()
    
                self.cond.wait()
                print("{}:{}".format(self.name, "握住长江尾"))
                self.cond.notify()
    
                self.cond.wait()
                print("{}:{}".format(self.name, "共饮长江水"))
                self.cond.notify()
    
    class TianMao(Thread):
        def __init__(self,cond):
            super().__init__(name="天猫精灵")
            self.cond = cond
    
        def run(self):
            with self.cond:
                print("{}:{}".format(self.name,"小艾同学"))
                self.cond.notify()  #通知
                self.cond.wait()  #等待小艾执行完代码,发送通知过来
    
                print("{}:{}".format(self.name, "我们来对古诗吧"))
                self.cond.notify()
                self.cond.wait()
    
                print("{}:{}".format(self.name, "君住长江头"))
                self.cond.notify()
                self.cond.wait()
    
                print("{}:{}".format(self.name, "日日思君不见君"))
                self.cond.notify()
                self.cond.wait()
    
    
    if __name__ == "__main__":
        cond = Condition()
        tianmao = TianMao(cond)
        xiaoai = XiaoAi(cond)
        xiaoai.start()
        tianmao.start()
    执行结果:
    天猫精灵:小艾同学
    小艾:在
    天猫精灵:我们来对古诗吧
    小艾:好
    天猫精灵:君住长江头
    小艾:握住长江尾
    天猫精灵:日日思君不见君
    小艾:共饮长江水
    天猫精灵:此水几时休
    小艾:此恨何时已
    天猫精灵:只愿君心似我心
    小艾:定不负相思意

    这样就控制了线程的执行顺序,注意wait()和notify()一定要在condition下面,也可以不使用with语句。
    使用acquire()和release()也可以。

    def run(self):
        self.cond.acquire()
        print("{}:{}".format(self.name,"小艾同学"))
        self.cond.notify()
        self.cond.wait()
    
        print("{}:{}".format(self.name, "我们来对古诗吧"))
        self.cond.notify()
        self.cond.wait()
    
        print("{}:{}".format(self.name, "君住长江头"))
        self.cond.notify()
        self.cond.wait()
    
        print("{}:{}".format(self.name, "日日思君不见君"))
        self.cond.notify()
        self.cond.wait()
        self.cond.release()

    注意,下面的启动顺序:
      xiaoai.start()
      tianmao.start()
    如果顺序不对,会直接卡死,谁先说谁后启动,因为tianmao先启动可能使xiaoai收不到notify()通知,这样就会一直卡住。

    condition有两把锁,一把底层锁会在线程调用wait方法的时候释放,
    上面的锁会在每次调用wait的时候分配一把并放入到cond的等到队列中,
    等到notify方法的唤醒。

    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass
    
    
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    4.Semaphore 信号量

    用于控制进入数量的锁。
    现在我们要爬去一个页面,分为列表页和详情页。

    import time
    from threading import Thread
    
    class AnalyzeUrl(Thread):
    
        def run(self):
            time.sleep(3)
            print("成功获取一个文章详情页")
    
    
    class GetUrl(Thread):
        def run(self):
            #现在在文章列表页获取到了20个文章的URL
            for i in range(20):
                result =AnalyzeUrl()
                result.start()
    
    
    if __name__ == "__main__":
        t1 = GetUrl()
        t1.start()

    上述的做法是一次开启20个线程来处理所有页面,
    但是了,同一时刻如果这个多请求,IP很容易被封掉。

    import time
    from threading import Thread,Semaphore
    
    class AnalyzeUrl(Thread):
        def __init__(self,sem):
            super().__init__()
            self.sem = sem
    
        def run(self):
            time.sleep(3)
            print("成功获取一个文章详情页")
            self.sem.release()
    
    
    class GetUrl(Thread):
        def __init__(self,sem):
            super().__init__()
            self.sem = sem
    
        def run(self):
            #现在在文章列表页获取到了20个文章的URL
            for i in range(20):
                self.sem.acquire()
                result =AnalyzeUrl(self.sem)
                result.start()
    
    
    if __name__ == "__main__":
        sem = Semaphore(4)
        t1 = GetUrl(sem)
        t1.start()
    
    Semaphore源码:
    class Semaphore:
        def __init__(self, value=1):
            if value < 0:
                raise ValueError("semaphore initial value must be >= 0")
            self._cond = Condition(Lock())
            self._value = value

      Semaphore本质上还是使用condition。

  • 相关阅读:
    大端小端与数字的二进制存储
    java基础之进制转换汇总
    (转) tcp udp通讯协议
    JAVA Tcp Udp的通讯实现(转)
    ExecutorService创建线程使用 转()
    转:java中的位运算
    SVN服务器的搭建与TortoiseSVN的使用
    [Mark]VM Cone & Template
    [Mark]VM migrate
    [Mark] ethtool command in REHL OS
  • 原文地址:https://www.cnblogs.com/yangmingxianshen/p/11290487.html
Copyright © 2011-2022 走看看