zoukankan      html  css  js  c++  java
  • 进程

    1、操作系统的作用:

    隐藏丑陋复杂的硬件接口,提供良好的抽象接口

    管理调度进程,让进程对硬件的竞争变得有序

      进程的调度算法:

        FCFS先来先服务调度算法

        段作业优先调度算法

        时间片轮转法

        多级反馈队列

      进程的并行与并发:

        并行:是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。(资源够用,比如三个线程,四核的CPU )

        并发:是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。

    2、同步和异步

        所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

        所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列

        同步:一个进程的执行依赖于另外的进程,只有被依赖的进程执行完,自己的任务才执行。

        异步:只通知被以来的进程完成什么工作,不需要等待被以来的进程完成。

    3、开一个多进程

    from  multiprocessing  import Process
    def func(i):
        print("输出:%s"%i)
    if __name__ == '__main__':
        for  i in range (10):
            p=Process(target=func,args=(i,))
            p.start()
        print("你好")
    

     p.join()的使用

    from  multiprocessing  import Process
    import time
    def func(i):
        print("输出:%s"%i)
    if __name__ == '__main__':
        p_list=[]
        for  i in range (20):
            p=Process(target=func,args=(i,))
            p_list.append(p)
            p.start()
        for p  in p_list:  #[p.join() for p  in p_list]
            p.join()
        print('结束')
    

    p.join()感知一个子进程的结束,将异步程序同步

    用类的方式开启一个多进程的第二种方式

    from multiprocessing import Process
    import os
    class Myprocess(Process):
    def run(self):
    print(os.getpid())
    if __name__ == '__main__':
    for i in range(20):
    print('主',os.getpid())
    p1=Myprocess()
    p1.start()

     自定义继承类中必须有一个run方法,run方法是在子进程中自行的程序

    打印进度条

    import time
    def run(i):
        num=str(i)
        print('*'*i+num+'%',end='\r')
    for  i  in  range(101):
        run(i)
        time.sleep(0.1)
    

     使用多进程实现socket服务端的并发效果

    server端

    import socket
    from  multiprocessing  import Process
    def  server(conn):
        conn.send('你好'.encode('utf-8'))
        info = conn.recv(1024).decode('utf-8')
        print(info)
        conn.close()
    
    if __name__ == '__main__':
        sk = socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
        while True:
            conn,addr = sk.accept()
            q = Process(target=server,args=(conn,))
            q.start()
        sk.close()
    

     client端

    import socket
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    ret = sk.recv(1024).decode('utf-8')
    print(ret)
    msg = input("》》》》").encode('utf-8')
    sk.send(msg)
    sk.close()
    

     主进程在执行完自己的代码之后,不会立马结束,而是等待子进程结束之后,回收子进程资源

    守护进程 由子进程 转化而来   伴随着主进程的结束而结束

    from  multiprocessing  import Process
    import time
    def func():
        for i in range (10):
            time.sleep(1)
            print("输出:%s"%i)
    if __name__ == '__main__':
            p=Process(target=func)
            p.daemon = True
            p.start()
            for i in range (10) :
                print("你好%s"%i)
                time.sleep(0.5)
    

     p.terminate()结束一个子进程,

    if __name__ == '__main__':
            p=Process(target=func)
            p.daemon = True
            p.start()
            print(p.is_alive())  #  判断一个进程是否活着
            p.terminate()    # 关闭一个子进程
            time.sleep(0.5)
            print(p.name)   # 查看进程的名字
            print(p.is_alive())
    

     4、进程锁     相当于将异步的程序变成同步

    对程序加锁,同一时间只能有一个进程执行程序

    from  multiprocessing  import Process,Lock
    import time
    def func(i,lock):
            lock.acquire()
            time.sleep(1)
            print("输出:%s"%i)
            lock.release()
    if __name__ == '__main__':
            lock = Lock()
            for  i in range (10):
                p=Process(target=func,args=(i,lock))
                p.start()
    

     5、信号量   规定同一时间允许几个进程来同时执行程序

    
    
    from  multiprocessing  import Process,Semaphore
    import time
    def func(i,sem):
            sem.acquire()
            time.sleep(1)
            print("输出:%s"%i)
            sem.release()
    if __name__ == '__main__':
            sem = Semaphore(3)
            for  i in range (10):
                p=Process(target=func,args=(i,sem))
                p.start()
    
    

    6、Event()   事件 通过一个信号来控制多个进程同时执行或者阻塞,一个事件被创建后默认是阻塞状态

    from  multiprocessing   import Event
    e = Event()  #实例化   默认阻塞状态
    print(1111111)
    e.set() # 将事件改为true
    print(e.is_set()) # 打印事件的状态
    e.wait()
    print(22222)
    e.clear() #将事件改为False
    e.wait()
    print(33333)

    红绿灯      事件练习    (用到进程之间的通信IPC)

    import time
    from  multiprocessing   import Event,Process
    def light(e):
        while  True:
            if e.is_set() :
                e.clear()
                print('\033[31m红灯亮了\033[0m')
                time.sleep(2)
    
            else :
                e.set()
                print('\033[32m绿灯亮了\033[0m')
                #print('\033[31m红灯亮了\033[0m')
                time.sleep(2)
    
                #print('\033[32m绿灯亮了\033[0m')
    def car(e,i):
        if not e.is_set():
            print('%s车在等待'%i)
        e.wait()
        print('%s车通过'%i)
    
    if  __name__ =='__main__':
        e = Event()
        p1 = Process(target=light,args=(e,))
        p1.start()
        for  i  in range (20):
            time.sleep(0.5)
            p2 = Process(target=car,args=(e,i))
            p2.start()
    

     7、队列  (生产者消费者模型练习)

    from multiprocessing   import Queue
    q = Queue()  # 实例化队列
    q.put() # 往队列里放数据
    q.get() # 从队列里取数据
    print(q.full())#判断队列是否满了
    print(q.empty())#判断队列是否为空
    

     生产者消费者模型

    from multiprocessing   import Process,Queue
    import time
    import random
    def producer(p,name,food):
        for i in range (10):
            time.sleep(1)
            f = '%s生产了第%s个%s'%(name,i,food)
            p.put(f)
            print(f)
    
    def constom(q,name):
        while True :
            time.sleep(2)
            food = q.get()
            if food ==None:break
            print('%s吃了一个%s'%(name,food))
    
    if __name__=='__main__':
        q = Queue()
        p1 = Process(target=producer,args=(q,'egon','包子'))
        p2 = Process(target=producer,args=(q,'xiaohong','泔水'))
        c1 = Process(target=constom,args=(q,'jin'))
        c2 = Process(target=constom,args=(q,'alex'))
        p1.start()
        p2.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        q.put(None)
        q.put(None)
    

     使用Joinable实现生产者消费者模型

    from multiprocessing   import Process,JoinableQueue
    import time
    import random
    def producer(p,name,food):
        for i in range (10):
            time.sleep(1)
            f = '%s生产了第%s个%s'%(name,i,food)
            p.put(f)
            print(f)
        p.join()  #阻塞状态,感知一个队列中的数据全部被处理完,才能结束
    def constom(q,name):
        while True :
            time.sleep(2)
            food = q.get()
            print('%s吃了一个%s'%(name,food))
            q.task_done()
    if __name__=='__main__':
        q = JoinableQueue()
        p1 = Process(target=producer,args=(q,'egon','包子'))
        p2 = Process(target=producer,args=(q,'xiaohong','泔水'))
        c1 = Process(target=constom,args=(q,'jin'))
        c2 = Process(target=constom,args=(q,'alex'))
        p1.start()
        p2.start()
        c1.daemon=True
        c2.daemon=True
        c1.start()
        c2.start()
        p1.join()
        p2.join()
    

     8、管道    Pipe

    def f(conn):
        try :
            while True :
                print(conn.recv())
        except EOFError:
            conn.close()
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        for i in range (20):
            ret = parent_conn.send(b'hello')
        parent_conn.close()
    

     通过管道实现生产者消费者模型

    from multiprocessing   import Process,Pipe
    import time
    import random
    def producer(pro,conn,name,food):
        conn.close()
        for i in range (10):
            time.sleep(1)
            f = '%s生产了第%s个%s'%(name,i,food)
            print(f)
            pro.send(f)
        pro.close()
    def constom(pro,conn,name):
        pro.close()
        while True :
            try:
                time.sleep(2)
                food = conn.recv()
                print('%s吃了一个%s'%(name,food))
            except EOFError:
                conn.close()
                break
    if __name__=='__main__':
        pro,conn = Pipe()
        p1 = Process(target=producer,args=(pro,conn,'egon','包子'))
        c1 = Process(target=constom,args=(pro,conn,'jin'))
        p1.start()
        c1.start()
        pro.close()
        conn.close()
    

     可能出现同一个进程拿到同一个数据,需要加锁来控制管道的行为,来避免进程之间争夺数据造成的数据不安全现象

     9、进程池
    定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。
    如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。
    也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。
    这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

    进程池的同步调用和异步调用
    import os,time
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(1)
        return n**2
    
    if __name__ == '__main__':
        start_time = time.time()
        p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
            print(res)                            # 但不管该任务是否存在阻塞,同步调用都会在原地等着
        p.close()
        end_time = time.time()
        print(end_time-start_time)
    同步方式调用进程池
    
    
    import os
    import time
    import random
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(3)
        return n**2
    if __name__ == '__main__':
        start_time = time.time()
        p=Pool(5) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        res_l=[]
        for i in range(10):
            res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
            print(res)                                 # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
                                              # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
                                              # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
            res_l.append(res)
    
        # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
        # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
        p.close()
        p.join()
        # end_time = time.time()
        # print(end_time-start_time)
        for res in res_l:
            print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
    异步方式调用进程池
    import os
    import time
    import random
    from multiprocessing import Pool
    
    def work(n):
        print('%s run' %os.getpid())
        time.sleep(3)
        return n**2
    if __name__ == '__main__':
        start_time = time.time()
        p=Pool(5) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
        p.map(work,range(30))
    p.map进程池
    用进程池实现socket服务端的并发效果
    server端
    import socket
    from  multiprocessing import  Pool
    def  chat(conn):
        conn.send(b'hello')
        print(conn.recv(1024).decode('utf-8'))
        conn.close()
    
    if __name__ =='__main__':
        p = Pool(5)
        sk = socket.socket()
        sk.bind(('127.0.0.1',8080))
        sk.listen()
    
        while True :
             conn,addr = sk.accept()
             p.apply_async(chat,args=(conn,))
        sk.close()
    
    
    

     client端

    import socket
    sk = socket.socket()
    sk.connect(('127.0.0.1',8080))
    ret = sk.recv(1024).decode('utf-8')
    print(ret)
    msg = input('>>>>: ').encode('utf-8')
    sk.send(msg)
    sk.close()
    
    import sys,os,io
    sys.stdout=io.TextIOWrapper(sys.stdout.buffer,encoding='gb18030')
    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<进程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def pasrse_page(res):
        print('<进程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
        with open('db.txt','a+') as f:
            f.write(parse_res)
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
        p=Pool(3)
        res_l=[]
        for url in urls:
            res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
            res_l.append(res)
        p.close()
        p.join()
        print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了
    使用进程池请求多个URL减少网络等待时间
    import sys,os,io
    sys.stdout=io.TextIOWrapper(sys.stdout.buffer,encoding='gb18030')
    import re
    from urllib.request import urlopen
    from multiprocessing import Pool
    def get_page(url,pattern):
        response=urlopen(url).read().decode('utf-8')
        return pattern,response
    
    def parse_page(info):
        pattern,page_content=info
        res=re.findall(pattern,page_content)
        for item in res:
            dic={
                'index':item[0].strip(),
                'title':item[1].strip(),
                'actor':item[2].strip(),
                'time':item[3].strip(),
            }
            print(dic)
    if __name__ == '__main__':
        regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'
        pattern1=re.compile(regex,re.S)
        url_dic={
            'http://maoyan.com/board/':pattern1,
        }
        p=Pool()
        res_l=[]
        for url,pattern in url_dic.items():
            res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
            res_l.append(res)
        for i in res_l:
            i.get()
    爬虫实例,爬取猫眼热门电影
    
    
    

    I can feel you forgetting me。。 有一种默契叫做我不理你,你就不理我

  • 相关阅读:
    D2. Remove the Substring (hard version)(思维 )
    暑假集训
    AcWing:167. 木棒(dfs + 剪枝)
    AcWing:109. 天才ACM(倍增 + 归并排序)
    AcWing:99. 激光炸弹(前缀和)
    B. Interesting Array(线段树)
    Best Reward HDU
    G. Swapping Places
    How many HDU
    GSS4&&花仔游历各国
  • 原文地址:https://www.cnblogs.com/weidaijie/p/9772499.html
Copyright © 2011-2022 走看看