zoukankan      html  css  js  c++  java
  • python之多线程

    一.什么是线程

     在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程

     线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于cpu),而一条流水线必须属于一个车间,一个车间的工作过程是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。

    所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

    多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    二.线程与进程的区别

     1.同一个进程内的多个线程共享该进程内的地址资源

     2.创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)

    三.开启线程方法

    方法1:

    from threading import Thread
    import time
    
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('tom',))
        t.start()
        print('主线程')

    方法2:

    from threading import Thread
    import time
    
    class Sayhi(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    if __name__ == '__main__':
        t = Sayhi('tom')
        t.start()
        print('主线程')

    四.线程对象的其他属性和方法

    Thread实例对象的方法

    # isAlive(): 返回线程是否活动的。

    # getName(): 返回线程名。

    # setName(): 设置线程名。

    threading模块提供的一些方法:

    # threading.currentThread(): 返回当前的线程变量。

    # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。

    # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    from threading import Thread
    import threading
    from multiprocessing import Process
    import os
    
    def work():
        import time
        time.sleep(3)
        print(threading.current_thread().getName())
    
    
    if __name__ == '__main__':
        #在主进程下开启线程
        t=Thread(target=work)
        t.start()
    
        print(threading.current_thread().getName())
        print(threading.current_thread()) #主线程
        print(threading.enumerate()) #连同主线程在内有两个运行的线程
        print(threading.active_count())
        print('主线程/主进程')

    执行结果

    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主线程/主进程
    Thread-1

    五.守护线程

    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

    需要强调的是:运行完毕并非终止运行

    1、对主进程来说,运行完毕指的是主进程代码运行完毕

    2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

     例子

    from threading import Thread
    import time
    
    
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
        time.sleep(3)
        print("end no see")  # 时间大于线程,不会被打印
    
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    
    
    if __name__ == '__main__':
        t1 = Thread(target=foo)
        t2 = Thread(target=bar)
    
        t1.daemon = True  # 守护线程时间段能运行完
        t1.start()
        t2.start()  # 3秒后主线程随t2线程关闭,守护线程也关闭
        print("main-------")

    执行结果

    123
    main-------
    456
    end123
    end456

    六.线程死锁与递归锁

    死锁现象

     所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

    from threading import Thread,Lock
    import time
    mutexA=Lock()
    mutexB=Lock()
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('33[41m%s 拿到A锁33[0m' %self.name)
    
            mutexB.acquire()
            print('33[42m%s 拿到B锁33[0m' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('33[43m%s 拿到B锁33[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('33[44m%s 拿到A锁33[0m' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()

    执行结果

    Thread-1 拿到A锁
    Thread-1 拿到B锁
    Thread-1 拿到B锁
    Thread-2 拿到A锁 #出现死锁,整个程序阻塞住

    递归锁

     解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

     这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,二者的区别是:递归锁可以连续acquire多次,而互斥锁只能acquire一次

    from threading import Thread,RLock
    import time
    
    mutexA=mutexB=RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止
    
    class MyThread(Thread):
        def run(self):
            self.func1()
            self.func2()
        def func1(self):
            mutexA.acquire()
            print('33[41m%s 拿到A锁33[0m' %self.name)
    
            mutexB.acquire()
            print('33[42m%s 拿到B锁33[0m' %self.name)
            mutexB.release()
    
            mutexA.release()
    
        def func2(self):
            mutexB.acquire()
            print('33[43m%s 拿到B锁33[0m' %self.name)
            time.sleep(2)
    
            mutexA.acquire()
            print('33[44m%s 拿到A锁33[0m' %self.name)
            mutexA.release()
    
            mutexB.release()
    
    if __name__ == '__main__':
        for i in range(10):
            t=MyThread()
            t.start()

    七.信号量、event、定时器

    信号量

    信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小 。

    from threading import Thread, Semaphore, Event
    import threading
    import time, random
    
    sm = Semaphore(5)
    event = Event()
    
    
    def task():
        with sm: #等同于sm.acquire() ....  sm.release()
            print('%s in' % threading.currentThread().getName())
            time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
        event.wait(10)
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
        for i in range(10):
            t = Thread(target=task)
            t.start()

    EVENT

     线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

    from threading import Event
    
    event.isSet():返回event的状态值;
    event.wait():如果 event.isSet()==False将阻塞线程;
    event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
    event.clear():恢复event的状态值为False。
    from threading import Thread,Event
    import threading
    import time,random
    
    
    def conn_mysql():
        count = 1
        while not event.is_set():
            if count > 3:
                return
            print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
            event.wait(1)  # 每秒重连一次,event为true时链接成功
            count += 1
        print('<%s>链接成功' % threading.current_thread().getName())
    
    
    def check_mysql():
        print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
        time.sleep(random.randint(1, 3))  # 1-3秒检测后event为true
        event.set()
    
    
    if __name__ == '__main__':
        event = Event()
        conn1 = Thread(target=conn_mysql)
        conn2 = Thread(target=conn_mysql)
        check = Thread(target=check_mysql)
    
        conn1.start()
        conn2.start()
        check.start()

    定时器

     定时器,指定n秒后执行某操作

    from threading import Timer
    
    def hello():
        print("hello, world")
    
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed

    八.线程queue

     class queue.Queue(maxsize=0) #队列:先进先出

    import queue
    
    q=queue.Queue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(先进先出):
    first
    second
    third
    '''

    class queue.LifoQueue(maxsize=0) #堆栈:last in fisrt out

    import queue
    
    q=queue.LifoQueue()
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(后进先出):
    third
    second
    first
    '''

    class queue.PriorityQueue(maxsize=0) #优先级队列:存储数据时可设置优先级的队列

    import queue
    
    q=queue.PriorityQueue()
    #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((30,'c'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    '''
    结果(数字越小优先级越高,优先级高的优先出队):
    (10, 'b')
    (20, 'a')
    (30, 'c')
    '''

    九.进程池和线程池

    模块介绍

    concurrent.futures模块提供了高度封装的异步调用接口

    ThreadPoolExecutor:线程池,提供异步调用

    ProcessPoolExecutor: 进程池,提供异步调用

     基本方法

    1、submit(fn, *args, **kwargs)
    异步提交任务
    
    2、map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作
    
    3、shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    
    4、result(timeout=None)
    取得结果
    
    5、add_done_callback(fn)
    回调函数

    进程线程池方法相同

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import os, random, time
    
    
    def task(name):
        print('name: %s pid: %s run' % (name, os.getpid()))
        time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        # pool = ProcessPoolExecutor(4) 多进程
        pool = ThreadPoolExecutor(5)  # 多线程
        # for i in range(10):
        #     pool.submit(task, 'egon%s' % i)
        pool.map(task, range(1, 10))
    
        pool.shutdown()
    
        print('')

    回调函数

     可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数。

    from concurrent.futures import ThreadPoolExecutor
    import time
    import random
    
    
    def la(name):
        print('%s is laing' % name)
        time.sleep(random.randint(3, 5))
        res = random.randint(7, 13) * '#'
        return {'name': name, 'res': res}
    
    
    def weigh(shit):
        shit = shit.result()  # weigh传入对象la,取对象的结果{'name': name, 'res': res}
        name = shit['name']
        size = len(shit['res'])
        print('%s la %s kg' % (name, size))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(13)
        shit = pool.submit(la, 'tom').add_done_callback(weigh)
  • 相关阅读:
    UVALive 6909 Kevin's Problem 数学排列组合
    UVALive 6908 Electric Bike dp
    UVALive 6907 Body Building tarjan
    UVALive 6906 Cluster Analysis 并查集
    八月微博
    hdu 5784 How Many Triangles 计算几何,平面有多少个锐角三角形
    hdu 5792 World is Exploding 树状数组
    hdu 5791 Two dp
    hdu 5787 K-wolf Number 数位dp
    hdu 5783 Divide the Sequence 贪心
  • 原文地址:https://www.cnblogs.com/fanhk/p/9280580.html
Copyright © 2011-2022 走看看