zoukankan      html  css  js  c++  java
  • Python 多线程

    Python多线程

    Threading模块用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    threading模块提供的类:    

      Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local。

    Thread类

    Thread是线程类,有两种使用方法,直接传入要运行的方法或从Thread继承并覆盖run():

    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    def run(arg):
        time.sleep(2)
        print("Thread-%s" % arg, )
    
    if __name__ == '__main__':
        for i in range(10):
            # 将要执行的方法作为参数传递给Thread方法
            thread = threading.Thread(target=run, args=(i,))
            thread.start()
    方式1:创建线程
    Thread-3
    Thread-2
    Thread-1
    Thread-0
    Thread-4
    Thread-7
    Thread-6
    Thread-5
    Thread-9
    Thread-8
    
    Process finished with exit code 0
    执行结果
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    # 从Thread继承,并重写run()
    class myThread(threading.Thread):
        def __init__(self, num):
            # #注意:一定要显式的调用父类的初始化函数。
            threading.Thread.__init__(self)
            self.num = num
    
        # 定义每个线程要运行的函数
        def run(self):
            time.sleep(1)
            print("Thread-%s" % self.num)
    
    if __name__ == '__main__':
        for i in range(10):
            thread = myThread(i)
            thread.start()
    方式2:自定义线程类,继承Thread类,并重写run方法
    Thread-4
    Thread-2
    Thread-1
    Thread-3
    Thread-0
    Thread-6
    Thread-5
    Thread-8
    Thread-9
    Thread-7
    
    Process finished with exit code 0
    运行结果

    Thread类的构造方法: 

    Thread(group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None)

      class Thread:
        def __init__(self, group=None, target=None, name=None,args=(), kwargs=None, *, daemon=None):


      • group: 线程组,目前还没有实现,库引用中提示必须是None;
      • target: 要执行的方法run();
      • name: 线程名;
      • args/kwargs: 要传入方法的参数args元组,kwargs字典。
      • daemon=None:如果子类重写构造函数,则必须确保在对线程执行其他操作之前调用基类构造函数

    Thread实例方法: 

      • is_alive(): 返回线程是否在运行[bool]。正在运行指启动后、终止前。
      • getName(): 获取线程名。
      • setName(str):设置线程名
      • start(): 线程准备就绪,启动线程,等待CPU调度
      • isDaemon():返回线程前台线程还是后台线程[bool]
      • setDaemon(bool):设置线程前台线程还是后台线程[bool],必须在start前设置
         如果是后台线程[True],主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,主线程和后台线程均停止
         如果是前台线程[False]默认,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
      • join([timeout]): 阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数),逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

    #!/usr/bin/env python
    # -*- coding: utf-8 -*
    # Created by YangYongming at 2018/11/24 14:07
    # FileName: 5.py
    
    import threading
    
    def run(arg):
        print(arg)
    
    thread = threading.Thread(name="yangym", target=run(1), )
    print(thread.getName())  # 输出 yangym
    thread.setDaemon(False)
    thread.setName("YANGYM")
    print(thread.getName())  # 输出 YANGYM
    thread.start()
    print(thread.is_alive())  # 输出 False
    print(thread.isDaemon())  # 输出 False
    实例演示

    setDaemon 使用介绍

    setDaemon为True时,主线程不等待其他线程的执行;setDaemon为False时(默认),主线程等待所有其他线程执行完成后再继续往下执行。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    # 从Thread继承,并重写run()
    class myThread(threading.Thread):
        def __init__(self, num):
            # #注意:一定要显式的调用父类的初始化函数。
            threading.Thread.__init__(self)
            self.num = num
    
        # 定义每个线程要运行的函数
        def run(self):
            time.sleep(1)
            print("Thread-%s" % self.num)
    
    if __name__ == '__main__':
        for i in range(10):
            thread = myThread(i)
            thread.setDaemon(True)
            thread.start()
    setDaemon为True
    Process finished with exit code 0
    运行结果:主程序执行完毕后立即退出
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    # 从Thread继承,并重写run()
    class myThread(threading.Thread):
        def __init__(self, num):
            # #注意:一定要显式的调用父类的初始化函数。
            threading.Thread.__init__(self)
            self.num = num
    
        # 定义每个线程要运行的函数
        def run(self):
            time.sleep(1)
            print("Thread-%s" % self.num)
    
    if __name__ == '__main__':
        for i in range(10):
            thread = myThread(i)
            thread.setDaemon(False) #默认
            thread.start()
    setDaemon为False
    Thread-2
    Thread-3
    Thread-1
    Thread-0
    Thread-9
    Thread-6
    Thread-4
    Thread-7
    Thread-8
    Thread-5
    
    Process finished with exit code 0
    运行结果:主程序执行完毕后会等待其他所有线程全部结束后在退出程序

     join 使用介绍

    阻塞当前上下文环境的线程,直到调用此方法的线程终止或到达指定的timeout(可选参数),逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    def run(arg):
        time.sleep(2)
        print("Thread-%s Time is: %s" % (arg,time.ctime()) )
    
    
    if __name__ == '__main__':
        for i in range(10):
            # 将要执行的方法作为参数传递给Thread方法
            thread = threading.Thread(target=run, args=(i,))
            thread.start()
            thread.join()
    使用join,使用join的线程会阻塞其他线程的运行,指导线程结束或者超时,其他线程才可以继续运行
    Thread-0 Time is: Sat Mar 10 13:33:24 2018
    Thread-1 Time is: Sat Mar 10 13:33:26 2018
    Thread-2 Time is: Sat Mar 10 13:33:28 2018
    Thread-3 Time is: Sat Mar 10 13:33:30 2018
    Thread-4 Time is: Sat Mar 10 13:33:32 2018
    Thread-5 Time is: Sat Mar 10 13:33:34 2018
    Thread-6 Time is: Sat Mar 10 13:33:36 2018
    Thread-7 Time is: Sat Mar 10 13:33:38 2018
    Thread-8 Time is: Sat Mar 10 13:33:40 2018
    Thread-9 Time is: Sat Mar 10 13:33:42 2018
    
    Process finished with exit code 0
    运行结果:每个线程都会等待使用join的线程执行完毕后才继续运行,等待2秒,这使得多线程变得没有意义
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    def run(arg):
        time.sleep(2)
        print("Thread-%s Time is: %s" % (arg,time.ctime()) )
    
    if __name__ == '__main__':
        for i in range(10):
            # 将要执行的方法作为参数传递给Thread方法
            thread = threading.Thread(target=run, args=(i,))
            thread.start()
    不使用join,线程之前运行互不影响
    Thread-4 Time is: Sat Mar 10 13:34:22 2018
    Thread-2 Time is: Sat Mar 10 13:34:22 2018
    Thread-1 Time is: Sat Mar 10 13:34:22 2018
    Thread-0 Time is: Sat Mar 10 13:34:22 2018
    Thread-3 Time is: Sat Mar 10 13:34:22 2018
    Thread-6 Time is: Sat Mar 10 13:34:22 2018
    Thread-5 Time is: Sat Mar 10 13:34:22 2018
    Thread-9 Time is: Sat Mar 10 13:34:22 2018
    Thread-8 Time is: Sat Mar 10 13:34:22 2018
    Thread-7 Time is: Sat Mar 10 13:34:22 2018
    
    Process finished with exit code 0
    运行结果

     线程锁(Lock、RLock)

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

    Lock属于全局,Rlock属于线程。Lock(),Rlock(),推荐使用Rlock()

    一把锁头,一把钥匙,钥匙用完给下一个人用

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    num = 0
    def run(arg):
        global num
        time.sleep(1)
        num = num + arg
        print(num)
    
    if __name__ == '__main__':
        for i in range(10):
            # 将要执行的方法作为参数传递给Thread方法
            thread = threading.Thread(target=run, args=(i,))
            thread.start()
    未使用锁,所有线程随机运行,数据混乱
    5
    8
    9
    11
    15
    15
    24
    32
    39
    45
    
    Process finished with exit code 0
    执行结果,未按照理想的顺序运行
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    lock = threading.RLock()
    num = 0
    
    def run(arg):
        lock.acquire()
        global num
        time.sleep(1)
        num = num + arg
        print(num)
        lock.release()
    
    if __name__ == '__main__':
        for i in range(10):
            # 将要执行的方法作为参数传递给Thread方法
            thread = threading.Thread(target=run, args=(i,))
            thread.start()
    使用锁,设置一条数据在同一时间只可以被一个线程修改
    0
    1
    3
    6
    10
    15
    21
    28
    36
    45
    
    Process finished with exit code 0
    运行结果

     信号量(Semaphore)

    互斥锁Rlock 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    一把锁头,n把钥匙,你把钥匙全部用完,再给下n个人使用

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    class myThread(threading.Thread):
        def __init__(self, num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):
            semaphore.acquire()
            time.sleep(5)
            print("Thread-%s Time is: %s" % (self.num,time.ctime()))
            semaphore.release()
    
    if __name__ == '__main__':
        semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程被同事执行
        for i in range(10):
            thread = myThread(i)
            thread.start()
    实例:某条语句设置最多可以5个线程同时执行
    Thread-4 Time is: Sat Mar 10 14:07:00 2018
    Thread-3 Time is: Sat Mar 10 14:07:00 2018
    Thread-2 Time is: Sat Mar 10 14:07:00 2018
    Thread-1 Time is: Sat Mar 10 14:07:00 2018
    Thread-0 Time is: Sat Mar 10 14:07:00 2018
    Thread-9 Time is: Sat Mar 10 14:07:05 2018
    Thread-5 Time is: Sat Mar 10 14:07:05 2018
    Thread-7 Time is: Sat Mar 10 14:07:05 2018
    Thread-6 Time is: Sat Mar 10 14:07:05 2018
    Thread-8 Time is: Sat Mar 10 14:07:05 2018
    
    Process finished with exit code 0
    # 每次执行5个线程,中间间隔5秒
    运行结果:看时间

     事件(event)

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False 默认的,当Flag为False,无论线程执行到哪里,遇到wait后全部阻塞
    • set:将“Flag”设置为True   无论在哪里,遇到set后所有线程正常执行
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Author  : Jack.Ming
    import threading
    import time
    
    class myThread(threading.Thread):
        def __init__(self, num, event):
            threading.Thread.__init__(self)
            self.num = num
            self.event = event
    
        def run(self):
            print("Thread-%s" % self.num)
            self.event.wait() # 所有线程到此将被阻塞
            print(self.num)
    
    if __name__ == '__main__':
        event_obj = threading.Event()
        for i in range(10):
            thread = myThread(i, event_obj)
            thread.start()
        inp = input("please input:") # 输入true时,所有被阻塞线程恢复执行
        if inp == "true":
            event_obj.set()
    实例
    Thread-0
    Thread-1
    Thread-2
    Thread-3
    Thread-4
    Thread-5
    Thread-6
    Thread-7
    Thread-8
    Thread-9
    please input:true
    0
    2
    3
    1
    5
    7
    8
    9
    4
    6
    
    Process finished with exit code 0
    运行结果

    Queue类

    Python 的 Queue 模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。

    这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。

    Queue 模块中的常用方法:

    • Queue.qsize() 返回队列的大小
    • Queue.empty() 如果队列为空,返回True,反之False
    • Queue.full() 如果队列满了,返回True,反之False
    • Queue.full 与 maxsize 大小对应
    • Queue.get([block[, timeout]])获取队列,timeout等待时间
    • Queue.get_nowait() 相当Queue.get(False)
    • Queue.put(item) 写入队列,timeout等待时间
    • Queue.put_nowait(item) 相当Queue.put(item, False)
    • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
    • Queue.join() 实际上意味着等到队列为空,再执行别的操作
    #!/usr/bin/python3
    
    import queue
    import threading
    import time
    
    exitFlag = 0
    
    class myThread (threading.Thread):
        def __init__(self, threadID, name, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.name = name
            self.q = q
        def run(self):
            print ("开启线程:" + self.name)
            process_data(self.name, self.q)
            print ("退出线程:" + self.name)
    
    def process_data(threadName, q):
        while not exitFlag:
            queueLock.acquire()
            if not workQueue.empty():
                data = q.get()
                queueLock.release()
                print ("%s processing %s" % (threadName, data))
            else:
                queueLock.release()
            time.sleep(1)
    
    threadList = ["Thread-1", "Thread-2", "Thread-3"]
    nameList = ["One", "Two", "Three", "Four", "Five"]
    queueLock = threading.Lock()
    workQueue = queue.Queue(10)
    threads = []
    threadID = 1
    
    # 创建新线程
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1
    
    # 填充队列
    queueLock.acquire()
    for word in nameList:
        workQueue.put(word)
    queueLock.release()
    
    # 等待队列清空
    while not workQueue.empty():
        pass
    
    # 通知线程是时候退出
    exitFlag = 1
    
    # 等待所有线程完成
    for t in threads:
        t.join()
    print ("退出主线程")
    实例
    开启线程:Thread-1
    开启线程:Thread-2
    开启线程:Thread-3
    Thread-3 processing One
    Thread-3 processing Two
    Thread-1 processing Three
    Thread-2 processing Four
    Thread-1 processing Five
    退出线程:Thread-2
    退出线程:Thread-1
    退出线程:Thread-3
    退出主线程
    运行结果

    简单应用

    多线程ping操作,

    ping.py:主程序

    sfile:IP地址段列表文件

    dfile:ping通的地址保存到该文件中

    sfile 文件:存放IP网段列表,每行一个,内容如下

    #!/usr/bin/env python
    # -*- coding: utf-8 -*
    # Created by YangYongming at 2018/11/25 12:26
    # FileName: ping.py
    import threading
    import IPy
    import os
    import queue
    
    
    class MyThread(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
    
        def run(self):  # 定义每个线程要运行的函数
            global aliveip
            while True:
                host = self.queue.get(timeout=2)  # 从队列中取Ip
                host = str(host)
                res = os.popen("ping -n 1 %s" % host)
                os.popen("exit
    ")
                if "TTL" in res.read():
                    print("%s is Alive" % host)
                    lock.acquire()
                    with open("dfile", "a", encoding="utf-8") as f2:
                        f2.write(host + '
    ')
                    aliveip.append(host)
                    lock.release()
                self.queue.task_done()
                if self.queue.empty(): # 当队列为空时,终止该线程
                    break
    
    
    if __name__ == '__main__':
        threadlist = []
        lock = threading.RLock()
        queue = queue.Queue(50)  # 初始化一个队列, 容量50
        aliveip = []
        for i in range(40):  # 建立40个线程
            t = MyThread(queue)
            t.setDaemon(False)  # 主线程执行完成,等待所有的前台线程执行完毕,默认[等待]False
            t.start()  # 线程准备就绪,等待CPU调度
        with open("sfile", "r", encoding="utf-8") as f1:
            for line in f1.readlines():
                ip = IPy.IP(line)
                for x in ip:
                    queue.put(x)  # 向队列里面放IP
        queue.join()
    主程序

    输出结果:

    将可以ping通的地址保存到dfile文件中,内容如下:

  • 相关阅读:
    MySQL数据库的创建&删除&选择
    JS实现异步的几种方式
    十种排序算法实例说明总结
    常用的bug管理工具
    Bootstrap+Hbuilder
    从菜鸟的视角看测试!
    安装numpy和matplotlib
    Eclipse在线安装svn
    重新打个招呼
    <USACO09JAN>气象测量/气象牛The Baric Bovineの思路
  • 原文地址:https://www.cnblogs.com/ming5218/p/8538825.html
Copyright © 2011-2022 走看看