zoukankan      html  css  js  c++  java
  • Python 之并发编程之manager与进程池pool

    .manager

    常用的数据类型:dict list 能够实现进程之间的数据共享

    进程之间如果同时修改一个数据,会导致数据冲突,因为并发的特征,导致数据更新不同步。

    def work(dic, lock):

        # 简写:使用with语法自动给你上锁和解锁

        with lock:

            dic["count"] -= 1

            '''

            #上锁的正常写法

            #上锁

            lock.acquire()

            #数据值减一

            dic["conut"] -=1

            # 解锁

            lock.release()

            

            '''

    if __name__ == "__main__":

        # 创建Manager对象

        m = Manager()

        # 创建一个锁对象(为了保证数据的同步)

        lock = Lock()

        lst = []

        # 创建共享字典

        dic = m.dict({"count": 100})

        # 产生一百个进程。每个进程减一。

        for i in range(100):

            # 返回进程对象p

            p = Process(target=work, args=(dic, lock))

            p.start()

            lst.append(p)

        for i in lst:

            i.join()

        print(dic)

    二.进程池pool

    小知识点:

    import os

    # 计算你的机器有多少cpu

    print(os.cpu_count())

    1.比较pool Process 执行的速度

    因为进程池可以实现并行的概念,比process单核并发的速度快

    def func(num):

        # time.sleep(3)

        # time.sleep(random.uniform(0.1,1))

        print("这是发送的第%d邮件" % (num))

    if __name__ == "__main__":

        startime = time.time()

    # 1)进程池实现并行

    # 创建进程对象

        # pool() 里面的参数是同一个时间允许多少个进程并行

        '''

        4个任务

    (1)1个人做4

    (2)4个人做4

    (3)4个人做1

    任务量较少时,3的速度较快,任务量较大时,2的速度更快.

    因为如果任务线拉长,频繁切换cpu会占点时间.

        '''

    p = Pool()  #默认是电脑cpu的核数,默认的时候任务量大更好

    # 1 的时候 0.2560138702392578,如果是1表示电脑核数同时执行1个进程

    # 不停的更换cpu运行进程任务,这样避免cpu过热降频

        for i in range(100):

            p.apply_async(func, args=(i,))

        # 关闭进程池,不在接收新的进程

        p.close()

        # 主进程阻塞,等待子进程全部完成后再退出

        p.join()

        endtime = time.time()

        print(endtime - startime)  # 0.43866443634033203

        # (2) Process 单核并发程序

        startime = time.time()

        lst = []

        for i in range(100):

            p = Process(target=func,args=(i,))

            p.start()

            lst.append(p)

        for i in lst:

            i.join()

        endtime = time.time()

        print(endtime-startime) # 8.061640739440918

    2.apply 开启进程(未来可能去掉)

    同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程,可加上返回值。

    def task(num):

        time.sleep(random.uniform(0.1,1)) # 同步程序

        print("%s:%s" % (num,os.getpid()))

        return num

    if __name__ == "__main__":

        p = Pool()

        for i in range(20):

            res = p.apply(task,args=(i,))

            print("-->",res)

        # 完完全全的同步程序,等上面走完了再执行finish

    print("finish")

    同一时间只有4个进程。

    3.apply_async 异步非阻塞程序 可以有返回值

    Process 产生的子进程,默认主进程等待所有子进程执行完毕之后再终止

    Pool进程池,只要主进程跑完了,立刻终止所有程序

    未来避免还没有执行就结束,进程time.sleep  和使用join守护。

    例:

    def task(num):

        #time.sleep(3)

        time.sleep(random.uniform(0.1,1)) #同步程序

        print("%s:%s" %(num,os.getpid()))

        return os.getpid()

    if __name__ == "__main__":

        p = Pool()

        lst = []

        lst2 = []

        for i in range(20):

            res = p.apply_async(task,args=(i,))  # res 是对象

            # print(res)

            # 1.把返回的对象一个一个插入到列表里

            lst.append(res)

        for i in lst:

            # 2.使用get方法获取返回值

            lst2.append(i.get())

        # 关闭进程池.不在接受新的进程

        p.close()

        # 主进程阻塞,等待 子进程全部完成后再退出

        p.join()

        # 主进程阻塞,等待进程全部完成后再退出

        # 返回的是默认 4个进程,因为当期机器是4个核心cpu

        print(set(lst2),len(set(lst2)))

        print("finish")

    4.进程池.map

     (与高阶函数map使用方法一样,只不过该map支持并行并发)

    # 进程池.map 返回的是列表

    # map默认底层中加了阻塞,等全部执行完毕之后,主进程在终止程序,区别于3

    例:

    if __name__ == "__main__":

        p = Pool()

        lst = p.map(task, range(100))

        print(lst)

        # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]

        # 如果出现了join,一定需要加上close,要么同时出现,要么都没有

        # p.close()

        # p.join()

    print(123455)

    5.关闭进程池

    关闭进程池,不会再接受新的进程 

    例:

    def task(num):

        time.sleep(random.uniform(0.1,1))

        print("%s:%s" % (num,os.getpid()))

        return num ** 2

    if __name__ == "__main__":

        p = Pool()

        lst= []

        for i in range(20):

            res = p.apply_async(task,args=(i,))

            lst.append(res)

        # get 函数内部默认加了阻塞,获取完所有值之后再向下执行

        for i in lst:

            print(i.get())

        p.close()

        # 如果执行close,不能够继续往进程池里面加进程了

        # res = p.apply_async(task,args=(112233,))

        p.join()

        print("finish")

    去掉程序例:  # res = p.apply_async(task,args=(112233,))的注释就出现想要的结果:

  • 相关阅读:
    jquery 序列化form表单
    nginx for windows 安装
    nodejs idea 创建项目 (一)
    spring 配置 shiro rememberMe
    idea 2018 解决 双击shift 弹出 search everywhere 搜索框的方法
    redis 在windows 集群
    spring IOC控制反转和DI依赖注入
    redis 的安装
    shiro 通过jdbc连接数据库
    handlebars的用法
  • 原文地址:https://www.cnblogs.com/hszstudypy/p/11222612.html
Copyright © 2011-2022 走看看