  • Multiprocessing with the multiprocessing module

    PACKAGE CONTENTS
        connection
        dummy (package)
        forking
        heap
        managers
        pool
        process
        queues
        reduction
        sharedctypes
        synchronize
        util
    module package

    Frequently used classes in the module:

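    1. Process

    The Process class represents a process object. To create a child process, pass a target function and that function's arguments to the constructor; nothing more is needed to create a Process instance.

    start() launches the process.
    join() synchronizes with the child: it blocks the caller until the process it is called on has exited.
    close() belongs to the Pool workflow rather than to Process: it stops new tasks from being submitted to a process pool (see section 3).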
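As a minimal sketch of the start()/join() cycle described above, assuming Python 3 (the names `greet` and `run_child` are made up for illustration and are not part of the original article):

```python
import multiprocessing as mp


def greet(name):
    # Runs in the child process.
    print("hello from", name)


def run_child():
    # Create the Process, start it, then wait for it to exit.
    child = mp.Process(target=greet, args=("child",))
    child.start()
    child.join()
    # exitcode is 0 when the target function returned normally.
    return child.exitcode


if __name__ == "__main__":
    print("exit code:", run_child())
```

The `if __name__ == "__main__":` guard matters on platforms that start children by re-importing the main module; without it the child would try to launch children of its own.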

    class Process(__builtin__.object)
         |  Process objects represent activity that is run in a separate process
         |  
         |  The class is analagous to `threading.Thread`
         |  
         |  Methods defined here:
         |  
         |  __init__(self, group=None, target=None, name=None, args=(), kwargs={})
         |  
         |  __repr__(self)
         |  
         |  is_alive(self)
         |      Return whether process is alive
         |  
         |  join(self, timeout=None)
         |      Wait until child process terminates
         |  
         |  run(self)
         |      Method to be run in sub-process; can be overridden in sub-class
         |  
         |  start(self)
         |      Start child process
         |  
         |  terminate(self)
         |      Terminate process; sends SIGTERM signal or uses TerminateProcess()
    Process Class
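The help text above notes that run() can be overridden in a subclass. A small sketch of that approach, assuming Python 3; the class `Doubler` and the helper `double_in_child` are hypothetical, and the result is passed back through a Queue (covered in section 2):

```python
import multiprocessing as mp


class Doubler(mp.Process):
    """Hypothetical subclass: doubles a number and sends it back through a queue."""

    def __init__(self, number, results):
        super().__init__()
        self.number = number
        self.results = results

    def run(self):
        # start() arranges for run() to be executed in the child process.
        self.results.put(self.number * 2)


def double_in_child(number):
    results = mp.Queue()
    worker = Doubler(number, results)
    worker.start()
    value = results.get()   # read the result before joining
    worker.join()
    return value


if __name__ == "__main__":
    print(double_in_child(21))   # 42
```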
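    # 1. daemon = True for both processes: the main process does not wait for
    #    daemonic children and terminates them when it exits, so neither child prints
    import multiprocessing
    import time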
    def create_test_function(statement):
        time.sleep(2)
        print(statement)
    if __name__ == "__main__":
        print("first")
        create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
        create_one_process.daemon = True
        create_one_process.start()
        print("second")
        create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
        create_two_process.daemon = True
        create_two_process.start()
        print("third")
    '''
    output:
               first
               second
               third
    '''
    
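    # 2. daemon = False for both processes: the main process waits for
    #    non-daemonic children before it exits, so both children print
    import multiprocessing
    import time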
    def create_test_function(statement):
        time.sleep(2)
        print(statement)
    if __name__ == "__main__":
        print("first")
        create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
        create_one_process.daemon = False
        create_one_process.start()
        print("second")
        create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
        create_two_process.daemon = False
        create_two_process.start()
        print("third")
    '''
    output:
               first
               second
               third
               good
               nice
    '''
    
    # 3. First process daemon = True, second daemon = False: only the
    #    non-daemonic child ("nice") is waited for
    import multiprocessing
    import time
    def create_test_function(statement):
        time.sleep(2)
        print(statement)
    if __name__ == "__main__":
        print("first")
        create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
        create_one_process.daemon = True
        create_one_process.start()
        print("second")
        create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
        create_two_process.daemon = False
        create_two_process.start()
        print("third")
    '''
    output:
               first
               second
               third
               nice
    '''
    
    # 4. First process daemon = False, second daemon = True: only the
    #    non-daemonic child ("good") is waited for
    import multiprocessing
    import time
    
    def create_test_function(statement):
        time.sleep(2)
        print(statement)
        
    if __name__ == "__main__":
        print("first")
        create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
        create_one_process.daemon = False
        create_one_process.start()
        print("second")
        create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
        create_two_process.daemon = True
        create_two_process.start()
        print("third")
    '''
    output:
            first
            second
            third
            good
    '''    
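    Setting daemon
    # join() on the first process only: the main process waits for "1",
    # while the daemonic second process is terminated when the main process exits
    import multiprocessing
    import time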
    def create_test_function(statement):
        time.sleep(2)
        print(statement)   
    if __name__ == "__main__":
        print("first")
        create_one_process = multiprocessing.Process(target=create_test_function,args=("1",))
        create_one_process.daemon = True
        create_one_process.start()
        create_one_process.join()
        print("second")
        create_two_process = multiprocessing.Process(target=create_test_function,args=("2",))
        create_two_process.daemon = True
        create_two_process.start()
        #create_two_process.join()
        print("third")
    '''output:
    first
    1
    second
    third
    '''
    
    # join() on both processes: each child finishes before the main process continues
    import multiprocessing
    import time
    def create_test_function(statement):
        time.sleep(2)
        print(statement)   
    if __name__ == "__main__":
        print("first")
        create_one_process = multiprocessing.Process(target=create_test_function,args=("1",))
        create_one_process.daemon = True
        create_one_process.start()
        create_one_process.join()
        print("second")
        create_two_process = multiprocessing.Process(target=create_test_function,args=("2",))
        create_two_process.daemon = True
        create_two_process.start()
        create_two_process.join()
        print("third")
    '''output:
    first
    1
    second
    2
    third
    '''
    Setting join
    import multiprocessing
    import time
    def create_test_function(statement):
        time.sleep(2)
        print(statement)  
    if __name__ == "__main__":
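        # join() inside the loop waits for each child before starting the next,
        # so the four processes run one after another
        for i in range(4):
            create_one_process = multiprocessing.Process(target=create_test_function,args=(i,))
            create_one_process.daemon = True
            create_one_process.start()
            create_one_process.join()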
        print("cheers")
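The loop in the last example calls join() right after start(), so each child has to finish before the next one is created. If the goal is to run the children at the same time, one common arrangement is to start them all first and join them afterwards. A sketch under that assumption, for Python 3 (`work` and `run_concurrently` are illustrative names):

```python
import multiprocessing as mp
import time


def work(seconds):
    # Stand-in for a real task.
    time.sleep(seconds)


def run_concurrently(count, seconds):
    procs = [mp.Process(target=work, args=(seconds,)) for _ in range(count)]
    begin = time.perf_counter()
    for p in procs:
        p.start()          # launch every child first
    for p in procs:
        p.join()           # then wait for all of them
    elapsed = time.perf_counter() - begin
    return elapsed, [p.exitcode for p in procs]


if __name__ == "__main__":
    elapsed, codes = run_concurrently(4, 0.5)
    print("elapsed: %.2fs, exit codes: %s" % (elapsed, codes))
```

With this arrangement the elapsed time should be close to the duration of a single task rather than the sum of all four, although the exact figure depends on the machine.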

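    2. Queue

    A Queue collects the result computed by each process so that the parent can take the results out of the queue afterwards and continue working with them. The reason is simple: the return value of a function run as a Process target is discarded, so a Queue is used to carry the results of several processes back to the parent.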

    import multiprocessing as mp
    
    def job(q):
        res=0
        for i in range(1000):
            res+=i+i**2+i**3
        q.put(res)    #queue
    
    if __name__=='__main__':
        q = mp.Queue()
        p1 = mp.Process(target=job,args=(q,))
        p2 = mp.Process(target=job,args=(q,))
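        p1.start()
        p2.start()
        # read the results before join(): joining first can deadlock when a
        # child still has large items buffered for the queue
        res1 = q.get()
        res2 = q.get()
        p1.join()
        p2.join()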
        print(res1+res2)
    put and get
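Results arrive on a queue in whatever order the children finish, so it can help to label each one. A small sketch of that idea, assuming Python 3; the labels and the helper names `sum_range` and `collect` are invented for this example:

```python
import multiprocessing as mp


def sum_range(label, start, stop, q):
    # Put a (label, result) pair so the parent can tell the results apart.
    q.put((label, sum(range(start, stop))))


def collect():
    q = mp.Queue()
    jobs = [("low", 0, 500), ("high", 500, 1000)]
    procs = [mp.Process(target=sum_range, args=(label, a, b, q))
             for label, a, b in jobs]
    for p in procs:
        p.start()
    # One get() per process; arrival order is not guaranteed.
    results = dict(q.get() for _ in procs)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    print(collect())
```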

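    3. Pool

    With a process pool you put the work to be done into the pool and Python takes care of distributing it across processes. The difference from Process is that a function handed to a Pool returns its value to the caller, whereas the return value of a Process target is discarded.

    When only a handful of processes are needed, up to a dozen or so, Process is convenient enough. For hundreds or thousands of tasks, managing Process objects by hand is clumsy. multiprocessing provides the Pool class for this case: it caps the number of worker processes, runs only that many at a time, and gives a worker the next task as soon as it finishes the previous one.
    Pool(processes=num): sets the number of worker processes; when a worker finishes a task it picks up the next one.
    apply_async(func, (args,)): non-blocking; the arguments are passed as a tuple.
    apply(func, (args,)): blocking.
    close(): closes the pool so that no new tasks can be submitted.
    terminate(): stops the worker processes immediately, without completing outstanding tasks.
    join(): waits for the worker processes to exit, like Process.join(); it must be called after close() or terminate().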
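The methods listed above can be put together in one short sketch, assuming Python 3 (`cube` and `pool_lifecycle` are illustrative names, and the 10-second timeout is an arbitrary choice):

```python
import multiprocessing as mp


def cube(x):
    return x ** 3


def pool_lifecycle():
    pool = mp.Pool(processes=2)
    blocking = pool.apply(cube, (2,))         # waits for the result
    pending = pool.apply_async(cube, (3,))    # returns an AsyncResult at once
    non_blocking = pending.get(timeout=10)    # wait for the value here
    pool.close()                              # no more tasks are accepted
    pool.join()                               # wait for the workers to exit
    return blocking, non_blocking


if __name__ == "__main__":
    print(pool_lifecycle())   # (8, 27)
```

Calling join() before close() or terminate() raises an error, which is why the order in the sketch is close() first, then join().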

    import multiprocessing as mp
    
    def job(x):
        return x*x
    def multicore():
        pool = mp.Pool()
        res = pool.map(job, range(10))
        print(res)
        
    if __name__ == '__main__':
        multicore()
    map
    #coding=utf-8
    # By default a Pool starts as many worker processes as there are CPU cores; pass processes= to choose a different number.
    import multiprocessing as mp
    def job(x):
        return x*x
    def multicore():
        pool = mp.Pool(processes=3) # use 3 worker processes
        res = pool.map(job, range(10))
        print(res)
        
    if __name__ == '__main__':
        multicore()
    Pool
    #coding=utf-8
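    # apply_async() submits a single call, which runs in one worker process. The arguments must be passed as a tuple, hence the trailing comma in (2,). Call get() on the returned object to obtain the return value.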
    import multiprocessing as mp
    def job(x):
        return x*x
    def multicore():
        pool = mp.Pool() 
        res = pool.map(job, range(10))
        print(res)
        res = pool.apply_async(job, (2,))
        # get() blocks until the result is ready
        print(res.get())
    if __name__ == '__main__':
        multicore()
    apply_async
    #coding=utf-8
    # Getting several results with apply_async().
    import multiprocessing as mp
    def job(x):
        return x*x
    def multicore():
        pool = mp.Pool() 
        res = pool.map(job, range(10))
        print(res)
        res = pool.apply_async(job, (2,))
        # get() blocks until the result is ready
        print(res.get())
        # list comprehension: one apply_async() call for i=0, one for i=1, and so on
        multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
        # collect each result from the list
        print([r.get() for r in multi_res])
    if __name__ == '__main__':
        multicore()
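    apply_async (2)

    1. By default Pool starts as many worker processes as there are CPU cores; pass the processes parameter to choose a different number.
    2. map() takes an iterable of arguments and returns a list of results.
    3. apply_async() takes one set of arguments and returns one result; to get the effect of map(), call it once per item in a loop.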

    4. Shared memory

    # A Value stores a single piece of data in shared memory.
    import multiprocessing as mp
    value1 = mp.Value('i', 0) 
    value2 = mp.Value('d', 3.14)
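    # The type codes 'd' and 'i' set the data type: 'd' is a double-precision float, 'i' is a signed integer.
    Value
    # multiprocessing also has an Array class, which lives in shared memory and lets processes share a sequence of values.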
    
    array = mp.Array('i', [1, 2, 3, 4])
    # Unlike a NumPy array, this Array is one-dimensional only. As with Value, a type code must be given, otherwise an error is raised.
    Array
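To see that a Value and an Array really are shared, a child process can modify them and the parent can read the changes back. A minimal sketch, assuming Python 3 (`modify` and `share` are hypothetical helper names):

```python
import multiprocessing as mp


def modify(counter, numbers):
    # Both objects live in shared memory, so the parent sees these writes.
    counter.value += 10
    for i in range(len(numbers)):
        numbers[i] *= 2


def share():
    counter = mp.Value('i', 0)
    numbers = mp.Array('i', [1, 2, 3, 4])
    p = mp.Process(target=modify, args=(counter, numbers))
    p.start()
    p.join()
    # Slicing an Array returns an ordinary list.
    return counter.value, numbers[:]


if __name__ == "__main__":
    print(share())   # (10, [2, 4, 6, 8])
```

An ordinary int or list passed to the child in the same way would be copied, and the parent would not see the child's changes.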
    Data types for each type code
    | Type code | C Type             | Python Type       | Minimum size in bytes |
    | --------- | ------------------ | ----------------- | --------------------- |
    | `'b'`     | signed char        | int               | 1                     |
    | `'B'`     | unsigned char      | int               | 1                     |
    | `'u'`     | Py_UNICODE         | Unicode character | 2                     |
    | `'h'`     | signed short       | int               | 2                     |
    | `'H'`     | unsigned short     | int               | 2                     |
    | `'i'`     | signed int         | int               | 2                     |
    | `'I'`     | unsigned int       | int               | 2                     |
    | `'l'`     | signed long        | int               | 4                     |
    | `'L'`     | unsigned long      | int               | 4                     |
    | `'q'`     | signed long long   | int               | 8                     |
    | `'Q'`     | unsigned long long | int               | 8                     |
    | `'f'`     | float              | float             | 4                     |
    | `'d'`     | double             | float             | 8                     |
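The sizes in the table are minimums; the actual size depends on the platform. Since multiprocessing accepts the same one-character type codes as the standard array module, one way to check the real sizes is to ask that module. A sketch, assuming Python 3 (the dictionary `MINIMUM_SIZE` simply restates the table, leaving out 'u', and `actual_sizes` is an illustrative name):

```python
import array

# Minimum sizes in bytes, copied from the table above.
MINIMUM_SIZE = {'b': 1, 'B': 1, 'h': 2, 'H': 2, 'i': 2, 'I': 2,
                'l': 4, 'L': 4, 'q': 8, 'Q': 8, 'f': 4, 'd': 8}


def actual_sizes():
    # array.array(code).itemsize reports the size in bytes on this platform.
    return {code: array.array(code).itemsize for code in MINIMUM_SIZE}


if __name__ == "__main__":
    for code, size in actual_sizes().items():
        print(code, "actual:", size, "minimum:", MINIMUM_SIZE[code])
```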

    5. Process locks

    To stop different processes from competing for a shared resource, guard the resource with a lock.

    #coding=utf-8
    import multiprocessing as mp
    import time
    def job(v, num, l):
        l.acquire() # acquire the lock
        for _ in range(5):
            time.sleep(0.1) 
            v.value += num # update the shared value
            print(v.value)
        l.release() # release the lock
    
    def multicore():
        l = mp.Lock() # create a lock
        v = mp.Value('i', 0) # create the shared value
        p1 = mp.Process(target=job, args=(v,1,l)) # the lock must be passed to each process
        p2 = mp.Process(target=job, args=(v,3,l)) 
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    if __name__ == '__main__':
        multicore()
    Lock
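The same lock can also be used as a context manager, which releases it even if the guarded code raises an exception. A sketch of that variant, assuming Python 3; here the lock is held only around each increment rather than for the whole loop, and `add_many` and `locked_total` are illustrative names:

```python
import multiprocessing as mp


def add_many(counter, lock, times):
    for _ in range(times):
        # "with lock" acquires the lock and always releases it afterwards.
        with lock:
            counter.value += 1


def locked_total(workers=4, times=1000):
    lock = mp.Lock()
    counter = mp.Value('i', 0)
    procs = [mp.Process(target=add_many, args=(counter, lock, times))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(locked_total())   # 4000: 4 workers x 1000 increments
```

Because `counter.value += 1` is a read followed by a write, removing the lock would allow increments from different processes to overwrite each other and the total could come out lower.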
  • Original article: https://www.cnblogs.com/windyrainy/p/10749767.html