zoukankan      html  css  js  c++  java
  • threading

    关于threading模块的多线程的一些实验.

    threading.Thread

    先来看一个最简单的实验多线程的方法,我们只需要创建一个threading.Thread子类,并且覆写__init__和run方法就可以了.

    import threading
    import time
    
    
    
    
    
    class Guard(threading.Thread):
        def __init__(self):
            super(Guard, self).__init__()
    
        def run(self):
            print("Nice to meet you #{0}".format(
                self.name
            ))
    
    
    
    def main():
        for i in range(5):
            guard = Guard()
            guard.start()
            guard.join()
            
    
    
    
    
    if __name__ == "__main__":
        main()
    
    

    输出为:

    Nice to meet you #Thread-1
    Nice to meet you #Thread-2
    Nice to meet you #Thread-3
    Nice to meet you #Thread-4
    Nice to meet you #Thread-5
    

    从输出可以发现,关于线程的命名,默认情况是Thread-N的格式,当然我们也可以给出自己的名字。

    比如:

    
    class Guard(threading.Thread):
        def __init__(self, id):
            super(Guard, self).__init__(name="Sir"+str(id))
        def run(self):
            print("Nice to meet you #{0}".format(
                self.name
            ))
    
    
    
    def main():
        for i in range(5):
            guard = Guard(i+1)
            guard.start()
            guard.join()
    
    if __name__ == "__main__":
        main()
    
    

    或者

    class Guard(threading.Thread):
        def __init__(self, id):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
        def run(self):
            print("Nice to meet you #{0}".format(
                self.name
            ))
    
    

    方法是很多的,不一一说明了.

    来看看Thread对象的一些属性和方法:

    class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

    group:这个参数是不能传的,好像是为了以后的扩展做的准备,现在没有用
    target:传入函数,初始的run函数会调用这个函数
    name:线程的名称

    方法:

    start()

    Start() : 激活线程,同时会调用run函数
    run(): ...

    join(timeout=None)

    join(timeout=None) 等待直到线程结束

    class Guard(threading.Thread):
        def __init__(self, id):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
        def run(self):
            print("Nice to meet you #{0}".format(
                self.name
            ))
            time.sleep(1)
            print("ByeBye... #{0}".format(
                self.name
            ))
    
    
    
    def main():
        for i in range(5):
            guard = Guard(i+1)
            guard.start()
    
    if __name__ == "__main__":
        main()
    
    

    我们没有guard.join()
    输出结果为:

    Nice to meet you #Sir1
    Nice to meet you #Sir2
    Nice to meet you #Sir3
    Nice to meet you #Sir4
    Nice to meet you #Sir5
    ByeBye... #Sir3ByeBye... #Sir5
    ByeBye... #Sir1
    
    ByeBye... #Sir4
    ByeBye... #Sir2
    

    加了之后应该是:

    Nice to meet you #Sir1
    ByeBye... #Sir1
    Nice to meet you #Sir2
    ByeBye... #Sir2
    Nice to meet you #Sir3
    ByeBye... #Sir3
    Nice to meet you #Sir4
    ByeBye... #Sir4
    Nice to meet you #Sir5
    ByeBye... #Sir5
    

    是一个完成后才接着另一个的,不过这么玩好像线程的意义就没有了.

    需要注意的是timeout参数,假设我们设定timeout=0.5,即0.5秒的等待时间:

    Nice to meet you #Sir1
    Nice to meet you #Sir2
    ByeBye... #Sir1
    Nice to meet you #Sir3
    ByeBye... #Sir2
    Nice to meet you #Sir4
    ByeBye... #Sir3
    Nice to meet you #Sir5
    ByeBye... #Sir4
    ByeBye... #Sir5
    

    可以看到,虽然线程超时了,但是线程不会终止,知识暂时进入等待区,所以这个timeout不如干脆理解为这个程序的单次最大运行时间, join()总是会返回None,所以,如果想要判断线程是否超时,我们可以通过is_alive()来获得该线程是否为激活状态.

    ident

    返回线程的identifier从实验中来看似乎没有什么规律. 如果该线程没有开始,则结果是None.

    
    class Guard(threading.Thread):
        def __init__(self, id):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
        def run(self):
            print("Nice to meet you #{0}".format(
                self.name
            ))
            print("ByeBye... #{0}".format(
                self.name
            ))
    
    
    
    def main():
        for i in range(5):
            guard = Guard(i+1)
            guard.start()
            guard.join(timeout=0.5)
            print(guard.ident)
    
    if __name__ == "__main__":
        main()
    
    Nice to meet you #Sir1
    ByeBye... #Sir1
    7884
    Nice to meet you #Sir2
    ByeBye... #Sir2
    8420
    Nice to meet you #Sir3
    ByeBye... #Sir3
    1228
    Nice to meet you #Sir4
    ByeBye... #Sir4
    3608
    Nice to meet you #Sir5
    ByeBye... #Sir5
    9948
    

    daemon 守护线程

    何为daemon thread, 一般情况:

    
    
    
    class Guard(threading.Thread):
        def __init__(self, id):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
        def run(self):
            print("Nice to meet you #{0}".format(
                self.name
            ))
            time.sleep(1)
            print("ByeBye... #{0}".format(
                self.name
            ))
    
    
    
    def main():
        print("start...")
        for i in range(5):
            guard = Guard(i+1)
            guard.start()
            print(guard.ident)
    
        print("ending...")
    
    
    if __name__ == "__main__":
        main()
    
    

    结果为:

    start...
    Nice to meet you #Sir1
    10584
    Nice to meet you #Sir2
    2084
    Nice to meet you #Sir3
    14832
    Nice to meet you #Sir4
    7888
    Nice to meet you #Sir5
    1088
    ending...
    ByeBye... #Sir5ByeBye... #Sir2
    ByeBye... #Sir4
    ByeBye... #Sir3
    
    ByeBye... #Sir1
    

    注意到,ending...部分在线程未结束前就显示了,这说明本来主进程是该结束的,但是因为线程没有结束,所以进程在等而没有结束,如果我们设置daemon=True:

    
    def main():
        print("start...")
        for i in range(5):
            guard = Guard(i+1)
            guard.daemon = True #注意要在start之前设置
            guard.start()
            print(guard.ident)
    
        print("ending...")
    
    

    结果为:

    start...
    Nice to meet you #Sir1
    6496
    Nice to meet you #Sir2
    1892
    Nice to meet you #Sir3
    4752
    Nice to meet you #Sir4
    10928
    Nice to meet you #Sir5
    6644
    ending...
    

    此时,线程没有结束,但是主进程并没有等待,而是义无反顾地结束了. 这便是daemon thread...

    锁 thread.Lock

    多线程往往离不开锁机制.

    下面是从廖雪峰的教程上抄的一个例子:

    
    balance = 0
    
    def change_it(n):
        # 先存后取,结果应该为0:
        global balance
        balance = balance + n
        balance = balance - n
    
    def run_thread(n):
        for i in range(1000000):
            change_it(n)
    
    
    
    def main():
        print("start...")
        t1 = threading.Thread(target=run_thread, args=(5,))
        t2 = threading.Thread(target=run_thread, args=(8,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(balance)
        print("ending...")
    
    

    理论上balance的结果是0, 但是:

    start...
    15
    ending...
    

    原因

    
    balance = 0
    
    def change_it(n):
        # 先存后取,结果应该为0:
        global balance
        balance += n
        balance -= n
    
    
    lock = threading.Lock()
    def run_thread(n):
        for i in range(1000000):
            lock.acquire()
            try:
                change_it(n)
            finally:
                lock.release()
    
    
    
    def main():
        print("start...")
        t1 = threading.Thread(target=run_thread, args=(5,))
        t2 = threading.Thread(target=run_thread, args=(8,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print(balance)
        print("ending...")
    
    

    我们对其上了锁,结果就为0了.

    再来看看threading.Lock:

    acquire(blocking=True, timeout=-1)

    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            print("run  start..........................")
            self.lock.acquire()
            print("lock ......................")
            try:
                print("Nice to meet you #{0}".format(
                    self.name
                ))
                time.sleep(1)
                print("ByeBye... #{0}".format(
                    self.name
                ))
            finally:
                self.lock.release()
                print("release ..........")
    
    class Guard2(threading.Thread):
        def __init__(self, id):
            super(Guard2, self).__init__()
            self.setName("Sir"+str(id))
    
        def run(self):
            print("Guard2 run  start..........................")
    
            try:
                print("Nice to meet you #{0}".format(
                    self.name
                ))
                time.sleep(1)
                print("ByeBye... #{0}".format(
                    self.name
                ))
            finally:
                print("release ..........")
    
    
    def main():
        print("start...")
        lock = threading.Lock()
        for i in range(5):
            if i == 3:
                guard = Guard2(i+1)
            else:
                guard = Guard(i+1, lock)
            guard.start()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    
    
    

    结果为:

    start...
    run  start..........................
    lock ......................
    Nice to meet you #Sir1
    run  start..........................
    run  start..........................
    Guard2 run  start..........................
    Nice to meet you #Sir4
    run  start..........................ending...
    
    ByeBye... #Sir1
    release ..........
    ByeBye... #Sir4lock ......................
    
    release ..........Nice to meet you #Sir2
    
    ByeBye... #Sir2
    release ..........
    lock ......................
    Nice to meet you #Sir3
    ByeBye... #Sir3
    release ..........
    lock ......................
    Nice to meet you #Sir5
    ByeBye... #Sir5
    release ..........
    

    也就是说,一般情况下,几个线程被同一把锁控制,那么会一个一个的运行,但是一个其它线程的锁上前的代码是不受影响的,只有被锁的部分会晚一点.

    如果我们设置blocking=Flase, 即不阻塞:

    
    
    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            print("run  start..........................")
            flag = self.lock.acquire(False) #!!!
            print("lock ......................")
            try:
                print("Nice to meet you #{0}".format(
                    self.name
                ))
                time.sleep(1)
                print("ByeBye... #{0}".format(
                    self.name
                ))
            finally:
                if flag: #!!!
                    self.lock.release()
                print("release ..........")
    
    class Guard2(threading.Thread):
        def __init__(self, id):
            super(Guard2, self).__init__()
            self.setName("Sir"+str(id))
    
        def run(self):
            print("Guard2 run  start..........................")
    
            try:
                print("Nice to meet you #{0}".format(
                    self.name
                ))
                time.sleep(1)
                print("ByeBye... #{0}".format(
                    self.name
                ))
            finally:
                print("release ..........")
    
    
    def main():
        print("start...")
        lock = threading.Lock()
        for i in range(5):
            if i == 3:
                guard = Guard2(i+1)
            else:
                guard = Guard(i+1, lock)
            guard.start()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    
    
    

    注意我们改的地方, 结果为:

    start...
    run  start..........................
    lock ......................
    Nice to meet you #Sir1
    run  start..........................
    lock ......................
    Nice to meet you #Sir2
    run  start..........................
    lock ......................
    Nice to meet you #Sir3
    Guard2 run  start..........................
    Nice to meet you #Sir4
    run  start..........................
    lock ......................
    Nice to meet you #Sir5
    ending...
    ByeBye... #Sir1
    release ..........
    ByeBye... #Sir5ByeBye... #Sir4
    release ..........
    release ..........ByeBye... #Sir2
    release ..........
    
    
    ByeBye... #Sir3
    release ..........
    

    因为第一个guard上锁了,所以后面的就上不了锁,也就相当于不上锁了.

    threading.RLock

    RLock 与 Lock是相似的,不同的地方在于,在同一个线程内,可以锁多次.

    
    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            if self.lock.acquire():
                print("lock 1...")
                if self.lock.acquire():
                    print("lock 2...")
                else:
                    print("error")
    
    def main():
        print("start...")
        lock = threading.Lock()
        guard = Guard(6, lock)
        guard.start()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    
    

    会死锁

    start...
    lock 1...ending...
    

    修改为:

    def main():
        print("start...")
        lock = threading.RLock()
        guard = Guard(6, lock)
        guard.start()
        print("ending...")
    
    
    
    start...
    lock 1...
    lock 2...
    ending...
    

    Condition

    
    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            global flag
            with self.lock:
                while not flag:
                    print("wait...")
                    self.lock.wait() #wait会暂时释放锁
                print("out...")
    
    
    
    
    flag = False
    def main():
        global flag
        print("start...")
        lock = threading.Condition(threading.Lock())
        guard = Guard(6, lock)
        guard.start()
        flag = True
        print(lock.acquire())
        lock.notify()
        lock.release()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    
    

    上面程序是这样的,guard线程被激活,因为flag=False, 所以第一次循环,会执行self.lock.wait(),此时程序会挂起并等待(注意,此时锁被暂时释放), 回到主进程,我们令flag=True, 并再一次请求锁(这一步是必须的,否则会报错), lock.notify()会通知线程重新激活,于是回到线程中,因为flag=True, 所以下一次循环判断的时候,会跳出循环(所以,如果没有flag=True这一步,线程会再一次挂起), 并且再一次进入锁的状态.

    我们将代码改成如下:

    
    
    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            global flag
            with self.lock:
                while not flag:
                    print("wait..." + self.name)
                    self.lock.wait() #wait会暂时释放锁
                print("out..." + self.name)
    
    
    
    
    flag = False
    def main():
        global flag
        print("start...")
        lock = threading.Condition(threading.Lock())
        guards = (Guard(i, lock) for i in range(9))
        for guard in guards:
            guard.start()
        flag = True #这句话实际上没有必要
        print(lock.acquire())
        lock.notify(9)
        lock.release()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    
    
    
    

    我们弄了9个线程,并利用lock.notify(9)重新激活9个, 根据结果,发现激活的顺序并不固定:

    start...
    wait...Sir0
    wait...Sir1
    wait...Sir2
    wait...Sir3
    wait...Sir4
    wait...Sir5
    wait...Sir6
    wait...Sir7
    wait...Sir8
    True
    ending...out...Sir0
    
    out...Sir4
    out...Sir3
    out...Sir5
    out...Sir6
    out...Sir7
    out...Sir8
    out...Sir1
    out...Sir2
    

    注意 lock.notify(n)表示最多激活n个,也可以用lock.notify_all()来激活全部.
    另外需要一提的是wait_for(predicate, timeout=None), 等价于:

    while not predicate():
    	lock.wait()
    

    Semaphore

    
    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            global flag
            print("lock..." + self.name)
            with self.lock:
                print("out..." + self.name)
                time.sleep(1)
    
    
    
    def main():
        global flag
        print("start...")
        lock = threading.Semaphore(value=5)
        guards = (Guard(i, lock) for i in range(9))
        for guard in guards:
            guard.start()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    
    start...
    lock...Sir0
    out...Sir0
    lock...Sir1
    out...Sir1
    lock...Sir2
    out...Sir2
    lock...Sir3
    out...Sir3
    lock...Sir4
    out...Sir4
    lock...Sir5
    lock...Sir6
    lock...Sir7
    lock...Sir8
    ending...
    out...Sir5
    out...Sir6
    out...Sir8
    out...Sir7
    
    

    可以看到,前5个线程请求锁都成功了,后面的都被阻塞了一会儿, 这是因为semaphere类我们设定了value=5表示最大允许请求锁为5, 感觉这个功能还是蛮有用的.

    其机制可以看这里.

    Event

    这里讲的很明白.

    Timer

    直接用官方的例子:

    
    
    if __name__ == "__main__":
        def hello():
            print("hello, world")
    
    
        t = threading.Timer(30.0, hello)
        t.start()  # after 30 seconds, "hello, world" will be printed
    
    

    30秒后会打印"hello world".

    
    if __name__ == "__main__":
        def hello():
            print("hello, world")
    
    
        t = threading.Timer(30.0, hello)
        t.start()  # after 30 seconds, "hello, world" will be printed
        t.cancel()
    
    

    并且我们可以通过cancel()来中途取消,就像上面的一样,这样就不会打印了.

    Barrier

    
    class Guard(threading.Thread):
        def __init__(self, id, lock):
            super(Guard, self).__init__()
            self.setName("Sir"+str(id))
            self.lock = lock
        def run(self):
            global flag
            while True:
                print("lock..." + self.name)
                self.lock.wait()
                print("out..." + self.name)
                time.sleep(1)
    
    
    
    def main():
        global flag
        print("start...")
        lock = threading.Barrier(3)
        guards = (Guard(i, lock) for i in range(3))
        for guard in guards:
            guard.start()
        print("ending...")
    
    
    
    if __name__ == "__main__":
        main()
    

    结果为:

    start...
    lock...Sir0
    lock...Sir1
    lock...Sir2ending...
    
    out...Sir2out...Sir1
    out...Sir0
    
    lock...Sir0
    lock...Sir2
    lock...Sir1
    out...Sir1
    out...Sir0
    out...Sir2
    ...
    

    因为设定为3,所以只有当wait()状态的线程达到3的时候,才会一起唤醒这些线程.

    Thread-Local Data

    Thread-local data is data whose values are thread specific. To manage thread-local data, just create an instance of local (or a subclass) and store attributes on it:

    mydata = threading.local()
    mydata.x = 1
    

    The instance’s values will be different for separate threads.

    class threading.local
    A class that represents thread-local data.

    For more details and extensive examples, see the documentation string of the _threading_local module.

    通过这玩意儿,可以保证线程的读写互补干扰.

    一些其他的函数

    threading.active_count()
    Return the number of Thread objects currently alive. The returned count is equal to the length of the list returned by enumerate().

    threading.current_thread()
    Return the current Thread object, corresponding to the caller’s thread of control. If the caller’s thread of control was not created through the threading module, a dummy thread object with limited functionality is returned.

    threading.get_ident()
    Return the ‘thread identifier’ of the current thread. This is a nonzero integer. Its value has no direct meaning; it is intended as a magic cookie to be used e.g. to index a dictionary of thread-specific data. Thread identifiers may be recycled when a thread exits and another thread is created.

    New in version 3.3.

    threading.enumerate()
    Return a list of all Thread objects currently alive. The list includes daemonic threads, dummy thread objects created by current_thread(), and the main thread. It excludes terminated threads and threads that have not yet been started.

    threading.main_thread()
    Return the main Thread object. In normal conditions, the main thread is the thread from which the Python interpreter was started.

    New in version 3.4.

    threading.settrace(func)
    Set a trace function for all threads started from the threading module. The func will be passed to sys.settrace() for each thread, before its run() method is called.

    threading.setprofile(func)
    Set a profile function for all threads started from the threading module. The func will be passed to sys.setprofile() for each thread, before its run() method is called.

    threading.stack_size([size])
    Return the thread stack size used when creating new threads. The optional size argument specifies the stack size to be used for subsequently created threads, and must be 0 (use platform or configured default) or a positive integer value of at least 32,768 (32 KiB). If size is not specified, 0 is used. If changing the thread stack size is unsupported, a RuntimeError is raised. If the specified stack size is invalid, a ValueError is raised and the stack size is unmodified. 32 KiB is currently the minimum supported stack size value to guarantee sufficient stack space for the interpreter itself. Note that some platforms may have particular restrictions on values for the stack size, such as requiring a minimum stack size > 32 KiB or requiring allocation in multiples of the system memory page size - platform documentation should be referred to for more information (4 KiB pages are common; using multiples of 4096 for the stack size is the suggested approach in the absence of more specific information).

  • 相关阅读:
    读spring Micro-Service tats收获
    读spring Micro-Service tats收获
    读spring Micro-Service tats收获
    读Software Entity Architektur收获
    读Software Entity Architektur收获
    读Software Entity Architektur收获
    mvc案例
    11.16
    11.15
    11.13
  • 原文地址:https://www.cnblogs.com/MTandHJ/p/11185717.html
Copyright © 2011-2022 走看看