zoukankan      html  css  js  c++  java
  • python--(十五步代码学会进程)

    python--(十五步代码学会进程)

    一.进程的创建

    import time
    import os
    
    #os.getpid() 获取自己进程的id号
    #os.getppid() 获取自己进程的父进程id号
    
    from multiprocessing import Process
    def func():
        print("aaa")
        time.sleep(1)
        print("子进程>>>",os.getpid())
        print("该子进程的父进程>>>",os.getppid())
        print(12345)
    
    if __name__ == "__main__":
        p = Process(target=func,)
        p.start()
        print("*" * 10)
        print("主进程>>>",os.getpid())
        print("父进程>>>",os.getppid())
    
    给要执行的函数传参数
    import time
    from multiprocessing import Process
    def func(x,y):
        print(x)
        time.sleep(1)
        print(y)
    
    if __name__ == "__main__":
        p = Process(target=func,args=("姑娘","来玩啊"))#这是func需要接受的参数的传输方式
        p.start()
        print("父进程执行结束")

    二.join方法

    import time
    from multiprocessing import Process
    
    验证join方法
    global_num = 100
    def func1():
        time.sleep(2)
        global global_num
        global_num = 0
        print("子进程全局变量>>>",global_num)
    if __name__ == "__main__":
        p1 = Process(target=func1,)
        p1.start()
        print("子进程执行")
        time.sleep(3)
        p1.join()#阻塞住,等待你的p1子进程执行sing结束,主进程的程序才能从这里继续往下执行
        print("主进程的全局变量>>>",global_num)
    
    验证了一下并发的执行时间
    import time
    from multiprocessing import Process
    
    def func1(n):
        time.sleep(n)
        print("func1",n)
    def func2(n):
        time.sleep(n)
        print("func2",n)
    def func3(n):
        time.sleep(n)
        print("func3",n)
    if __name__ == "__main__":
        p1 = Process(target=func1,args=(1,))
        p2 = Process(target=func2,args=(2,))
        p3 = Process(target=func3,args=(3,))
    
        p1.start()
        p2.start()
        p3.start()
    
    for循环在创建进程中的应用
    import time
    from multiprocessing import Process
    def func1(n):
        time.sleep(1)
        print(n)
    
    if __name__ == "__main__":
        pro_list = []
        for i in range(10):
            p1 = Process(target=func1,args=(i,))
            p1.start()
            pro_list.append(p1)
            # p1.join()
    
        # for p in pro_list:
        #     # p.join()
        p1.join()
        print("主进程结束")
    View Code

      僵尸进程和孤儿进程

    import time
    import os
    from multiprocessing import Process
    
    def func1():
        time.sleep(30)
        print(os.getpid())
        print('子进程')
    
    if __name__ == '__main__':
        p1 = Process(target=func1,)
        p1.start()
        # p1.join()
        # time.sleep(2)
        # print(p1.pid)
        print('主进程的ID',os.getpid())
        print('主进程结束')
    View Code


    三.创建进程的两种方式

    import time
    from multiprocessing import Process
    import os
    # import test01
    # def func1(n):
    #     # time.sleep(1)
    #     print(n)
    #
    # def func2(n):
    #     # time.sleep(1)
    #     print(n)
    #
    # def func3(n):
    #     # time.sleep(1)
    #     print(n)
    #
    # def func4(n):
    #     # time.sleep(1)
    #     print(n)
    #
    # if __name__ == '__main__':
    #     p1 = Process(target=func1,args=(1,))
    #     p2 = Process(target=func2,args=(2,))
    #     p3= Process(target=func3,args=(3,))
    #     p4 = Process(target=func4,args=(4,))
    #     p1.start() # run()
    #     p2.start()
    #     p3.start()
    #     p4.start()
    #     # time.sleep(0.5)
    #     print('主进程结束')
    
        # 之前同步执行的
        # func1(1)
        # func2(2)
        # func3(3)
        # func4(4)
    
    创建进程的第一种方式:
        # p1 = Process(target=func1, args=(1,))
        # p1.start()
    创建进行的第二种方式:
        #自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法
    
    # class MyProcess(Process):
    #     def __init__(self,n,name):
    #         super().__init__()
    #         self.n = n
    #         self.name = name
    #
    #     def run(self):
    #         # print(1+1)
    #         # print(123)
    #         print('子进程的进程ID',os.getpid())
    #         print('你看看n>>',self.n)
    #
    # if __name__ == '__main__':
    #     p1 = MyProcess(100,name='子进程1')
    #     p1.start() #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
    #     print('p1.name',p1.name)
    #     print('p1.pid',p1.pid)
    #     print('主进程结束')
    View Code

    四.进程的其他方法terminate is_alive.py

    import time
    from multiprocessing import Process
    def func1():
        time.sleep(2)
        print()
        print("子进程")
    if __name__ == "__main__":
        p1 = Process(target=func1,)
        p1.start()
        p1.terminate() #给操作系统发了一个关闭p1子进程的信号,关闭进程
        time.sleep(1)
        print("进程是否还活着:",p1.is_alive())#是返回True,否返回False
        print(p1.pid)
        print("主进程结束")
    View Code

    五.守护进程

    #守护的子进程跟着主进程走
    
    import time
    import os
    from multiprocessing import Process
    def func():
        time.sleep(5)
        print('子进程', os.getpid())
    
    if __name__ == '__main__':
        p1 = Process(target=func)
        p1.daemon = True   # 设置守护进程, 当主进程结束时全部子进程立即结束
        p1 .start()
        # time.sleep(5.5)
        print('主进程结束')
    View Code

    六.验证进程之间是空间隔离的

    import time
    from multiprocessing import Process
    #进程之间是空间隔离的,不共享资源
    global_num = 100
    def func1():
        global global_num
        global_num = 0
        print("子进程全局变量>>>",global_num)
    if __name__ == "__main__":
        p1 = Process(target=func1,)
        p1.start()
        time.sleep(1)
        print("主进程的全局变量>>>",global_num)
    View Code

    七.子进程中不能使用input

    from multiprocessing import Process
    
    def func1():
        s = input('>>>')
    
    if __name__ == '__main__':
    
        p1 = Process(target=func1,)
        p1.start()
    
        # a = input('>>>:')
        print('主进程结束')
    
    ##报错
    View Code


     

    八.进程锁

      ticket_lock = Lock()#创建锁  .acquire()#加锁,  .release()#解锁

    同步锁的作用:#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 # 虽然可以用文件共享数据实现进程间通信,但问题是:
    # 1.效率低(共享数据基于文件,而文件是硬盘上的数据) # 2.需要自己加锁处理
    import json
    import time
    import random
    from multiprocessing import Process,Lock
    
    def get_ticket(i,ticket_lock):
        print("我们都到齐了,大家预备!!123")
        time.sleep(1)
        #所有代码 异步执行,到这里等待,同时再去抢下面的代码执行
        ticket_lock.acquire()
        #这里有个门,只有一个人能够抢到这个钥匙,加锁
        with open("ticket","r") as f:
            last_ticket_info = json.load(f)
            #将文件数据load为字典类型的数据
        last_ticket = last_ticket_info["count"]
        print(last_ticket)
        #查看一下余票的信息
        if last_ticket > 0:
            #如果看到余票大于零,说明你可以抢到票
            time.sleep(random.random())
            #模拟网络延迟时间
            last_ticket = last_ticket - 1
            last_ticket_info["count"] = last_ticket
            with open("ticket","w") as f:
                #将修改后的参数写回文件
                json.dump(last_ticket_info,f)
            print("%s号抢到了,丫nb!" % i)
        else:
            print("%s号傻逼,没票了,明年再来" % i)
            ticket_lock.release()
    if __name__ ==  "__main__":
        ticket_lock = Lock()
        #创建一个进程锁
        for i in range(10):
            p = Process(target=get_ticket, args=(i, ticket_lock))
            p.start()
    进程锁模拟购票系统

    九.信号量

    Semaphore()
    互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
    假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
    实现:
    信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
    信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
    import time
    import random
    from multiprocessing import Process,Semaphore
    
    def dbj(i,s):
        s.acquire()
        print('%s号男主人公来洗脚'%i)
        print('-------------')
        time.sleep(random.randrange(3,6))
        # print(time.time())
        s.release()
    
    if __name__ == '__main__':
        s = Semaphore(4) #创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.
        for i in range(10):
            p1 = Process(target=dbj,args=(i,s,))
            p1.start()
    View Code
    
    

    十.事件

    e = Event()#  e.set()#将e改为True  e.clear()   # 将e改为False
    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

        事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    from multiprocessing import Process, Event
    e = Event() #False True
    
    print(e.is_set())
    e.set() #将e事件的状态改为True
    print("在这里等待")
    e.clar()   #将e事件的状态改为False
    print("111")
    e.wait()
    print("是真的吗")
    View Code
    import time
    from multiprocessing import Process,Event
    
    #模拟红绿灯执行状态的函数
    def traffic_lights(e):
        while 1:
            print("红灯啦")
            time.sleep(5)
            e.set() #将e改为True
            print("绿灯了")
            time.sleep(3)
            e.clear()   #将e改为False
    def car(i,e):
        if not e.is_set():  #新来的车看到的是红灯
            print("我们在等待....")
            e.wait()
            print("走你")
        else:
            print("可以走了!!!")
    if __name__ == "__main__":
        e = Event()
        hld = Process(target=traffic_lights, args=(e,))
        hld.start()
        while 1:
            time.sleep(0.5)
            #创建10个车
            for i in range(3):
                p1 = Process(target=car,args=(i,e,))
                p1.start()
    Even模拟红绿灯

    十一.队列

    # 遵循先进先出的原则   q = Queue(3)    创建3个队列 q.put()发送数据  q.get()接受数据
    q = Queue([maxsize])  #创建共享的进程队列
    
    q.get( [ block [ ,timeout ] ] )  #返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
    
    q.get_nowait( ) #和q.get(False)方法,一样
    
    q.put(item [, block [,timeout ] ] ) #将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
    
    q.qsize() #返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
    
    q.empty() #如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
    
    .full() #如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
    
    q.close() #关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    q.cancel_join_thread()  #不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
    
    q.join_thread() #连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
    队列的相关方法
     
    from multiprocessing import Process,Queue
    #先进先出
    q = Queue(3)
    q.put(1)
    q.put(2)
    # print(q.full()) #q.full()队列满了返回True,不满返回False
    q.put(3)
    # print('>>>>',q.full())
    
    q.get_nowait()= ()  #不会阻塞住,相当于空队列
    
    # try:
    #     q.get(False)  # queue.Empty
    #     q.get_nowait() #queue.Empty
    # except:
    #     print('队列目前是空的')
    
    # while 1:
    #     try:
    #         q.get(False)  #queue.Empty
    #     except:
    #         print('队列目前是空的')
    View Code

      队列实现进程的通信

    import time
    from multiprocessing import Process,Queue
    
    def girl(q):
        print('来自boy的信息',q.get())
        print('来自校领导的凝视',q.get())
    def boy(q):
        q.put('约吗')
    
    if __name__ == '__main__':
        q = Queue(5)
        boy_p = Process(target=boy,args=(q,))
        girl_p = Process(target=girl,args=(q,))
        boy_p.start()
        girl_p.start()
        time.sleep(1)
        q.put('好好工作,别乱搞')
    队列实现进程的通信


    十二.生产者消费者模式

    #生产者消费者模型总结
    
        #程序中有两类角色
            一类负责生产数据(生产者)
            一类负责处理数据(消费者)
            
        #引入生产者消费者模型为了解决的问题是:
            平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
            
        #如何实现:
            生产者<-->队列<——>消费者
        #生产者消费者模型实现类程序的解耦和
    import time
    from multiprocessing import Process,Queue
    def producer(q):
        for i in range(1,11):
            time.sleep(1)
            print('生产了包子%s号' % i)
            q.put(i)
        q.put(None)  #针对第三个版本的消费者,往队列里面加了一个结束信号
    #版本1
    # def consumer(q):
    #     while 1:
    #         time.sleep(2)
    #         s = q.get()
    #         print('消费者吃了%s包子' % s)
    
    #版本2
    # def consumer(q):
    #     while 1:
    #         time.sleep(0.5)
    #         try:
    #             s = q.get(False)
    #             print('消费者吃了%s包子' % s)
    #         except:
    #             break
    
    def consumer(q):
        while 1:
            time.sleep(2)
            s = q.get()
            if s == None:
                break
            else:
                print('消费者吃了%s包子' % s)
    生产者消费者模型
    生产者消费者模型
    import time
    from multiprocessing import Process,Queue
    
    def producer(q):
        for i in range(1,11):
            time.sleep(1)
            print('生产了包子%s号' % i)
            q.put(i)
    
    def consumer(q):
        while 1:
            time.sleep(2)
            s = q.get()
            if s == None:
                break
            else:
                print('消费者吃了%s包子' % s)
    
    if __name__ == '__main__':
        #通过队列来模拟缓冲区,大小设置为20
        q = Queue(20)
        #生产者进程
        pro_p = Process(target=producer,args=(q,))
        pro_p.start()
        #消费者进程
        con_p = Process(target=consumer,args=(q,))
        con_p.start()
        pro_p.join()
    
        q.put(None)
    生产者消费者模型主进程发送结束信号
     1 #生产者消费者模型
     2 import time
     3 from multiprocessing import Process,Queue,JoinableQueue
     4 
     5 def producer(q):
     6     for i in range(1,11):
     7         time.sleep(0.5)
     8         print('生产了包子%s号' % i)
     9         q.put(i)
    10     q.join()
    11     print('在这里等你')
    12 def consumer(q):
    13     while 1:
    14         time.sleep(1)
    15         s = q.get()
    16         print('消费者吃了%s包子' % s)
    17         q.task_done()  #给q对象发送一个任务结束的信号
    18 
    19 if __name__ == '__main__':
    20     #通过队列来模拟缓冲区,大小设置为20
    21     q = JoinableQueue(20)
    22     #生产者进程
    23     pro_p = Process(target=producer,args=(q,))
    24     pro_p.start()
    25     #消费者进程
    26     con_p = Process(target=consumer,args=(q,))
    27     con_p.daemon = True #
    28     con_p.start()
    29     pro_p.join()
    30     print('主进程结束')
    JoinableQueue的生产者消费者模型

     十三.管道

     from multiprocessing import Process,Pipe
    conn1,conn2 = Pipe()
    进程间通信(IPC)方式二:管道(不推荐使用,了解即可),会导致数据不安全的情况出现
    # 管道
    from multiprocessing import Process,Pipe
    import time
    # conn1,conn2 = Pipe()
    # conn1.send("你好")
    # print(">>>>>")
    # msg = conn2.recv()
    # print(msg)
    
    # def func1(conn2):
    #     try:
    #         msg = conn2.recv()
    #         print(">>>",msg)
    #         #如果管道一端关闭了,那么另外一端在接收消息的时候回报错
    #     except EOFError:
    #         print("对方管道一端已经关闭")
    #         conn2.close()
    # if __name__ == '__main__':
    #     conn1,conn2 = Pipe()
    #     p = Process(target=func1,args=(conn2,))
    #     p.start()
    #     conn1.send("收到了吗")
    
    def func1(conn1,conn2):
        msg = conn2.recv()  #阻塞
        print(">>>>",msg)
    if __name__ == '__main__':
        conn1,conn2 = Pipe()
        p = Process(target=func1, args=(conn1, conn2,))
        p.start()
        conn1.send("收到了吗")
        conn1.close()
        #conn1.recv()        #OSError: handle is closed
    View Code

    十四.数据共享(不安全)

    # 数据共享
    # from multiprocessing import Process,Manager
    #
    # def func(m_dic):
    #     m_dic["辉哥"] = "辉哥大帅比"
    # if __name__ == '__main__':
    #     m = Manager()
    #     m_dic = m.dict({"辉哥":"辉哥帅不帅"})
    #     print("主进程",m_dic)
    #     p = Process(target=func, args=(m_dic,))
    #     p.start()
    #     p.join()
    #     print("主进程2",m_dic)
    
    # 数据共享manager不安全
    # from multiprocessing import Process,Manager,Lock
    # def func(m_dic, ml):
    #     """不加锁的情况会出现数据错乱
    #         m_dic["count"] -= 1
    #         下面是加锁的另一种形式
    #         等同 : ml.acquire()
    #                m_dic["count"] -= 1
    #                ml.release()"""
    #     with ml:
    #         m_dic["count"] -= 1
    # if __name__ == '__main__':
    #     m = Manager()
    #     ml = Lock()
    #     m_dic = m.dict({"count":100})
    #     p_list = []
    #     for i in range(20):
    #         p1 = Process(target=func,args=(m_dic, ml,))
    #         p1.start()
    #         p_list.append(p1)
    #     [pp.join() for pp in p_list]
    #     print("主进程",m_dic)
    View Code

    十五.进程池

      multiprocess.Poll模块

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务(高级一些的进程池可以根据你的并发量,搞成动态增加或减少进程池中的进程数量的操作),不会开启其他进程,提高操作系统效率,减少空间的占用等。

    进程池相关方法:

    p.apply(func [, args [, kwargs]]):
    在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 '''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):
    在一个池工作进程中执行func(
    *args,**kwargs),然后返回结果。 '''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。''' p.close():
    关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 P.jion():
    等待所有工作进程退出。此方法只能在close()或teminate()之后调用 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。 obj.ready():如果调用完成,返回True obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
    import time
    from multiprocessing import Process,Pool
    
    def func(n):
        print(n)
    
    
    if __name__ == '__main__':
        pool = Pool(4)
        # pool.map(func,range(100)) #参数是可迭代的
        pool.map(func,['sb',(1,2)]) #参数是可迭代的
    进程池的map传参
    # 进程池
    import time
    from multiprocessing import Process,Pool
    def func(n):
        for i in range(5):
            time.sleep(1)
            n= n + i
        print(n)
    if __name__ == '__main__':
        #用时间验证一下传参
        pool_start_time = time.time()
        pool = Pool(4) #4个进程
        pool.map(func,range(100))
        #map(方法,可迭代对象) 映射 自带join功能,异步执行任务
        pool_end_time = time.time()
        pool_dif_time = pool_end_time - pool_start_time
    
        #多进程的执行时间
        # p_s_time = time.time()
        # p_list = []
        # for i in range(200):
        #     p1 = Process(target=func, args=(i,))
        #     p1.start()
        #     p_list.append(p1)
        # [p.join() for p in p_list]
        # p_e_time = time.time()
        # p_dif_time = p_e_time - p_s_time
    
        print('进程池的执行时间', pool_dif_time)
        print('多进程的执行时间', p_dif_time)
    进程池和多进程时间对比
    import time
    from multiprocessing import Process,Pool
    def fun(i):
        time.sleep(0.5)
        return i**2
    if __name__ == '__main__':
        p = Pool(4)
        for i in range(10):
            res = p.apply(fun,args=(i,))
            # apply 同步执行的进程方法,他会等待你的任务的返回结果
            print(res)
    进程池的同步方法 apply
    import time
    from multiprocessing import Process,Pool
    def fun(i):
        time.sleep(1)
        print(i)
        return i**2
    if __name__ == '__main__':
        p = Pool(4)
        res_list = []
        for i in range(10):
            res = p.apply_async(fun,args=(i,))
            # #同步执行的方法,他会等待你的任务的返回结果,
            res_list.append(res)
    
        p.close() # 不是关闭进程池,而是不允许再有其他任务来使用进程池
        p.join()  # 这是感知进程池中任务的方法,进程池中所有的进程随着主进程的结束而结束了,等待进程池的任务全部执行完
        for e_res in res_list:
            print("结果", e_res.get())
    进程池的异步方法apply_async

     

    回调函数:

    需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。

    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果

    import os
    from multiprocessing import Pool
    
    def func1(n):
        print('func1>>',os.getpid())
        # print('func1')
        return n*n
    
    
    def func2(nn):
        print('func2>>',os.getpid())
        # print('func2')
        print(nn)
        # import time
        # time.sleep(0.5)
    if __name__ == '__main__':
        print('主进程:',os.getpid())
        p = Pool(4)
        p.apply_async(func1,args=(10,),callback=func2)
        p.close()
        p.join()
    进程池的回调函数

     进程池版的socket并发聊天代码示例:

    #Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
    #开启6个客户端,会发现2个客户端处于等待状态
    #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
    from socket import *
    from multiprocessing import Pool
    import os
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    
    def talk(conn):
        print('进程pid: %s' %os.getpid())
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        p=Pool(4)
        while True:
            conn,*_=server.accept()
            p.apply_async(talk,args=(conn,))
            # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
    
    复制代码
    sserver 服务端:ftp
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    client 用户端:ftp

    进程池中爬虫示例:

    from multiprocessing import Pool
    import time,random
    import requests
    import re
    
    def get_page(url,pattern):
        response=requests.get(url)
        if response.status_code == 200:
            return (response.text,pattern)
    
    def parse_page(info):
        page_content,pattern=info
        res=re.findall(pattern,page_content)
        for item in res:
            dic={
                'index':item[0],
                'title':item[1],
                'actor':item[2].strip()[3:],
                'time':item[3][5:],
                'score':item[4]+item[5]
    
            }
            print(dic)
    if __name__ == '__main__':
        pattern1=re.compile(r'<dd>.*?board-index.*?>(d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)
    
        url_dic={
            'http://maoyan.com/board/7':pattern1,
        }
    
        p=Pool()
        res_l=[]
        for url,pattern in url_dic.items():
            res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
            res_l.append(res)
    
        for i in res_l:
            i.get()
    
        # res=requests.get('http://maoyan.com/board/7')
        # print(re.findall(pattern,res.text))
    爬虫示例
    
    
     
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    

     

  • 相关阅读:
    Zabbix 添加内存告警
    Oracle Drop 表数据恢复
    Kubernetes port-forward
    Jenkins指定tag发布到k8s环境
    Jenkins系列之六——拉取指定branch或tag
    MySQL5.7 报错 ERROR 1820 (HY000): You must reset your password using ALTER USER statement before executing this statement
    docker部署常见应用
    Vim 中进行文件目录操作
    Oracle使用expdp/impdp迁移数据
    UmengAppDemo【友盟统计SDK集成以及多渠道打包配置,基于V7.5.3版本】
  • 原文地址:https://www.cnblogs.com/konghui/p/9839343.html
Copyright © 2011-2022 走看看