python -- Revisiting parallelism: coroutines, threads, and processes

    1. gevent coroutines suit I/O-bound work, not CPU-bound work.

    2. gevent coroutines cannot exploit multiple cores; in fact, coroutines run within a single thread.

    3. A subroutine is just a special case of a coroutine.
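
    Point 2 is easy to verify for yourself: every greenlet reports the same OS thread. A minimal sketch (the function name is illustrative):

    import threading
    import gevent

    def whoami(n):
        # every greenlet runs inside the single main thread
        print(n, threading.current_thread().name)
        gevent.sleep(0)  # yield control back to the gevent hub

    jobs = [gevent.spawn(whoami, i) for i in range(3)]
    gevent.joinall(jobs)
    # prints MainThread three times: greenlets are cooperative, not parallel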

    Practical application from a project

    from gevent import monkey
    monkey.patch_all()  # patch blocking stdlib calls so greenlets can yield on I/O
    from gevent.pool import Pool
    import time
    
    def lr_classify():
        print('lr_classify')
        
    def rf_classify():
        print('rf_classify')
        
    def svm_classify():
        print('svm_classify')
    
    def decisionTree_classify():
        print('decisionTree_classify')
        
    def gbt_classify():
        print('gbt_classify')
        
    def naive_bayes_classify():
        print('naive_bayes_classify')
    
    model_dict = {'logistic':lr_classify
                  ,'random_forest':rf_classify
                  ,'linear_svm':svm_classify
                  ,'decision_tree':decisionTree_classify
                  ,'gbt':gbt_classify
                  ,'naive_bayes':naive_bayes_classify}
    
    def get_task(names):
        return [task for name,task in model_dict.items() if name in names]
    #     return model_dict.get(name,'no such model')
    
    
    start = time.time()
    
    model_list = ['logistic','gbt', 'naive_bayes', 'linear_svm', 'decision_tree', 'random_forest']
    
    names = model_dict.keys()&(set(model_list))
    
    tasks = get_task(names)
    p = Pool(6)
    
    # jobs =  [i for i in range(10)]
    for task in tasks:
        p.spawn(task)
    # for task in tasks:
    #     p.apply_async(task,(None,))
    #     p.spawn
    p.join()
    end = time.time()
    print('cost {} seconds in total...'.format((end-start)))
    
    # Output:
    lr_classify
    svm_classify
    gbt_classify
    naive_bayes_classify
    decisionTree_classify
    rf_classify
    cost 0.0018892288208007812 seconds in total...
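
    Note that the six tasks above are bare print calls with no I/O at all, which is why they finish in under 2 ms and effectively run one after another. gevent pays off once each task blocks on (patched) I/O; a minimal sketch, with gevent.sleep standing in for a network or disk wait:

    from gevent.pool import Pool
    import gevent
    import time

    def fake_io_task(name):
        print('{} start'.format(name))
        gevent.sleep(1)  # stands in for a blocking I/O call
        print('{} done'.format(name))

    start = time.time()
    p = Pool(6)
    for name in ['lr', 'rf', 'svm', 'dt', 'gbt', 'nb']:
        p.spawn(fake_io_task, name)
    p.join()
    print('cost {:.2f} seconds'.format(time.time() - start))  # ~1s, not ~6s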
    
    

    Comparing threads, processes, and coroutines

    # Multithreading
    import threading
    import time
    
    def loop_5(interval):
        for i in range(5):
            print('loop_5: ',i)
            time.sleep(interval)
    
    def loop_10(interval):
        for i in range(10):
            print('loop_10: ',i)
            time.sleep(interval)
            
    if __name__ == '__main__':
        print('start...:')
        start = time.time()
        threads = []
        tasks = [loop_5,loop_10]
        for task in tasks:
            t = threading.Thread(target=task,args=(1,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        end = time.time()
        print('end...and cost {} seconds in total...'.format((end-start)))
    
    # Output (the two loops' prints interleave):
    start...:
    loop_5: loop_10:   00
    
    loop_5: loop_10:  1
     1
    loop_10:  2
    loop_5:  2
    loop_10:  3
    loop_5:  3
    loop_10:  4
    loop_5:  4
    loop_10:  5
    loop_10:  6
    loop_10:  7
    loop_10:  8
    loop_10:  9
    end...and cost 10.02077603340149 seconds in total...
    
    # Multiprocessing
    
    from multiprocessing  import Pool
    import time
    
    def loop_5(interval):
        for i in range(5):
            print('loop_5: ',i)
            time.sleep(interval)
            
    def loop_10(interval):
        for i in range(10):
            print('loop_10: ',i)
            time.sleep(interval)
            
    if __name__ == '__main__':
        print('start ...')
        start = time.time()
        p = Pool(2)
        tasks = [loop_5,loop_10]
        for task in tasks:
            p.apply_async(task,args=(1,))
        p.close()
        p.join()    
        end = time.time()
        print('end...cost {} seconds in total...'.format((end-start)))
    
    # Multiprocessing still costs about 10 seconds, which is easy to understand: loop_5 runs alongside loop_10, but the main process still has to wait for loop_10 to finish before it can end.
    start ...
    loop_5:  0
    loop_10:  0
    loop_5:  1
    loop_10:  1
    loop_5:  2
    loop_10:  2
    loop_5:  3
    loop_10:  3
    loop_5:  4
    loop_10:  4
    loop_10:  5
    loop_10:  6
    loop_10:  7
    loop_10:  8
    loop_10:  9
    end...cost 10.231749534606934 seconds in total...
    
    ### Without a process pool
    
    from multiprocessing  import Process
    import time
    
    def loop_5(interval):
        for i in range(5):
            print('loop_5: ',i)
            time.sleep(interval)
            
    def loop_10(interval):
        for i in range(10):
            print('loop_10: ',i)
            time.sleep(interval)
            
    if __name__ == '__main__':
        print('start ...')
        start = time.time()
        tasks = [loop_5,loop_10]
        processes = []
        for task in tasks:
            p = Process(target=task,args=(1,))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        end = time.time()
        print('end...cost {} seconds in total...'.format((end-start)))
    
    # Output:
    start ...
    loop_5:  0
    loop_10:  0
    loop_5:  1
    loop_10:  1
    loop_5:  2
    loop_10:  2
    loop_5:  3
    loop_10:  3
    loop_5:  4
    loop_10:  4
    loop_10:  5
    loop_10:  6
    loop_10:  7
    loop_10:  8
    loop_10:  9
    end...cost 10.139782667160034 seconds in total...
    
    

    Copying files with coroutines

    from gevent.pool import Pool
    import time
    
    def copy_file(src,target):
        with open(src,'r') as fr:
            with open(target,'w') as fw:
                for line in fr:
                    fw.write(line)
                    
    if __name__ == '__main__':
        print('start...')
        start = time.time()
        p = Pool(6)
        
        args = [('./test.py','./test2.py'),('./test.py','./test3.py')]
        
        for arg in args:
            p.spawn(copy_file,*arg)
        p.join()
        end = time.time()
        print('cost {} seconds in total...'.format((end-start)))
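
    One caveat: ordinary disk reads and writes do not yield to the gevent hub even under monkey patching, so the two greenlets above mostly run back to back. gevent provides FileObjectThread, which hands file I/O to a helper thread so other greenlets can keep running; a minimal sketch under that assumption (paths as above):

    from gevent.fileobject import FileObjectThread
    from gevent.pool import Pool

    def copy_file_cooperative(src, target):
        # the blocking read/write happens in a helper thread,
        # so the greenlet doing the copy does not block the hub
        fr = FileObjectThread(open(src, 'rb'))
        fw = FileObjectThread(open(target, 'wb'))
        try:
            fw.write(fr.read())
        finally:
            fr.close()
            fw.close()

    p = Pool(6)
    for arg in [('./test.py', './test2.py'), ('./test.py', './test3.py')]:
        p.spawn(copy_file_cooperative, *arg)
    p.join()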
                
    

    Copying files with threads

    import threading
    import time
    def copy(src,tar):
        print('{} start...'.format(threading.current_thread().name))
        with open(src,'rb') as binFileInputStream:
            with open(tar,'wb') as binFileOutputStream:
                binFileOutputStream.write(binFileInputStream.read())
        time.sleep(10)
        print('{} end...'.format(threading.current_thread().name))
        
    threads = []
    args = [('./test.py','./hella.py'),('./diabetes.csv','./diabetes.py')]
    for i,arg in enumerate(args):
        t = threading.Thread(target=copy,args=arg,name='thread-{}'.format(i))
        threads.append(t)
    for thread in threads:
        thread.start()
        
    for thread in threads:
        thread.join()
    

    Multiprocessing

    from multiprocessing import Pool
    import time
    import random
    import os
    
    def copy_file_multiprocess(src,target):
        with open(src,'rb') as fr:
            with open(target,'wb') as fw:
                print('{} start to copy file...'.format(os.getpid()))
                fw.write(fr.read())
                time.sleep(random.random()*3)
                print('pid {} finished...'.format(os.getpid()))
    
    if __name__ == '__main__':
        p = Pool(3)
        args = [('./config.txt','./babao/config.txt'),('./config.txt','./babao/config2.txt'),('./config.txt','./babao/config3.txt')]
        for i in range(3):
            p.apply_async(copy_file_multiprocess,args[i])
        print('Waiting for all subprocesses to finish...')
        p.close()
        p.join()
        print('All subprocesses finished!')
            
    

    Further notes

    Python multithreaded code frequently uses the join() and setDaemon() methods. Basic usage:

    join([timeout]): wait until the thread terminates. This blocks the calling thread until the target thread ends, either normally or through an unhandled exception, or until the optional timeout occurs.
    setDaemon(): mark the thread as a daemon thread (as opposed to a normal user thread).

    1. The join() method: if main thread A creates child thread B and calls B.join(), then A waits at that call and only continues once B has finished its work.
    Signature: join([timeout]). The parameter is optional and gives the maximum time to wait. If the timeout elapses before the thread finishes, join() simply returns and the main thread carries on; note that the thread itself is not killed and keeps running.
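
    The timeout behaviour is easy to see in isolation; a minimal sketch:

    import threading
    import time

    def slow():
        time.sleep(5)

    t = threading.Thread(target=slow)
    t.start()
    t.join(timeout=1)     # wait at most 1 second
    print(t.is_alive())   # True: join(timeout) returned, but the thread lives on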
    
    import threading
    import time
     
     
    class MyThread(threading.Thread):
        def __init__(self, id):
            threading.Thread.__init__(self)
            self.id = id
     
        def run(self):
            x = 0
            time.sleep(10)
            print(self.id)
            print('thread end: ' + str(time.time()))
     
    if __name__ == "__main__":
        t1 = MyThread(999)
        print('thread start: ' + str(time.time()))
        t1.start()
        print('main thread print start: ' + str(time.time()))
        for i in range(5):
            print(i)
        time.sleep(2)
        print('main thread print end: ' + str(time.time()))
    thread start: 1497534590.2784667
    main thread print start: 1497534590.2794669
    0
    1
    2
    3
    4
    main thread print end: 1497534592.279581
    999
    thread end: 1497534600.2800388
    
    The output shows that after t1 starts, the main thread does not wait for it to finish; it executes the following statements while the thread runs.
    
     
    
    Now add join() right after starting the thread (all other code unchanged):
    
    import threading
    import time
     
     
    class MyThread(threading.Thread):
        def __init__(self, id):
            threading.Thread.__init__(self)
            self.id = id
     
        def run(self):
            x = 0
            time.sleep(10)
            print(self.id)
            print('thread end: ' + str(time.time()))
     
    if __name__ == "__main__":
        t1 = MyThread(999)
        print('thread start: ' + str(time.time()))
        t1.start()
        t1.join()
        print('main thread print start: ' + str(time.time()))
        for i in range(5):
            print(i)
        time.sleep(2)
        print('main thread print end: ' + str(time.time()))
    thread start: 1497535176.5019968
    999
    thread end: 1497535186.5025687
    main thread print start: 1497535186.5025687
    0
    1
    2
    3
    4
    main thread print end: 1497535188.5026832
    
    After t1 starts, the main thread stops at the join() call and only continues with the statements after join() once child thread t1 has finished.
    
     
    
    2. The setDaemon() method: in main thread A, child thread B is created and B.setDaemon(True) is called. This marks child thread B as a daemon thread, meaning that once main thread A finishes, the process exits with it, whether or not B has completed. This is roughly the opposite of join. One thing to watch: it must be set before start() is called.
    
    import threading
    import time
     
     
    class MyThread(threading.Thread):
        def __init__(self, id):
            threading.Thread.__init__(self)
            self.id = id
     
        def run(self):
            x = 0
            time.sleep(10)
            print(self.id)
            print("This is:" + self.getName()) # 获取线程名称
            print('线程结束:' + str(time.time()))
     
    if __name__ == "__main__":
        t1 = MyThread(999)
        print('thread start: ' + str(time.time()))
        t1.setDaemon(True)
        t1.start()
        print('main thread print start: ' + str(time.time()))
        for i in range(5):
            print(i)
        time.sleep(2)
        print('main thread print end: ' + str(time.time()))
    thread start: 1497536678.8509264
    main thread print start: 1497536678.8509264
    0
    1
    2
    3
    4
    main thread print end: 1497536680.8510408
    
     
    
    The t1.setDaemon(True) call marks the child thread as a daemon. True to the meaning of setDaemon(), the parent thread printed its lines and exited without caring whether the child thread had finished.
    
    If t1.setDaemon(True) is not added before the thread starts, the output is:
    
    thread start: 1497536865.3215919
    main thread print start: 1497536865.3215919
    0
    1
    2
    3
    4
    main thread print end: 1497536867.3217063
    999
    This is: Thread-1
    thread end: 1497536875.3221638
    
    While a program runs, if the main thread creates a child thread, the two proceed on separate paths. When the main thread is ready to exit, it checks whether its child threads have finished; if a child thread has not, the main thread waits for it before exiting.

    Sometimes we need the child thread to finish first and only then let the main thread continue; that is what join() is for (placed after starting the thread).

    And sometimes we need the process to exit as soon as the main thread is done, whether or not the child threads have finished; that is what setDaemon() is for (placed before starting the thread).
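
    As an aside, setDaemon(True) is just a setter for the thread's daemon attribute (and is deprecated in recent Python 3 releases); the usual modern spelling passes it to the constructor. A minimal sketch:

    import threading
    import time

    t = threading.Thread(target=lambda: time.sleep(10), daemon=True)
    t.start()
    print('main thread exits now; the daemon thread is abandoned with it')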
    

    A pitfall here (you cannot pass an anonymous function to pool.map): a lambda cannot be pickled, so anonymous functions are clearly not usable everywhere. The corrected version is posted further below.
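
    The pitfall is easy to reproduce on its own: multiprocessing uses pickle to ship the callable to worker processes, and pickle refuses lambdas because they have no importable qualified name. A minimal sketch:

    import pickle

    square = lambda x: x * x
    try:
        pickle.dumps(square)
    except (pickle.PicklingError, AttributeError) as e:
        print('cannot pickle the lambda:', e)

    def square_fn(x):  # a module-level named function pickles fine
        return x * x

    print(len(pickle.dumps(square_fn)), 'bytes')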

    
    # -*- coding: utf-8 -*-
    __author__ = 'Frank Li'
    from functools import wraps,reduce
    import threading as td
    import multiprocessing as mp
    from multiprocessing import Queue
    from multiprocessing import Pool
    import time
    
    def time_count(func):
        @wraps(func)
        def inner(*args,**kw):
            start = time.time()
            result = func(*args,**kw)
            end = time.time()
            num = kw.get('num','default')
            print('func: {}-{} cost {:.2f}s\n'.format(func.__name__,num,end-start))  # logging could go here
            return result
        return inner
    
    @time_count
    def multicore(data,q,**kw):
        q.put(reduce(lambda x,y:x+y,data))
    
    @time_count
    def multi_thread(data,q,**kw):
        q.put(reduce(lambda x, y: x + y, data))
    
    @time_count
    def main_process():
        q = Queue()
        datas = [range(10**5),range(10**6),range(10**7),range(10**8)]
        processes = []
        for i,data in enumerate(datas):
            p = mp.Process(target=multicore,name='process-{}'.format(i),args=(data,q),kwargs={'num':i})
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
    
        sum = 0
        for n in range(len(datas)):
            sum += q.get()
        print('main_process sum result: {}\n'.format(sum))
    
    @time_count
    def main_thread():
        q = Queue()
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        threads = []
        for i,data in enumerate(datas):
            t = td.Thread(target=multi_thread,name='thread-{}'.format(i),args=(data,q),kwargs={'num':i})
            t.start()
            threads.append(t)
    
        for t in threads:
            t.join()
    
        sum = 0
        for n in range(len(datas)):
            sum += q.get()
        print('main_thread sum result: {}\n'.format(sum))
    
    def another_multi_process_sub(data):
        return reduce(lambda x, y: x + y, data)
    
    @time_count
    def another_multi_process():
        pool = Pool(processes=2)
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        results = []
        for data in datas:
            # BUG (the pitfall described above): a lambda cannot be pickled,
            # so this pool.map raises PicklingError; the fixed version follows
            res = pool.map(lambda data: lambda x, y: x + y, (data,))
            results.extend(res)
        sum = reduce(lambda x,y:x+y,results)
        print('another_multi_process sum result: {}'.format(sum))
    
    @time_count
    def multi_process_pool():
        pool = Pool(processes=2)
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        dataset= []
        sum = reduce(lambda x,y:x+y,[ pool.apply_async(another_multi_process_sub,(data,)).get() for data in datas])
        print('multi_process_pool sum result: {}'.format(sum))
    
    def main():
        # main_process()
        # main_thread()
        another_multi_process()
        # multi_process_pool()
    
    @time_count
    def single_process():
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        sum = 0
        for data in datas:
            sum += reduce(lambda x,y:x+y,data)
        print('\n single process sum result: {}'.format(sum))
    
    if __name__ == '__main__':
        main()
        single_process()
    
    

    Comparing the efficiency of multiprocessing and multithreading. Summary of the preferred order: multiprocessing + coroutines > multiprocessing >= single-process multithreading >= single-process single-thread

    # -*- coding: utf-8 -*-
    __author__ = 'Frank Li'
    from functools import wraps,reduce
    import threading as td
    import multiprocessing as mp
    from multiprocessing import Queue
    from multiprocessing import Pool
    import time
    
    def time_count(func):
        @wraps(func)
        def inner(*args,**kw):
            start = time.time()
            result = func(*args,**kw)
            end = time.time()
            num = kw.get('num','default')
            print('func: {}-{} cost {:.2f}s\n'.format(func.__name__,num,end-start))  # logging could go here
            return result
        return inner
    
    @time_count
    def multicore(data,q,**kw):
        q.put(reduce(lambda x,y:x+y,data))
    
    @time_count
    def multi_thread(data,q,**kw):
        q.put(reduce(lambda x, y: x + y, data))
    
    @time_count
    def main_process():
        q = Queue()
        datas = [range(10**5),range(10**6),range(10**7),range(10**8)]
        processes = []
        for i,data in enumerate(datas):
            p = mp.Process(target=multicore,name='process-{}'.format(i),args=(data,q),kwargs={'num':i})
            p.start()
            processes.append(p)
        for p in processes:
            p.join()
    
        sum = 0
        for n in range(len(datas)):
            sum += q.get()
        print('main_process sum result: {}\n'.format(sum))
    
    @time_count
    def main_thread():
        q = Queue()
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        threads = []
        for i,data in enumerate(datas):
            t = td.Thread(target=multi_thread,name='thread-{}'.format(i),args=(data,q),kwargs={'num':i})
            t.start()
            threads.append(t)
    
        for t in threads:
            t.join()
    
        sum = 0
        for n in range(len(datas)):
            sum += q.get()
        print('main_thread sum result: {}\n'.format(sum))
    
    def another_multi_process_sub(data):
        return reduce(lambda x, y: x + y, data)
    
    @time_count
    def another_multi_process():
        pool = Pool(processes=2)
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        results = []
        for data in datas:
            res = pool.map(another_multi_process_sub,(data,))
            results.extend(res)
        sum = reduce(lambda x,y:x+y,results)
        print('another_multi_process sum result: {}'.format(sum))
    
    @time_count
    def multi_process_pool():
        pool = Pool(processes=2)
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        dataset= []
        sum = reduce(lambda x,y:x+y,[ pool.apply_async(another_multi_process_sub,(data,)).get() for data in datas])
        print('multi_process_pool sum result: {}'.format(sum))
    
    def main():
        main_process()
        main_thread()
        another_multi_process()
        multi_process_pool()
    
    @time_count
    def single_process():
        datas = [range(10 ** 5), range(10 ** 6), range(10 ** 7), range(10 ** 8)]
        sum = 0
        for data in datas:
            sum += reduce(lambda x,y:x+y,data)
        print('\n single process sum result: {}'.format(sum))
    
    if __name__ == '__main__':
        main()
        single_process()
    
    

    The results below may be skewed by other processes that were running on my machine; they are posted as-is, so judge them for yourself.

    # Results:
    
    func: multicore-0 cost 0.06s
    
    func: multicore-1 cost 0.50s
    
    func: multicore-2 cost 3.51s
    
    func: multicore-3 cost 27.56s
    
    main_process sum result: 5050504944450000
    
    func: main_process-default cost 28.36s
    
    func: multi_thread-0 cost 0.08s
    
    func: multi_thread-1 cost 0.67s
    
    func: multi_thread-2 cost 5.00s
    
    func: multi_thread-3 cost 25.31s
    
    main_thread sum result: 5050504944450000
    
    func: main_thread-default cost 25.36s
    
    another_multi_process sum result: 5050504944450000
    func: another_multi_process-default cost 24.31s
    
    multi_process_pool sum result: 5050504944450000
    func: multi_process_pool-default cost 25.84s
    
    
    single process sum result: 5050504944450000
    func: single_process-default cost 25.58s
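
    The ranking above puts "multiprocessing + coroutines" first, but the post never actually combines the two. A minimal sketch of the idea, assuming I/O-bound jobs (gevent.sleep stands in for real I/O, and the batch sizes are illustrative; note that monkey-patching inside an already-forked worker can emit warnings on some platforms):

    from multiprocessing import Pool
    import time

    def io_batch(n):
        # each worker process patches itself, then runs its jobs as greenlets
        from gevent import monkey
        monkey.patch_all()
        from gevent.pool import Pool as GeventPool
        import time as gtime

        def fake_io(i):
            gtime.sleep(0.5)  # cooperative after patch_all()
            return i

        return sum(GeventPool(10).map(fake_io, range(n)))

    if __name__ == '__main__':
        start = time.time()
        with Pool(2) as p:
            print(p.map(io_batch, [10, 10]))
        print('cost {:.2f}s'.format(time.time() - start))  # the batches overlap across processes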
    

    Multiple cores sharing memory raises an interprocess-communication safety problem: p1 and p2 both compete for the resource; whichever grabs the lock first runs its whole locked section, and only then can the other proceed.

    # -*- coding: utf-8 -*-
    __author__ = 'Frank Li'
    
    import multiprocessing as mp
    import time
    '''
    Shared memory between processes, guarded by a lock
    '''
    
    def add_num(v,num,num_lock):
        with num_lock:
            for _ in range(10):
                v.value+=num
                time.sleep(0.1)
                print(v.value)
    
    if __name__ == '__main__':
        v = mp.Value('i',0)  # shared int; there is also mp.Array
        num_lock = mp.Lock()
        p1 = mp.Process(target=add_num,args=(v,1,num_lock))
        p2 = mp.Process(target=add_num,args=(v,3,num_lock))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
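
    Since the comment above mentions mp.Array: it is the shared-array counterpart of mp.Value, and it carries its own lock. A minimal sketch:

    import multiprocessing as mp

    def double_all(arr):
        with arr.get_lock():  # mp.Array ships with a built-in lock
            for i in range(len(arr)):
                arr[i] *= 2

    if __name__ == '__main__':
        arr = mp.Array('i', [1, 2, 3, 4])  # typecode 'i' means signed int
        p = mp.Process(target=double_all, args=(arr,))
        p.start()
        p.join()
        print(arr[:])  # [2, 4, 6, 8]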
    

    Threads operating on a shared variable: making it safe with a lock

    # -*- coding: utf-8 -*-
    __author__ = 'Frank Li'
    import threading
    
    def thread_job1():
        print('current threading... {}'.format(threading.current_thread()))
        global A,lock
        lock.acquire()
        for i in range(10):
            A+=1
            print('job1 A value: {}'.format(A))
        lock.release()
    
    def thread_job2():
        print('current threading... {}'.format(threading.current_thread()))
        global A,lock
        lock.acquire()
        for j in range(20):
            A+=5
            print('job2 value of A: {}'.format(A))
        lock.release()
    
    def main():
        global A,lock
        A = 0
        lock = threading.Lock()
    
        t1 = threading.Thread(target=thread_job1,name='thread-1')
        t2 = threading.Thread(target=thread_job2,name='thread-2')
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
    if __name__ == '__main__':
        main()
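
    An idiomatic aside: threading.Lock supports the context-manager protocol, so the acquire()/release() pairs above are usually written as with-blocks, which also release the lock if the body raises. A minimal standalone sketch:

    import threading

    A = 0
    lock = threading.Lock()

    def add_n(n):
        global A
        with lock:  # acquires on entry, releases on exit, even on exceptions
            for _ in range(n):
                A += 1

    threads = [threading.Thread(target=add_n, args=(10,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(A)  # 20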
    
    

    Multithreading

    # -*- coding: utf-8 -*-
    __author__ = 'Frank Li'
    import threading
    from threading import Thread
    from queue import Queue
    
    def thread_job(q,l=None):
        print(threading.current_thread())
        for i in range(len(l)):
            l[i] = l[i]**2
        q.put(l)
    
    def multithreading(datas=None):
        q = Queue()
        threads = []  # list of threads
        # print(datas)
        for i,data in enumerate(datas):
            print(data)
            thread = Thread(target=thread_job,name='thread{}'.format(i),args=(q,data))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()
        result = []
    
        for _ in range(len(datas)):
            result.append(q.get())
        print(result)
    
    if __name__ == '__main__':
        datas = [[1,2],[3,4],[5,6],[7,8]]
        multithreading(datas)
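
    Note that q.get() hands results back in completion order, not submission order. If order matters, concurrent.futures preserves it for you; a minimal sketch of the same squaring job:

    from concurrent.futures import ThreadPoolExecutor

    def square_list(l):
        return [x ** 2 for x in l]

    datas = [[1, 2], [3, 4], [5, 6], [7, 8]]
    with ThreadPoolExecutor(max_workers=4) as ex:
        results = list(ex.map(square_list, datas))  # preserves input order
    print(results)  # [[1, 4], [9, 16], [25, 36], [49, 64]]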
    