zoukankan      html  css  js  c++  java
  • 进程

    进程  进程即正在执行的一个过程。进程是对正在运行程序的一个抽象.

    开启进程的方法:

    import time
    import os
    from multiprocessing import Process
    
    def func(args,args2):
        print(args,args2)
        print('',os.getpid())
        print('子的父',os.getppid())
    
    if __name__=='__main__':
        p = Process(target=func,args=('参数1','参数2'))  #主进程
              # p是一个进程对象  还没启动
        p.start()  #创建一个子进程 和父进程异步进行
        print('*'*5)
        print('',os.getpid())
        print('父的父',os.getppid())
    
    #进程的生命周期
        #开启子进程的主进程:
            #主进程的代码如果长 等待自己的代码执行结束
            #子进程始行时间长 主进程执行完毕后 等待子进行执行完毕后 主进程才结束
    函数方法
     import os
    # from multiprocessing import Process
    #
    # class Myprocess(Process):
    #     def __init__(self,args,args2):
    #         super().__init__()
    #         self.args = args
    #         self.args2 = args2
    #     def run(self):
    #         print(self.pid)
    #
    #
    # if __name__=='__main__':
    #     print('主:',os.getpid())
    #     p1 = Myprocess(1,2)
    #     p1.start()
    #     p2 = Myprocess(2,3)
    #     p2.start()
    继承方法

    开启多个子进程:

    # import time
    # from multiprocessing import Process
    
    # def func(args,args2):
    #     print('*'*args)
    #     time.sleep(3)
    #     print('*'*args2)
    #
    # if __name__=='__main__':
    #     p_list = []
    #     for i in range(1,10):
    #         p = Process(target=func,args=(1*i,2*i))
    #         p_list.append(p)
    #         p.start()
    #     [p.join() for p in p_list]
    #     print('执行完了')
    多进程用list

    多进程中的几种方法:

    join()方法  感知一个子进程的结束
    p.daemon = True    在start()前插入代码 就是将该子进程设置为守护进程
    守护进程在主进程代码读完后结束,此时主进程可能还没有结束 而是在等待别的子进程执行结束才结束

    p.is_alive检验一个进程是否还活着
    # import time
    # from multiprocessing import Process
    # def fun():
    #     while 1:
    #         print('活着')
    #         time.sleep(0.1)
    # def fun2():
    #     print('fun2 start')
    #     time.sleep(10)
    #     print('fub2 end')
    #
    # if __name__=='__main__':
    #     p = Process(target=fun)
    #     p.daemon = True    # 设置子进程为守护进程
    #     p.start()
    #     p2 = Process(target=fun2)
    #     p2.start()
    #     print(p2.is_alive())  # is_alive检验一个进程是否活着
    #     print(p2.name)
    #     # i = 0
        # while i<5:
        #     print('socket server')
        #     time.sleep(1)
        #     i += 1
    上面方法的使用

    进程锁:

    多个进程同时抢一个数据就会出现数据安全问题,此时需要用进程锁限制数据,同一时间只能被一个进程操作

    归还数据之后才能被另外的进程操作。此时牺牲效率保证安全

    import time
    import json
    from multiprocessing import Process
    from multiprocessing import Lock
    
    def show(i):
        with open('ticket') as f:
            dic = json.load(f)
        print('余票:%s'%dic['ticket'])
    
    def buy_tk(i,lock):
        lock.acquire()
        with open('ticket') as f:
            dic = json.load(f)
            time.sleep(0.1)
        if  dic['ticket']>0:
            dic['ticket'] -= 1
            print('33[32m%s买到票了33[0m'%i)
        else:
            print('33[31m%s没买到票33[0m'%i)
        time.sleep(0.1)
        with open('ticket','w') as f:
            json.dump(dic,f)
        lock.release()
    
    if __name__=='__main__':
        for i in range(10):
            p = Process(target=show,args=(i,))
            p.start()
        lock = Lock()
        p.join()
        for i in range(10):
            p = Process(target=buy_tk,args=(i,lock))
            p.start()
    火车票问题

    事件

    一个信号是所有进程都进入阻塞状态,也可以控制解除阻塞

    事件被创建 默认是阻塞状态

    # from multiprocessing import Event,Process
    # import time
    # import random
    # 
    # def cars(i,e):
    #     if not e.is_set():
    #         print('33[31m%s等待33[0m'%i)
    #         e.wait()
    #     print('33[32m%s通过33[0m'%i)
    # 
    # 
    # 
    # def light(e):
    #     while 1:
    #         if e.is_set():
    #             e.clear()
    #             print('33[31m红灯亮了33[0m')
    # 
    #         else:
    #             e.set()
    #             print('33[32m绿灯亮了33[0m')
    # 
    #         time.sleep(2)
    # 
    # if __name__=='__main__':
    #     e = Event()
    #     p = Process(target=light,args=(e,))
    #     p.start()
    #     for i in range(20):
    #         car = Process(target=cars,args=(i,e))
    #         car.start()
    #         time.sleep(random.random())
    红绿灯问题

    信息量:

    同一时间只能n进程进行操作,房间上n把锁,只有等里面的出来,外面的才能再进去

    import time
    import random
    from multiprocessing import Process
    from multiprocessing import Semaphore
    
    def func(i,sem):
        sem.acquire()
        print('%i走进ktv'%i)
        time.sleep(random.randint(2,5))
        print('%i走出ktv'%i)
        sem.release()
    
    if __name__=='__main__':
        sem = Semaphore(4)        # 给房间上锁 每次只能进4个
        for i in range(20):
            Process(target=func,args=(i,sem)).start()

    队列:

    队列能实现进程之间的通信

    # from multiprocessing import Queue
    #
    # q = Queue(5)
    # q.put(1)                 #put方法 往队列里放
    # q.put(2)
    # q.put(3)
    # q.put(4)
    # q.put(5)
    # # q.put(5)            #若队列已满 再放会阻塞
    # print(q.full())          #判断队列是否满了
    # q.get()               #get方法 从队列里取
    # q.get()
    # q.get()
    # q.get()
    # q.get()
    # # q.get()            #若队列为空 再取就会阻塞
    # print(q.empty())     # 队列是否为空
    # try:
    #     q.get_nowait()       # 有值就取 没有报错 不等待 不阻塞
    # except:
    #     print('已经没有值了')
    
    
    # 用Queue实现进程之间通信  数据传递
    from multiprocessing import Queue,Process
    def prodect(q):
        q.put('hello')
    def consume(q):
        print(q.get())
    
    if __name__=='__main__':
        q = Queue()
        p = Process(target=prodect,args=(q,))
        p.start()
        c = Process(target=consume,args=(q,))
        c.start()
    from multiprocessing import Queue,Process
    import time
    import random
    
    def producer(name,food,q):
        for i in range(4):
            time.sleep(random.randint(1,3))
            fo = '%s%s'%(food,i)
            print('%s生产了%s'%(name,fo))
            q.put(fo)
    
    def consumer(name,q):
        while 1:
            food = q.get()
            print('33[31m%s消费了%s33[0m'%(name,food))
            time.sleep(random.randint(1,3))
    
    
    if __name__=='__main__':
        q = Queue()
        p = Process(target=producer,args=('dh','包子',q))
        p2 = Process(target=producer,args=('大黄','榴莲',q))
        c = Process(target=consumer,args=('小包',q))
        c2 = Process(target=consumer,args=('二黄',q))
        p.start()
        p2.start()
        c.start()
        c2.start()
        p.join()
        p2.join()
    生产者消费者模型 queue

    该方法最后会阻塞,进程不会结束

    from multiprocessing import JoinableQueue,Process
    import time
    import random
    
    def producer(name,food,q):
        for i in range(4):
            time.sleep(random.randint(1,3))
            fo = '%s%s'%(food,i)
            print('%s生产了%s'%(name,fo))
            q.put(fo)
        q.join()  #阻塞 直到队列中的所有数据全部被处理完毕
    
    def consumer(name,q):
        while 1:
            food = q.get()
            print('33[31m%s消费了%s33[0m'%(name,food))
            time.sleep(random.randint(1,3))
            q.task_done()
    
    if __name__=='__main__':
        q = JoinableQueue()
        p = Process(target=producer,args=('dh','包子',q))
        p2 = Process(target=producer,args=('大黄','榴莲',q))
        c = Process(target=consumer,args=('小包',q))
        c2 = Process(target=consumer,args=('二黄',q))
        p.start()
        p2.start()
        c.daemon = True
        c2.daemon = True
        c.start()
        c2.start()
        p.join()
        p2.join()
    生产者消费者模型 joinablequeue

    管道:

    管道的返回值是两个管道口,对应输入与输出,可以实现进程之间的通信

    若管道口关闭,仍从管道内取值,管道中没有值时就会抛出异常

    #生产者消费者模型用管道实现
    
    from multiprocessing import Pipe,Process,Lock
    import time
    import random
    
    def producer(con,pro,name,food):
        con.close()
        for i in range(10):
            fo = '%s%s'%(food,i)
            print('%s生产了%s'%(name,fo))
            pro.send(fo)
            time.sleep(random.randint(1,3))
        pro.close()
    def consumer(con,pro,name,lock):
        pro.close()
        while 1:
            try:
                lock.acquire()
                food = con.recv()
                lock.release()
                print('%s吃了%s'%(name,food))
                time.sleep(random.randint(1,3))
            except EOFError:
                con.close()
                break
    
    
    if __name__=='__main__':
        con,pro = Pipe()
        lock = Lock()
        p = Process(target=producer,args=(con,pro,'dh','包子'))
        p.start()
        c = Process(target=consumer,args=(con,pro,'大黄',lock))
        c.start()
        # c2 = Process(target=consumer,args=(con,pro,'大黄2',lock))
        # c2.start()           
        con.close()
        pro.close()

    当存在多个消费者时,就是出现多个消费者抢资源,从而数据不安全。

    解决方法就是加锁

    管道加锁就实现了队列。

    进程中的数据共享

    rom multiprocessing import Manager,Process,Lock

    def main(dic,l):
    l.acquire()
    dic['con']-=1
    l.release()


    if __name__=='__main__':
    m = Manager()
    l = Lock()
    dic = m.dict({'con':100}) # 数据共享
    p_lst = []
    for i in range(50):
    p = Process(target=main,args=(dic,l))
    p.start()
    p_lst.append(p)
    for i in p_lst:i.join()
    print(dic)

    正常情况下,执行完数据的结果应该是50,但实际上每次的结果可能都不同

    说明数据共享的同时,有子进程同时拿到了数据,进行了重复的操作

    数据共享是不安全的,解决方法是加锁

    进程池:

    每开启一个子进程,都要开启一个属于这个子进程内存空间

    操作系统调度效率降低

    进程池:在python中创建一个装进程的池子,指定池子中有一定数量的进程,这些进程被创建好,等待被使用

    每次进入一定数量的进程,一个处理完话另一个进

    #    进程池 map方法 与开启多进程时间上的比较
    # from multiprocessing import Process,Pool
    # import time
    #
    # def fun(n):
    #     print(n+1)
    #
    # if __name__=='__main__':
    #     st = time.time()
    #     pool = Pool(5)       # 进程池 提升效率
    #     pool.map(fun,range(100))
    #     t1 = time.time()-st
    #
    #     enn = time.time()
    #     p_lst = []
    #     for i in range(100):
    #         p = Process(target=fun,args=(i,))
    #         p.start()
    #         p_lst.append(p)
    #     for p in p_lst:p.join()
    #     t2 = time.time()-enn
    #     print(t1,t2)

    map方法中要放一个可迭代的

    # from multiprocessing import Pool
    # import time,os
    # def fun(n):
    #     print('%sstart'%n,os.getpid())
    #     time.sleep(0.3)
    #     print('%send'%n,os.getpid())
    #
    # if __name__=='__main__':
    #     p = Pool(5)
    #     for i in range(10):
    #         p.apply_async(fun,args=(i,))   #异步接收
    #     p.close()   # 结束进程池接受任务
    #     p.join()    # 感知进程池中的任务结束

    p.apply()为同步方法

    进程池的返回值

    # from multiprocessing import Pool
    # import time
    # def fun(i):
    #     time.sleep(0.5)
    #     return i*i
    # if __name__=='__main__':
    #     p = Pool(5)
        # for i in range(10):
            # res = p.apply(fun,args=(i,))     #同步 一个一个打印
            # print(res)
    
        # res_l = []
        # for i in range(10):
        #     res = p.apply_async(fun,args=(i,))
        #     res_l.append(res)
        # for res in res_l:           # get方法会阻塞 放在生成进程的外面
        #     print(res.get())          #异步方法 一次打印5个(进程池为5)
    
        # res = p.map(fun,range(10))
        # print(res)      # 一次打印所有[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
                        # 所有数据处理完 一块返回 结果是一个列表

    map简单调用异步函数 apply_async异步实现更强大

    进程池的回调函数

    # from multiprocessing import Pool
    # import os
    # def func1(n):
    #     print('in func1',os.getpid())
    #     return n*n
    # def func2(nn):
    #     print(nn,os.getpid())
    #
    # if __name__=='__main__':
    #     print(os.getpid())
    #     p = Pool(5)
    #     p.apply_async(func1,args=(10,),callback=func2)  #callback回调函数
    #     p.close()           # 执行func1的结果作为func2的参数传进回调函数
    #     p.join()            # 回调函数在主进程空间

    第一个函数的返回值 作为回调函数的参数进行处理

    回调函数是主进程中的操作

    回调函数常用于爬虫,子进程爬取复杂的信息,交给主进程处理,避免在切换进程过程中的网络延时

    最后,一个没反应的程序,不知道哪错了,学了后面的再回来找补。

    import requests
    from multiprocessing import Pool
    from urllib.request import urlopen
    
    # def get(url):
    #     response = requests.get(url)
    #     if response.status_code == 200:
    #         return url,response.content.decode('utf-8')
    
    def get_urllib(url):
        ret = urlopen(url)
        return ret.read().decode('utf-8')
    
    def call_back(args):
        url,content = args
        print(url,len(content))
    
    if __name__=='__main__':
        url_lst = [
            'https://www.baidu.com'
            'https://www.sohu.com'
            'https://www.sougou.com'
        ]
        p = Pool(5)
        for ult in url_lst:
            p.apply_async(get_urllib,args=(ult,),callback = call_back)
        p.close()
        p.join()
  • 相关阅读:
    Java判断字符串是否包含数字
    char 与 String 之间的转换
    hive与hbase整合方式和优劣
    曾经的你-许巍
    Hbase表重命名 表改名
    Eclipse 快键键(持续更新)
    Linux 查看一个端口的连接数
    hbase性能调优(转载)
    Hbase优化记录
    记录下Linux难记实用的命令
  • 原文地址:https://www.cnblogs.com/mu-tang/p/14194256.html
Copyright © 2011-2022 走看看