zoukankan      html  css  js  c++  java
  • Python说文解字_Python之多任务_03

    问:线程学完了,现在我们开始学习进程了吧?

    答:是的。前面说到线程就是我们的手,我们现在可以学习一下我们的“胳膊”了。

      我们有了多线程,为什么还要学习多进程呢?这是因为在Python当中有一把GIL锁的存在,比如某些耗CPU的运算的时候,我们可以运行多进程多个CPU并发的操作进行操作。对于IO操作来说,我们的瓶颈不在于我们的CPU因此我们用多线程操作。进程切换操作不是轻量级的。

      我们首先举例一个数据密集型的操作,来计算斐波那契数列:

    from  concurrent.futures import ThreadPoolExecutor,as_completed
    from concurrent.futures import ProcessPoolExecutor
    import time
    
    
    def fib(n):
        if n<=2:
            return 1
        return fib(n-1) + fib(n-2)
    
    if __name__ == '__main__':
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(fib,(num)) for num in range(25,40)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multithread last time is {}".format(time.time()-start_time))
    
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(fib,(num)) for num in range(25,40)]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multiprocess last time is {}".format(time.time()-start_time))
    # 
    # multithread last time is 43.156678199768066
    # multiprocess last time is 27.62783455848694

      我们明显看到多进程比多线程快。

      我们在以一个IO操作来进行对比:

    from  concurrent.futures import ThreadPoolExecutor,as_completed
    from concurrent.futures import ProcessPoolExecutor
    import time
    
    
    def random_sleep(n):
        time.sleep(n)
        return n
    
    if __name__ == '__main__':
        with ThreadPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep,(num)) for num in [2]*30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multithread last time is {}".format(time.time()-start_time))
    
        with ProcessPoolExecutor(3) as executor:
            all_task = [executor.submit(random_sleep,(num)) for num in [2]*30]
            start_time = time.time()
            for future in as_completed(all_task):
                data = future.result()
                print("get result:= {}".format(data))
            print("multiprocess last time is {}".format(time.time()-start_time))
    #
    # multithread last time is 20.035860300064087
    # multiprocess last time is 20.641016483306885

      

      正式进入我们的进程操作:

    import os
    import time
    # fork只能用于linux下面
    pid = os.fork()
    print("bobby")
    if pid == 0:
        print("子进程{},父进程是{}".format(os.getpid(),os.getppid()))
    else:
        print("我是父进程:{}".format(pid))
    
    time.sleep(2)

      这段代码只能在Linux下运行。我们发现的问题是如果主进程结束了,子进程还是会运行的。

      

    问:进程如何进行编程?

    答:我们懂了线程的编程,进程的编程会变得非常的简单。多余的内容就不再讲解,我们讲解一些不同的包,其实这些包的应用也是跟进程差不多的。

      multiprocessing

    import multiprocessing
    import time
    def get_html(n):
        time.sleep(n)
        return n
    
    if __name__ == '__main__':
        progress = multiprocessing.Process(target=get_html,args=(2,))
        progress.start()
        progress.join()

      我们还可以直接获取进程的pid和ppid。

      其他和我们多线程差不都就不详解了。

      使用进程池:

      进程池:Pool和ProcessPoolExecutor。后那个跟线程一样。我们单独说一下Pool这个进程池。

    import multiprocessing
    import time
    from multiprocessing import Pool
    
    
    def get_html(n):
        time.sleep(n)
        return n
    
    if __name__ == '__main__':
        progress = multiprocessing.Process(target=get_html,args=(1,))
        progress.start()
        progress.join()
        pool = Pool(multiprocessing.cpu_count())
        print(multiprocessing.cpu_count())
        result = pool.apply_async(get_html,args=(3,))
        pool.close()

      注意最后要关闭线程池。详细的关于线程池的代码可以参照这里:https://www.cnblogs.com/noah0532/p/10938771.html

      特别要说明的是有两个方法:imap 和 imap_unordered(这个是谁先完成先打印谁)

    for result in  pool.imap(get_html,[1,5,3]):

      

      进程间的通信:

      进程间的通信和线程间的通信有一样的也有不一样的地方,比如锁就不能使用了。

      举一个简单的例子:用队列进行通信

    from multiprocessing import Process,Queue
    # from queue import Queue  # 这个queue就不能用了
    import time
    
    def producer(queue):
        queue.put("a")
        time.sleep(2)
    
    def consumer(queue):
        time.sleep(2)
        data = queue.get()
        print(data)
        
    if __name__ == '__main__':
        queue = Queue(10)
        my_producer = Process(target=producer,args=(queue,))
        my_consumer = Process(target=consumer, args=(queue,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()

      在多进程的编程中不能用之前的queue了,带用multiprocessing里面的queue,这一带你要注意

      我们再举一个共享变量的例子:

    from multiprocessing import Process
    import time
    
    def producer(a):
        a += 1
        time.sleep(2)
    
    
    def consumer(a):
        time.sleep(2)
        print(a)
    
    if __name__ == '__main__':
        a = 1
        my_producer = Process(target=producer,args=(a,))
        my_consumer = Process(target=consumer, args=(a,))
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()

      我们发现我们的全局变量不能用了,正如我们前面说的,我们再进程中每一块的变量是单独的,不能共享的。

      另外multiprocessing中的queue也不能用在进程池当中。如果我们想在进程当中应用就带用Manager当中的Queue

    from multiprocessing import Process,Queue,Manager,Pool
    import time
    
    def producer(queue):
        queue.put("a")
        time.sleep(2)
    
    def consumer(queue):
        time.sleep(2)
        data = queue.get()
        print(data)
    
    if __name__ == '__main__':
        queue = Manager().Queue(10)
        pool = Pool(2)
    
        pool.apply_async(producer,args=(queue,))
        pool.apply_async(consumer, args=(queue,))
    
        pool.close()
        pool.join()

      另外,我们还可以通过我们的pipe管道来进行通讯,但是Pipe只能使用两个进程间的通信,如果是两个交换pipe的性能比queue高

    from multiprocessing import Process,Queue,Manager,Pool,Pipe
    import time
    
    def producer(pipe):
        pipe.send("bobby")
    
    def consumer(pipe):
        print(pipe.recv())
    
    if __name__ == '__main__':
        # pipe只能用于两个进程间的通讯
        receive_pipe,send_pipe = Pipe()
        my_producer = Process(target=producer,args=(send_pipe,))
        my_consumer = Process(target=consumer, args=(receive_pipe,))
    
        my_producer.start()
        my_consumer.start()
        my_producer.join()
        my_consumer.join()

      重点:进程间的共享内存操作:Manager().dict(),array()....常用的数据类型都有。

    from multiprocessing import Process,Queue,Manager,Pool,Pipe
    
    def add_data(p_dict,key,value):
        p_dict[key] = value
    
    if __name__ == '__main__':
        progress_dict = Manager().dict()
    
        first_progess = Process(target=add_data,args=(progress_dict,"bobby1",22))
        second_progess = Process(target=add_data, args=(progress_dict, "bobby1", 23))
    
        first_progess.start()
        second_progess.start()
        first_progess.join()
        second_progess.join()
    
        print(progress_dict)
    # {'bobby1': 23}

     

  • 相关阅读:
    pandas DataFrame 数据处理常用操作
    host文件配置 了解
    安装Spring+搭建Spring开发环境
    java 环境配置 maven 环境配置
    Hive SQL 常见问题(转载)
    小结
    SQL 将两个结构相同的表合并到成一个表
    用户画像--初步了解
    hive 提取用户第一次浏览/购买 某商品的 时间
    ES2017 keys,values,entries使用
  • 原文地址:https://www.cnblogs.com/noah0532/p/11011976.html
Copyright © 2011-2022 走看看