zoukankan      html  css  js  c++  java
  • 网络 基础 5

    网络部分大整理

    一: 基础概念

    并发编程
    进程
    进程是计算机中最小的资源分配单位
    进程与进程之间数据隔离,执行过程异步
    为什么会出现进程的概念?
    合理利用cpu,提高用户体验
    多个进程是可以同时利用多个cpu的 可以实现并行的效果

    僵尸进程
    进程 状态码 Z/z 僵尸进程 linux

    在主进程中控制子进程的方法
    子进程对象 = Process(target,args) 在创建的这一刻根本就没有通知操作系统
    子进程对象.start() 通知操作系统,开启子进程,异步非阻塞
    子进程对象.terminate() 通知操作系统,结束子进程,异步非阻塞
    子进程对象.is_alive() 查看子进程是否还活着
    子进程对象.join() 阻塞,直到子进程结束
    子进程对象.join(timeout = 10) 阻塞最多10s,期间子进程如果结束就结束阻塞,如果没结束10s之后也结束阻塞

    守护进程
    守护进程是一个子进程
    守护进程会在主进程代码结束之后才结束
    为什么会这样?
    由于主进程必须要回收所有的子进程的资源
    所以主进程必须在子进程结束之后才能结束
    而守护进程就是为了守护主进程存在的
    不能守护到主进程结束,就只能退而求其次,守护到代码结束了
    守护到主进程的代码结束,意味着如果有其他子进程没有结束,守护进程无法继续守护
    解决方案 : 在主进程中加入对其他子进程的join操作,来保证守护进程可以守护所有主进程和子进程的执行
    如何设置守护进程
    子进程对象.daemon = True 这句话写在start之前


    为什么要用锁?
    由于多个进程的并发,导致很多数据的操作都在同时进行
    所以就有可能产生多个进程同时操作 : 文件数据库 中的数据
    导致数据不安全
    所以给某一段修改数据的程序加上锁,就可以控制这段代码永远不会被多个进程同时执行
    保证了数据的安全
    Lock 锁(互斥锁)
    锁实际上是把你的某一段程序变成同步的了,降低了程序运行的速度,为了保证数据的安全性
    没有数据安全的效率都是耍流氓

    二:
    1/信号量   保证一段代码同一时刻只能有n个进程执行  流量控制   from multiprocessing import Semaphore(是类)
    import time
    import random
    from multiprocessing import Process,Semaphore
    def ktv(name,sem):
        sem.acquire()
        print("%s走进了ktv"%name)
        time.sleep(random.randint(5,10))
        print("%s走出了ktv" % name)
        sem.release()
    
    if __name__ == '__main__':
        sem = Semaphore(4)
        for i in range(100):
            p = Process(target=ktv,args = ('name%s'%i,sem))
            p.start()

    2/ 事件 Event(类)    from multiprocessing import Event  

    # 红绿灯 一种实际问题 演示
    import
    time import random from multiprocessing import Event,Process def traffic_light(e): print('33[1;31m红灯亮33[0m') while True: time.sleep(2) if e.is_set(): # 如果当前是绿灯 print('33[1;31m红灯亮33[0m') # 先打印红灯亮 e.clear() # 再把灯改成红色 else : # 当前是红灯 print('33[1;32m绿灯亮33[0m') # 先打印绿灯亮 e.set() # 再把灯变绿色 def car(e,carname): if not e.is_set(): print('%s正在等待通过'%carname) e.wait() print('%s正在通过'%carname) if __name__ == '__main__': e = Event() p = Process(target=traffic_light,args = (e,)) p.start() for i in range(100): time.sleep(random.randrange(0,3)) p = Process(target=car, args=(e,'car%s'%i)) p.start()
    e = Event()
    e 事件对象
    事件本身就带着标识 : False
    wait 阻塞
    它的阻塞条件是 对象标识为False
    结束阻塞条件是 对象标识为True
    
    对象的标识相关的 :
    set  将对象的标识设置为True
    clear 将对象的标识设置为False
    is_set 查看对象的标识是否为True
    
    Event事件
    放到进程中的代码一定不止一段
    这两个操作之间 存在同步关系
    一个操作去确认另一个操作的执行条件是否打成
    
    标识 控制wait是否阻塞的关键
    如何修改这个标识 : clear set
    如何查看这个标识 : is_set
    

    3/    进程间通信 IPC   Inter-Process Communication  

    实现进程之间通信的两种机制:
    # 管道 Pipe from multiprocessing import pipe,Process

    # 队列 Queue from multiprocessing import Queue,Process
      
    # 为什么队列为空 为满 这件事情不够准确
    # q.qsize() 队列的大小
    # q.full() 是否满了 满返回True
    # q.empty() 是否空了 空返回True
        
    from multiprocessing import Queue,Process
    
    def consumer(q):
        print(
           '子进程 :', q.get()
        )
    
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=consumer,args=(q,))
        p.start()
        q.put('hello,world')
    

      

    # 生产者消费者模型
    import time
    from multiprocessing import Queue,Process
    
    def producer(name,food,num,q):
        '''生产者'''
        for i in range(num):
            time.sleep(0.3)
            foodi = food + str(i)
            print('%s生产了%s'%(name,foodi))
            q.put(foodi)
    
    def consumer(name,q):
        while True:
            food = q.get()   # 等待接收数据
            if food == None:break
            print('%s吃了%s'%(name,food))
            time.sleep(1)
    
    if __name__ == '__main__':
        q = Queue(maxsize=10)
        p1 = Process(target=producer,args = ('宝元','泔水',20,q))
        p2 = Process(target=producer,args = ('战山','鱼刺',10,q))
        c1 = Process(target=consumer, args=('alex', q))
        c2 = Process(target=consumer, args=('wusir', q))
        p1.start()   # 开始生产
        p2.start()   # 开始生产
        c1.start()
        c2.start()
        p1.join()    # 生产者结束生产了
        p2.join()    # 生产者结束生产了
        q.put(None)  # put None 操作永远放在所有的生产者结束生产之后
        q.put(None)  # 有几个消费者 就put多少个None

    #下面是
    JoinableQueue
    
    
    import  time
    from multiprocessing import JoinableQueue,Process

    def consumer(name,q):
    while True:
    food = q.get()
    time.sleep(1)
    print('%s消费了%s'%(name,food))
    q.task_done()

    def producer(name,food,num,q):
    '''生产者'''
    for i in range(num):
    time.sleep(0.3)
    foodi = food + str(i)
    print('%s生产了%s'%(name,foodi))
    q.put(foodi)
    q.join() # 消费者消费完毕之后会结束阻塞
    if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=('宝元', '泔水', 20, q))
    c1 = Process(target=consumer, args=('alex', q))
    c2 = Process(target=consumer, args=('wusir', q))
    c1.daemon = True
    c2.daemon = True
    p1.start()
    c1.start()
    c2.start()
    p1.join()

    # 消费者每消费一个数据会给队列发送一条信息
    # 当每一个数据都被消费掉之后 joinablequeue的join阻塞行为就会结束
    # 以上就是为什么我们要在生产完所有数据的时候发起一个q.join()

    # 随着生产者子进程的执行完毕,说明消费者的数据都消费完毕了
    # 这个时候主进程中的p1.join结束
    # 主进程的代码结束
    # 守护进程也结束了

    4/ Manager 类

    from multiprocessing import Manager,Process,Lock
    def work(d,lock):
        # with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        #     d['count']-=1
        lock.acquire()
        d['count'] -= 1
        lock.release()
    
    if __name__ == '__main__':
        lock=Lock()
        m = Manager()
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
    
    
     Manager是一个类 内部有一些数据类型能够实现进程之间的数据共享
    dict list这样的数据 内部的数字进行自加 自减 是会引起数据不安全的,这种情况下 需要我们手动加锁完成
    因此 我们一般情况下 不适用这种方式来进行进程之间的通信
    我们宁可使用Queue队列或者其他消息中间件 来实现消息的传递 保证数据的安全
    

     5/ pool 进程池 (类)

    什么是进程池? 有限的进程的池子
    为什么要用进程池?
        任务很多 cpu个数*5个任务以上
        为了节省创建和销毁进程的时间 和 操作系统的资源
    一般进程池中进程的个数:
        cpu的1-2倍
        如果是高计算,完全没有io,那么就用cpu的个数
        随着IO操作越多,可能池中的进程个数也可以相应增加
    向进程池中提交任务的三种方式
        map    异步提交任务 简便算法 接收的参数必须是 子进程要执行的func,可迭代的(可迭代中的每一项都会作为参数被传递给子进程)
            能够传递的参数是有限的,所以比起apply_async限制性比较强
        apply  同步提交任务(你删了吧)
        apply_async 异步提交任务
            能够传递比map更丰富的参数,但是比较麻烦
            首先 apply_async提交的任务和主进程完全异步
            可以通过先close进程池,再join进程池的方式,强制主进程等待进程池中任务的完成
            也可以通过get获取返回值的方式,来等待任务的返回值
                我们不能在apply_async提交任务之后直接get获取返回值
                   for i in range(100):
                        ret = p.apply_async(func,args=(i,))  # 自动带join  异步的  apply_async异步提交任务
                        l.append(ret)
                    for ret in l:
                        print(ret.get())
    
    异步方式向进程池提交任务并且获取返回值
    import time
    from multiprocessing import Pool  # 池
    def func(i):
        i -= 1
        time.sleep(1)
        return i**2
    
    # 你的池中打算放多少个进程,个数cpu的个数 * 1|2
    if __name__ == '__main__':
        p = Pool(5)
        l = []
        for i in range(100):
            ret = p.apply_async(func,args=(i,))  # 自动带join  异步的  apply_async异步提交任务
            l.append(ret)
        for ret in l:
            print(ret.get()) 
    回调函数
    import os
    import time
    import random
    from multiprocessing import Pool  # 池
    def func(i):     # [2,1,1,5,0,0.2]
        i -= 1
        time.sleep(random.uniform(0,2))
        return i**2
    
    def back_func(args):
        print(args,os.getpid())
    
    if __name__ == '__main__':
        print(os.getpid())
        p = Pool(5)
        l = []
        for i in range(100):
            ret = p.apply_async(func,args=(i,),callback=back_func)  # 5个任务
        p.close()
        p.join()
    callback回调函数
    主动执行func,然后在func执行完毕之后的返回值,直接传递给back_func作为参数,调用back_func
    处理池中任务的返回值
    回调函数是由谁执行的? 主进程
    
    5000个网页
    5个进程
    import re
    import json
    from urllib.request import urlopen
    from multiprocessing import Pool
    
    def get_page(i):
        ret = urlopen('https://movie.douban.com/top250?start=%s&filter='%i).read()
        ret = ret.decode('utf-8')
        return ret
    
    def parser_page(s):
        com = re.compile(
            '<div class="item">.*?<div class="pic">.*?<em .*?>(?P<id>d+).*?<span class="title">(?P<title>.*?)</span>'
            '.*?<span class="rating_num" .*?>(?P<rating_num>.*?)</span>.*?<span>(?P<comment_num>.*?)评价</span>', re.S)
    
        ret = com.finditer(s)
        with open('file','a',encoding='utf-8') as f:
            for i in ret:
                dic = {
                    "id": i.group("id"),
                    "title": i.group("title"),
                    "rating_num": i.group("rating_num"),
                    "comment_num": i.group("comment_num"),
                }
                f.write(json.dumps(dic,ensure_ascii=False)+'
    ')
    
    if __name__ == '__main__':
        p = Pool(5)
        count = 0
        for i in range(10):
            p.apply_async(get_page,args=(count,),callback=parser_page)
            count += 25
        p.close()
        p.join()
    
    
    import json
    with open('file2','w',encoding='utf-8') as f:
        json.dump({'你好':'alex'},f,ensure_ascii=False)
    

      

      


      
     
  • 相关阅读:
    常用到的Linux基础命令
    adb linux
    adb,monkey,perfdog的区别
    postman中的变量与使用
    fiddler教程-抓包,弱网,断点,mock等
    linux启动服务和开机自启动设置
    .NET Framework 版本和依赖关系
    C#中的Guid
    EPL II 编程打印
    Button 类
  • 原文地址:https://www.cnblogs.com/zhangchen-sx/p/10097962.html
Copyright © 2011-2022 走看看