zoukankan      html  css  js  c++  java
  • Python说文解字_Python之多任务_03

    问:线程学完了,现在我们开始学习进程了吧?

    答:是的。前面说到线程就是我们的手,我们现在可以学习一下我们的“胳膊”了。

      我们有了多线程,为什么还要学习多进程呢?这是因为在Python当中有一把GIL锁的存在,比如某些耗CPU的运算的时候,我们可以运行多进程多个CPU并发的操作进行操作。对于IO操作来说,我们的瓶颈不在于我们的CPU因此我们用多线程操作。进程切换操作不是轻量级的。

      我们首先举例一个数据密集型的操作,来计算斐波那契数列:

    from  concurrent.futures import ThreadPoolExecutor,as_completed
    from concurrent.futures import ProcessPoolExecutor
    import time
    
    
    def fib(n):
        if n<=2:
            return 1
        return fib(n-1) + fib(n-2)
    
    if __name__ == '__main__':
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(fib,(num)) for num in range(25,40)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multithread last time is {}".format(time.time()-start_time))
    
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(fib,(num)) for num in range(25,40)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multiprocess last time is {}".format(time.time()-start_time))
    # 
    # multithread last time is 43.156678199768066
    # multiprocess last time is 27.62783455848694

      我们明显看到多进程比多线程快。

      我们在以一个IO操作来进行对比:

    from  concurrent.futures import ThreadPoolExecutor,as_completed
    from concurrent.futures import ProcessPoolExecutor
    import time
    
    
    def random_sleep(n):
        time.sleep(n)
        return n
    
    if __name__ == '__main__':
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep,(num)) for num in [2]*30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multithread last time is {}".format(time.time()-start_time))
    
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep,(num)) for num in [2]*30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multiprocess last time is {}".format(time.time()-start_time))
    #
    # multithread last time is 20.035860300064087
    # multiprocess last time is 20.641016483306885

      

      正式进入我们的进程操作:

    import os
    import time
    # fork只能用于linux下面
    pid = os.fork()
    print("bobby")
    if pid == 0:
        print("子进程{},父进程是{}".format(os.getpid(),os.getppid()))
    else:
        print("我是父进程:{}".format(pid))
    
    time.sleep(2)

      这段代码只能在Linux下运行。我们发现的问题是如果主进程结束了,子进程还是会运行的。

      

    问:进程如何进行编程?

    答:我们懂了线程的编程,进程的编程会变得非常的简单。多余的内容就不再讲解,我们讲解一些不同的包,其实这些包的应用也是跟进程差不多的。

      multiprocessing

    import multiprocessing
    import time
    def get_html(n):
        time.sleep(n)
        return n
    
    if __name__ == '__main__':
        progress = multiprocessing.Process(target=get_html,args=(2,))
        progress.start()
        progress.join()

      我们还可以直接获取进程的pid和ppid。

      其他和我们多线程差不都就不详解了。

      使用进程池:

      进程池:Pool和ProcessPoolExecutor。后那个跟线程一样。我们单独说一下Pool这个进程池。

    import multiprocessing
    import time
    from multiprocessing import Pool
    
    
    def get_html(n):
        time.sleep(n)
        return n
    
    if __name__ == '__main__':
        progress = multiprocessing.Process(target=get_html,args=(1,))
        progress.start()
        progress.join()
        pool = Pool(multiprocessing.cpu_count())
        print(multiprocessing.cpu_count())
        result = pool.apply_async(get_html,args=(3,))
        pool.close()

      注意最后要关闭线程池。详细的关于线程池的代码可以参照这里:https://www.cnblogs.com/noah0532/p/10938771.html

      特别要说明的是有两个方法:imap 和 imap_unordered(这个是谁先完成先打印谁)

    for result in  pool.imap(get_html,[1,5,3]):

      

      进程间的通信:

      进程间的通信和线程间的通信有一样的也有不一样的地方,比如锁就不能使用了。

      举一个简单的例子:用队列进行通信

    from multiprocessing import Process,Queue
    # from queue import Queue  # 这个queue就不能用了
    import time
    
    def producer(queue):
        queue.put("a")
        time.sleep(2)
    
    def consumer(queue):
        time.sleep(2)
        data = queue.get()
        print(data)
        
    if __name__ == '__main__':
        queue = Queue(10)
        my_producer = Process(target=producer,args=(queue,))
        my_consumer = Process(target=consumer, args=(queue,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()

      在多进程的编程中不能用之前的queue了,带用multiprocessing里面的queue,这一带你要注意

      我们再举一个共享变量的例子:

    from multiprocessing import Process
    import time
    
    def producer(a):
        a += 1
        time.sleep(2)
    
    
    def consumer(a):
        time.sleep(2)
        print(a)
    
    if __name__ == '__main__':
        a = 1
        my_producer = Process(target=producer,args=(a,))
        my_consumer = Process(target=consumer, args=(a,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()

      我们发现我们的全局变量不能用了,正如我们前面说的,我们再进程中每一块的变量是单独的,不能共享的。

      另外multiprocessing中的queue也不能用在进程池当中。如果我们想在进程当中应用就带用Manager当中的Queue

    from multiprocessing import Process,Queue,Manager,Pool
    import time
    
    def producer(queue):
        queue.put("a")
        time.sleep(2)
    
    def consumer(queue):
        time.sleep(2)
        data = queue.get()
        print(data)
    
    if __name__ == '__main__':
        queue = Manager().Queue(10)
        pool = Pool(2)
    
        pool.apply_async(producer,args=(queue,))
        pool.apply_async(consumer, args=(queue,))
    
        pool.close()
        pool.join()

      另外,我们还可以通过我们的pipe管道来进行通讯,但是Pipe只能使用两个进程间的通信,如果是两个交换pipe的性能比queue高

    from multiprocessing import Process,Queue,Manager,Pool,Pipe
    import time
    
    def producer(pipe):
        pipe.send("bobby")
    
    def consumer(pipe):
        print(pipe.recv())
    
    if __name__ == '__main__':
        # pipe只能用于两个进程间的通讯
        receive_pipe,send_pipe = Pipe()
        my_producer = Process(target=producer,args=(send_pipe,))
        my_consumer = Process(target=consumer, args=(receive_pipe,))
    
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()

      重点:进程间的共享内存操作:Manager().dict(),array()....常用的数据类型都有。

    from multiprocessing import Process,Queue,Manager,Pool,Pipe
    
    def add_data(p_dict,key,value):
        p_dict[key] = value
    
    if __name__ == '__main__':
        progress_dict = Manager().dict()
    
        first_progess = Process(target=add_data,args=(progress_dict,"bobby1",22))
        second_progess = Process(target=add_data, args=(progress_dict, "bobby1", 23))
    
        first_progess.start()
        second_progess.start()
        first_progess.join()
        second_progess.join()
    
        print(progress_dict)
    # {'bobby1': 23}

     

  • 相关阅读:
    jquery 序列化form表单
    nginx for windows 安装
    nodejs idea 创建项目 (一)
    spring 配置 shiro rememberMe
    idea 2018 解决 双击shift 弹出 search everywhere 搜索框的方法
    redis 在windows 集群
    spring IOC控制反转和DI依赖注入
    redis 的安装
    shiro 通过jdbc连接数据库
    handlebars的用法
  • 原文地址:https://www.cnblogs.com/noah0532/p/11011976.html
Copyright © 2011-2022 走看看