zoukankan      html  css  js  c++  java
  • 十六.多线程

    0.Python多线程,不适合cpu密集操作型的任务,如大量的计算。适合io操作密集型的任务。大量的高密度运算,多线程不一定提高效率,多线程适合轻量级多个任务。

    1._thread的使用

    #不建议使用_thread模块,因为主线程退出后,其他线程没有清理直接退出。而使用threading模块,会确保在所有重要的子线程退出前,保持整个进程存活。
    
    import _thread
    from time import sleep, ctime
    def loop0():
        print('start loop 0 at:', ctime())
        sleep(4)
        print('loop 0 done at:', ctime())
    def loop1():
        print('start loop 1 at:', ctime())
        sleep(2)
        print('loop 1 done at:', ctime())
    def main():
        print('starting at:', ctime())
        _thread.start_new_thread(loop0,())
        _thread.start_new_thread(loop1,())
        sleep(6) #没有这句话就直接结束了,相当于多线程等待
        print('all DONE at:', ctime())
    if __name__ == '__main__':
        main()
    
    #下面不需要特别指定等待时间,本质是通过锁对象的状态监测来执行等待
    import _thread as thread
    from time import sleep, ctime
    SleepLoops = [4, 2] #休息时间
    def Func(nloop, nsec, lock):
        print('start loop', nloop, 'at:', ctime())
        sleep(nsec)
        print('loop', nloop, 'done at:', ctime())
        lock.release() #针对锁对象使用,改变锁的状态,释放锁
    def main():
        print('starting threads...')
        #锁住线程的对象
        LocksHandle = []
        #对象数量
        nSleep = range(len(SleepLoops))
        #建立锁对象
        for i in nSleep:
            lock = thread.allocate_lock() #得到锁对象
            lock.acquire() #取得锁对象
            LocksHandle.append(lock)
        #执行功能函数
        for i in nSleep:
            #传递3个参数到新线程的功能函数中
            thread.start_new_thread(Func, (i, SleepLoops[i], LocksHandle[i]) )
        #本质通过状态监测来处理多线程等待问题
        for i in nSleep:
            #如果锁对象是锁住状态,一直等待
            while LocksHandle[i].locked():
                pass
        print('all DONE at:', ctime())
    if __name__ == '__main__':
        main()
    View Code

    2.threading的使用

    #threading模块不需要繁琐的设置就可以轻松关闭线程。
    
    #可调用的函数实例----------------------------------------------------
    import threading
    import time
    SleepLoops = [2, 1] #休息时间
    #定义功能函数
    def loop(nloop, nsec):
        print('start loop', nloop, 'at:', time.ctime())
        time.sleep(nsec)
        print('loop', nloop, 'done at:', time.ctime())
    #主要的多线程函数
    def main():
        print('starting at:', time.ctime())
        threads = [] #储存线程句柄
        nSleep = range(len(SleepLoops))
        #传递函数到线程,并且添加线程句柄,这里不执行功能函数。
        for i in nSleep:
            t = threading.Thread(target=loop,args=(i, SleepLoops[i]))
            threads.append(t)
        #开始线程,cup会执行功能函数
        print("开始执行功能函数")
        for i in nSleep:
            threads[i].start()
        #join()函数让线程在执行完后自动结束线程,不需要去设置繁琐的状态监控。
        for i in nSleep:            # wait for all
            threads[i].join()       # threads to finish
        print('all DONE at:', time.ctime())
    
    if __name__ == '__main__':
        main()
    
    #可调用的类实例-----------------------------------------------------
    import threading
    from time import sleep, ctime
    SleepLoops = [2, 1]
    class ThreadFunc(object):
        def __init__(self, func, args, name=''):
            self.name = name
            self.func = func
            self.args = args
        def __call__(self):
            self.func(*self.args)
    def loop(nloop, nsec):
        print('start loop', nloop, 'at:', ctime())
        sleep(nsec)
        print('loop', nloop, 'done at:', ctime())
    
    def main():
        print('starting at:', ctime())
        threads = [] #存档线程句柄
        n = range(len(SleepLoops))
        for i in n:        # create all threads
            #本质是调用类中__call__
            t = threading.Thread(target=ThreadFunc(loop, (i, SleepLoops[i]),loop.__name__))
            threads.append(t)
        print("开始执行功能类")
        for i in n:        # start all threads
            threads[i].start()
        for i in n:        # wait for completion
            threads[i].join()
        print('all DONE at:', ctime())
    
    if __name__ == '__main__':
        main()
    
    #子类的实例---------------------------------------------------
    import threading
    from time import sleep, ctime
    SleepLoops = [2, 1]
    class MyThread(threading.Thread):
        def __init__(self, func, args, name=''):
            threading.Thread.__init__(self, name=name)
            self.func = func
            self.args = args
        #类似于__call__(),这里必须要写成run()
        def run(self):
            self.func(*self.args)
    def Func(nloop, nsec):
        print('start loop', nloop, 'at:', ctime())
        sleep(nsec)
        print('loop', nloop, 'done at:', ctime())
    
    def main():
        print('starting at:', ctime())
        threads = []
        n = range(len(SleepLoops))
        for i in n:
            t = MyThread(Func, (i, SleepLoops[i]),Func.__name__)
            threads.append(t)
        print("开始执行子类")
        for i in n:
            threads[i].start()
        for i in n:
            threads[i].join()
        print('all DONE at:', ctime())
    
    if __name__ == '__main__':
        main()

    3.threading建立模块(MyThread.py)

    import threading
    from time import time, ctime
    
    class MoreThread(threading.Thread):
        def __init__(self, func, args, name='', verb=False):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args
            self.verb = verb
    
        def getResult(self):
            return self.res
    
        def run(self):
            if self.verb:
                print('starting', self.name, 'at:', ctime())
            self.res = self.func(*self.args)
            if self.verb:
                print(self.name, 'finished at:', ctime())

    4.单线程与多线程的比较

    import MyThread
    from time import ctime, sleep
    
    def fib(x):
        sleep(0.005)
        if x < 2: return 1
        return (fib(x-2) + fib(x-1))
    def fac(x):
        sleep(0.1)
        if x < 2: return 1
        return (x * fac(x-1))
    def sum(x):
        sleep(0.1)
        if x < 2: return 1
        return (x + sum(x-1))
    
    funcs = (fib, fac, sum)
    n = 12
    
    def main():
        nfuncs = range(len(funcs))
    
        print('*** SINGLE THREAD')
        for i in nfuncs:
            print('starting', funcs[i].__name__,'at:', ctime())
            print(funcs[i](n))
            print(funcs[i].__name__, 'finished at:',ctime())
    
        print('
    *** MULTIPLE THREADS')
        threads = []
        for i in nfuncs:
            t = MyThread.MoreThread(funcs[i], (n,),funcs[i].__name__)
            threads.append(t)
        for i in nfuncs:
            threads[i].start()
        for i in nfuncs:
            threads[i].join()
            print(threads[i].getResult())
    
        print('all DONE')
    
    if __name__ == '__main__':
        main()
    View Code

    5.锁Lock与可重复锁RLock

    #锁能够防止多个线程同时进入公共临界区
    import threading,time
    num = 0
    #创建lock对象,这句必须要有,指向一个锁。且只能acquire()一次------------------------------------
    lock = threading.Lock()
    def run(n):
        #如果不加入锁,print的内容会混乱
        lock.acquire() #获得锁,锁住状态。每次只能有一个线程访问
        global num
        print("first",n,num)
        num +=1
        print("second",n,num)
        time.sleep(0.01)
        lock.release() #释放锁,没有锁状态。排队的进程可以访问。
        #或者用with语句也可以
        with lock:
            print("third", n, num)
            num += 1
            print("fourth", n, num)
            time.sleep(0.01)
    t_objs = [] #存线程实例
    for i in range(50):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        t_objs.append(t) #为了不阻塞后面线程的启动,不在这里join,先放到一个列表里
    for t in t_objs: #循环线程实例列表,等待所有线程执行完毕
        t.join()
    print("----------all threads has finished...",threading.current_thread(),threading.active_count())
    print("num:",num)
    
    #----------------------------------------------------------------------------------------------
    import threading, time
    num, num2 = 0, 0
    Relock = threading.RLock() #RLock()可以多次上锁,且不会阻塞----------------------------------------
    def run1():
        print("grab the first part data")
        Relock.acquire()
        global num
        num += 1
        Relock.release()
        return num
    
    def run2():
        print("grab the second part data")
        Relock.acquire()
        global num2
        num2 += 1
        Relock.release()
        return num2
    
    def run3():
        Relock.acquire()
        res = run1()
        print('--------between run1 and run2-----')
        res2 = run2()
        Relock.release()
        print(res, res2)
    
    for i in range(1):
        t = threading.Thread(target=run3)
        t.start()
    
    while threading.active_count() != 1:
        print(threading.active_count())
    else:
        print('----all threads done---')
        print(num, num2)

    6.信号量

    #对于有限资源的应用,使用信号量是个不错的选择
    #threading.BoundedSemaphore()限制多线程的访问
    import threading, time
    # 最多允许5个线程同时运行:0-4运行完,才可以运行下一组
    semaphore = threading.BoundedSemaphore(5)
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s
    " % n)
        semaphore.release()
        with semaphore:
            time.sleep(1)
            print("run the thread: %s
    " % n)
    
    if __name__ == '__main__':
        for i in range(22):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    while threading.active_count() != 1:
        pass  # print(threading.active_count())
    else:
        print('----all threads done---')
        #print(num)

    7.队列queue

    from time import sleep
    from queue import Queue,PriorityQueue
    from MyThread import MoreThread
    
    def writer(queue, loops):
        for i in range(loops):
            print('producing object for Q...')
            queue.put('xxx', 1)
            print("size now", queue.qsize())
    
    def reader(queue, loops):
        for i in range(loops):
            val = queue.get(1)
            print('consumed object from Q... size now', queue.qsize())
    
    funcs = [writer, reader]
    nfuncs = range(len(funcs))
    
    def main():
        nloops = 5
        q = Queue(32) #建立一个先入先出队列
        threads = []
        for i in nfuncs:
            t = MoreThread(funcs[i], (q, nloops), funcs[i].__name__)
            threads.append(t)
        for i in nfuncs:
            threads[i].start()
        for i in nfuncs:
            threads[i].join()
        print('all DONE')
    
    if __name__ == '__main__':
        main()
    
    #优先级队列
    q = PriorityQueue()
    q.put((-1,"A"))
    q.put((3,"B"))
    q.put((10,"C"))
    q.put((6,"D"))
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
  • 相关阅读:
    Step by Step To Create a K8S Cluster
    Linux简单操作指令
    安装redis 最新版 redis-6.2.6
    GCC升级到11.2.0
    SQL开窗函数
    SQL 树形结构递归查询
    一文详解python的类方法,普通方法和静态方法
    _new_()与_init_()的区别
    关于Python的import机制原理
    【原创】android内存管理-hprof文件
  • 原文地址:https://www.cnblogs.com/i201102053/p/10686659.html
Copyright © 2011-2022 走看看