zoukankan      html  css  js  c++  java
  • Python之线程

    一、python中常见的线程库

    _thread--->threading---->Queue---->ThreadingPoolExecutor

    二、_thread模块——低级的threading API接口

    2.1、_thread模块是最基本的用来编写多线程(也叫做轻量级进程或任务)的模块。控制的多线程共享全局数据空间。为了数据同步,会加上简易锁(互斥锁或二进制信号)。threading基于_threading提供了更加简单的高级使用多线程的API接口。由于这个包是可选的,没有的话,使用_dummy_thread替代。详细内容请参考官网

    2.1、_thread模块提供的方法:

    identifer = _thread.start_new_thread(function, args[, kwargs])创建一个线程返回其标识符
    
    _thread.interrupt_main() 会往main thread(主线程)抛出一个KeyboardInterrupt,打断主线程的执行
    
    _thread.stack_size([size])返回线程创建新线程堆使用的大小,0使用系统默认值,否则指定必须大于32,768(32K)
    
    _thread.TIMEOUT_MAX允许Lock.acquire()获取锁的超时时间,如果超时将抛出OverflowError
    
    _thread.get_ident() 返回当前线程的identifier
    
    _thread.allocate_lock()返回一个锁对象
    
    _thread.exit()退出线程
    方法

    2.3、锁对象class LockType 和class RLock()

    lock.acquire(waitflag=1, timeout=-1):waitflag是否去等,timeout等多长时间
    
    lock.release()释放锁
    
    lock.locked()返回锁的状态
    
    锁还可以用with
    import _thread
    
    a_lock = _thread.allocate_lock()
    
    with a_lock:
        print("a_lock is locked while this executes")
    锁的方法

    2.4、_thread 使用样例

    import _thread as thread
    
    import time
    
    executed_count = 0
    
    # Define a function for the thread
    def print_time(thread_name, delay):
        global executed_count
        count = 0
        print(thread.stack_size(32768))
        while count < 5:
            time.sleep(delay)
            count += 1
            print("%s: %s" % (thread_name, time.ctime(time.time())))
        executed_count += 1
    
    
    # Create two threads as follows
    try:
        threads = [thread.start_new_thread(print_time, ("Thread-1", 2,)),
                   thread.start_new_thread(print_time, ("Thread-2", 4,))]
    except:
        print("Error: unable to start thread")
    
    while executed_count < 2:
        pass
    使用样例

    2.5、说明:

    1、调用sys.exit()或者抛出 SystemExit异常等同于调用 _thread.exit()
    线程之间可以相互中断:任意的线程抛出KeyboardInterrupt可能会让其他线程收到(一般都会抛给主线程)
    2、Lock的acquire()是不可中断的——已经获得锁,还要去acquire()就会抛出KeyboardInterrupt异常
    3、当主线程退出时,系统将决定其他线程是否存活。在大多数系统,其他线程都会被直接杀死(不执行try ... finally...代码 和 也不会执行对象销毁)
    4、当主线程退出时,主线程不会做它的日常清除工作(除了try ...finally...代码会执行),标准I/O文件也不会刷新
    注意

    三、threading模块——基于线程的并发

    这个模块构建了在_thread模块的基础上,构建了高级的线程接口。这个模块参考了Java的threading模块。但是两者有所不同,比如Java创建锁和条件基于每个对象的基本行为而变化,但是在Python中他们是不同的对象。Python的线程类支持Java 线程类的子集行为。目前,线程之间无优先级,无线程组,线程不可销毁,停止,挂起,重新开始,或者打断。 Java线程类的静态方法,映射的是模块级别的函数。

    3.1、threading 模块提供的方法:

    threading.active_count():返回活动的线程数目,其等于threading.enumerate()返回的列表的长度
    
    threading.current_thread():返回当前的线程对象
    
    threading.get_ident():返回当前线程的标识符。线程标识符可以循环使用
    
    threading.enumerate():返回当前活动的所有线程对象的列表。这个列表包含由current_thread()创建的daemonic 线程,dummy threads,以及主线程。不包含已经终止的线程和未启动的线程
    
    threading.main_thread():返回主线程对象。主线程就是Python解释器启动的线程。
    
    threading.settrace(func):为所有从threading启动的函数设置一个跟踪函数。函数将通过sys.settrace()传递给每一个线程。在其run()调用之前执行。
    
    threading.setprofile(func):为所有从threading启动的函数设置一个概要函数。函数将通过sys.setprofile()传递给每一个线程。在其run()调用之前执行。
    
    threading.stack_size([size]):和_thread的一样
    threading方法

    3.2、threading模块的常量

    threading.TIMEOUT_MAX:设置Lock.acquire()RLock.acquire()Condition.wait()这些阻塞函数的最大超时时间。3.3、Thread-Local Data:线程局部数据时线程特定的数据。为了管理thread-local数据,需要创建一个local的实例,将参数存储在里面。不同的线程其数据不同。

    mydata = threading.local()
    mydata.x = 1

    3.4、Thread对象:thread类代表在不同的控制线程的执行不同的活动。有两种指定其活动的方式:给其构造器传递可调用对象或继承Thread覆写其run()方法。其他方法不应该被覆写,除了其构造方法。也就是说,只能覆写__init__()方法和run()方法

    实例化
    import threading 
    import time
    
    
    def thread1_func(n):
        print("路飞正在使用橡胶火箭炮%s,攻击力%s" %(time.ctime(),n))
        time.sleep(3)
        print("路飞结束该技能%s" % time.ctime())
    
    
    def thread2_func(n):
        print("艾尼路正在出雷神万击%s你,攻击力%s" %(time.ctime(),n))
        time.sleep(5)
        print("艾尼路结束该技能%s" %time.ctime())
    
    if __name__ == '__main__':
    
        thread_1 = threading.Thread(target=thread1_func,args=(10,))  # 创建子线程
        thread_2 = threading.Thread(target=thread2_func,args=(9,))
        # thread_1.setDaemon(True)  # 是否设置为守护线程,主进程的退出,不等待daemon线程
        # thread_2.setDaemon(True)
        thread_1.start()
        thread_2.start()
        # thread_1.join() # 等待线程终止,踩继续往下执行
        # thread_2.join()
    
        print("ending Fighting")
    方法创建线程
    import threading
    import time
    
    
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
    
        def run(self):  # 定义每个线程要运行的函数
            print("running on number:%s" % self.num)
            time.sleep(3)
            print("ending......")
    
    if __name__ == '__main__':
        t1 = MyThread(1)  # 继承这个类,把1这个参数,传给num ,t1就是个线程对象
        t2 = MyThread(2)
        t1.start()
        t2.start()
    继承方法创建线程

    一旦线程创建。其活动的的启动可以调线程的start()方法。这将会调用在其控制隔离的线程调用其run()方法。

    一旦线程活动开始,就认为其为活动的。当run()执行结束时,其将正常死亡,或者抛出一个未处理的异常。is_alive()方法可以判断其是否存活。

    其他的线程可以调用一个线程的join()方法。这将会阻塞调用的线程 直到 被调用join()方法的线程终止。

    线程可以有名字。可以传递给其构造方法,可以通过其name属性 读取或改变

    线程可以被标记为"daemon线程"。其存在的意义,当只有daemon进程存在是,整个Python程序就可以退出。其初始值可以从创建进程时继承。这个标记可以通过daemon属性设置  或者 通过 daemon构造器参数。

    注意:Daemon线程突然停止或关闭,其资源(例如打开文件,数据库事务等等)可能不能正确释放。如果你需要线程优雅地通知,请将其设置为非daemon或者使用合适的信号机制例如:Event

    dummy thread objects有可能被创建。这些线程被视为外部线程,这些线程的控制在threading模块之外,比方说直接从C代码。Dummy线程对象仅仅支持有限的功能。通常被认为是存活的和daemoic,不能被join()。他们不可能被删除,因为不能监测到外部线程的终止

    线程对象的方法:

    Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):构造方法
    
    start():启动线程的活动,同一线程对象只能调用一次run()方法,否则会报RuntimeError错误
    
    run():代表线程活动的方法。可以在子类中重写本方法,标准的run()方法是以构造器传递的targer参数传递的可调用对象。
    
    join(timeout=None):等待线程终止。这将阻塞调用join的线程,直到join()方法的线程调用结束。
    
    name:仅仅是为了标识线程。多个线程可以有同一个名字。可以通过构造器传递
    
    getName():获取名字
    
    setName():设置名字
    
    ident:标识符
    
    is_alive():是否存活
    
    daemon:是否为daemon进程
    
    isDaemon()
    
    setDaemon()
    Thread对象的属性和方法

    3.5、锁对象Lock

    原始的锁是同步的,它不属于特定的线程。在Python中,它是可用的最低等级的同步锁,私有_thread扩展模块实现的

    原始的锁有两种状态:锁和未锁。创建的时候是未锁状态,有两个基本的方法:acquire()获得和release()释放。如果release()释放未锁的锁会报RuntimeError

    锁对象支持上下文管理协议

    当多个线程等待获取锁,只有一个线程可以获得锁

    class threading.Lock:实现了基本的锁对象。

    acquire(blocking=True, timeout=-1)
    
    release()
    锁的两种方法

    3.6、RLock对象(可重复载入锁)

    RLock是一个同步的初始锁,可以被同一线程获取多次。acquire和release重复嵌套出现

    RLock支持上下文管理协议

    acquire(blocking=True, timeout=-1)
    release()
    RLock的方法

    3.7、Condition对象

     一个条件变量经常和一些锁关联。可以是传递过来的锁,也可以是默认创建的锁。传递过来的一个锁经常用来处理多个条件变量共享同一把锁。锁是条件对象的一部分:不应该分开跟踪。

    条件变量同样遵循上下文管理协议

    其他方法只能在持有锁的时候才能调用。wait()方法释放锁,一直处于阻塞状态,直到其他线程使用notify()或notify_all()进行唤醒。一旦被唤醒wait()就会获得锁,然后返回。而且还可以执行超时时间。

    notify()方法:唤醒哪些等待条件变量的其他线程的某一个。notify_all()唤醒所有等待条件变量的线程。

    注意:notify()和notify_all()方法不释放锁。这意味着被唤醒的线程不会立即从wait()调用中返回。只有当线程调用notify()或notify_all()后,释放锁的所有权。

    典型使用条件变量的编程风格,使用锁来同步访问一些共享状态。

    import queue
    import threading
    import collections
    import time
    cv = threading.Condition()
    
    q = queue.Queue()
    
    
    def make_an_item_available():
        q.put(1)
    
    
    def an_item_is_available():
        if q.empty():
            return False
        else:
            return True
    
    
    def get_an_available_item():
        q.get()
    
    
    def func1():
        with cv:
            while not an_item_is_available():
                cv.wait()
            print('OK')
            get_an_available_item()
    
    
    # Produce one item
    def func2():
        with cv:
            time.sleep(2)
            make_an_item_available()
            cv.notify()
            print('has notified!')
    
    if __name__ == '__main__':
        thread1 = threading.Thread(target=func1)
        thread2 = threading.Thread(target=func2)
        thread1.start()
        thread2.start()
    使用样例
    # Consume an item
    with cv:
        cv.wait_for(an_item_is_available)
        get_an_available_item()
    使用wait_for(condition)

    类threading.Condition的方法

    threading.Condition(lock=None):构造器方法
    
    acquire(*args):获取内置的锁
    
    release():释放锁
    
    wait(timeout=None):等待notify()或超时
    
    wait_for(predicate, timeout=None):等待条件满足
    
    notify(n=1):通知唤醒
    
    notify_all():唤醒所有
    Condition的方法

     3.8、Semaphore Objects(信号量对象)

    信号是计算机科学最古老的同步机制,由荷兰科学家Edsger W. Dijkstra发明。

    一个信号管理了一个内部的计数器,每调用一次acquire()方法,信号量就减少,每调用一次release()方法,信号量就加一。计数器永远不可能小于0,;一旦acquire()获取的时候发现其值为0,就会阻塞,直到线程调用release()方法

    信号量也支持上下文管理协议

    class threading.Semaphore(value=1):一个信号维护了了一个计数器:  Number of release()  -  Number of acquire() + initial value
    
    acquire(blocking=True, timeout=None)
    
    release()
    Semaphore

    class threading.BoundedSemaphore(value=1):有边界的信号量,保证计数器的值不会超过其初始值。解决semaphore被释放次数太多的bug

    maxconnections = 5
    # ...
    pool_sema = BoundedSemaphore(value=maxconnections)
    
    
    with pool_sema:
        conn = connectdb()
        try:
            # ... use connection ...
        finally:
            conn.close()
    使用样例

    3.9、Event对象

    这个是最简单的线程间通信机制:一个线程发送一个事件信号,其他线程等待这个事件信号

    一个事件对象管理一个标记:使用set()将其置为True,使用clear()将其重置为False。wait()等待其置为True。标记初始为False,可以创建定制化的线程。

    class threading.Event

    is_set()
    
    set()
    
    clear()
    
    wait(timeout=None)
    方法

    3.10、Timer对象

    这个类表示行动应该在经历过给定的时间后运行——a Timer。Timer类是Thread的子类。Timers和线程的启动方式一样,调用其start()方法。timer可以在动作执行前调用其cancel()取消其执行。timer在执行前的等待时间没有用户指定的时间那么准确。Timer有点像定时器,启动一个线程,定时执行某个任务函数

    class threading.Timer(intervalfunctionargs=Nonekwargs=None)

    cancel()

    import time
    import threading
    
    
    def func():
        time.sleep(2)
        print('This is func!')
    
    t = threading.Timer(4, func)
    
    t.start()
    print('hello')
    样例
    def hello():
        print("hello, world")
    
    t = Timer(30.0, hello)
    t.start()  # after 30 seconds, "hello, world" will be printed
    样例

    3.11、Barrier对象

    这个类为固定数目的线程之间互相等待提供了同步机制。每个线程尝试传递barrier,调用wait()方法将会阻塞,直到所有的线程都调用wait()方法。那时,所有的线程都会被同时释放。

    barrier可以被重用。感觉有点像跑步比赛,大家都准备好(调用wait),才允许一起跑(执行)。

    class threading.Barrier(parties, action=None, timeout=None):parties:同步的线程数。当某个线程被释放
    
    wait(timeout=None)
    
    reset():重置,所有处于等待的,都会收到BrokenBarrierError异常
    
    abort():为了防止某一个进程意外终止,会造成整个进程的死锁。建议在创建Barrier指定超时时间
    
    parties:所有需要跨越障碍的线程数
    
    n_waiting:有多少个线程处于等待状态
    
    broken:barrier是否处于broken状态
    barrier的方法

     四、Threading Pool(线程池)

    线程池对线程进行了封装,提供了更加友好的接口python3 直接提供concurrent模块,python2,需要pip2 install futures模块

    还有第三方模块threadpool,我比较习惯用concurrent的future包的ThreadPoolExecutor

    import concurrent.futures
    import urllib.request
    
    URLS = ['www.baidu.com',
            'www.sina.com',
            'www.tencent.com']
    
    
    # 请求页面,将其内容返回
    def load_url(url,timeout):
        with urllib.request.urlopen(url,timeout=timeout) as conn:
            return conn.read()
    
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # 字典生成式 生成future对象和url的字典
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
        # 会检测future对象completed就返回future对象
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print '%r generated an exception: %s' % (url,exc)
            else:
                print '%r page is %d bytes' % (url, len(data))
    线程池使用样例
  • 相关阅读:
    Mybatis3详解(一)----Mybatis的介绍
    【k8s】svc-sessionAffinityConfig
    【k8s】svc-sessionAffinity
    【k8s】svc-selector
    【k8s】svc-publishNotReadyAddresses
    【k8s】svc-ports
    【k8s】svc-externalIPs
    【k8s】svc-clusterIPs
    【k8s】svc-clusterIP
    【k8s】Service
  • 原文地址:https://www.cnblogs.com/skiler/p/7068787.html
Copyright © 2011-2022 走看看