zoukankan      html  css  js  c++  java
  • day 39

    在单线程实现并发,形成时,采用的是导入一个第三方模块,gevent模块,结合一个monkey补丁使用来完成单线程并发,

    实现协程。

    在使用补丁时一定要放在程序还没运行之前,要是已经运行起来了,再导入使用补丁就毫无意义了。

    原理就是,将任务中所有IO阻塞改成非阻塞。这样就可以当遇到阻塞时就及时切换到其他任务上去来实现将持有cpu的时间

    最大化利用。

    而我们也可以自己来实现主动避开IO,遇到时就切换到其他任务上去。

    首先是最常见的阻塞IO模型,当程序遇到IO操作时就会在原地等待IO结束,再去执行其他任务,这样的效率是低下的,所以,

    会开启多进程或者多线程来提高效率,虽然有一定效果,但是并没有解决IO阻塞的问题,遇到IO操作还是会停在原地等待。效率并没有太大的提升,因为IO阻塞就是影响效率的最大障碍。

    为了解决这个问题,想到了非阻塞IO,导入socket模块,将setblocking的默认值由True改为False这样无论程序遇到任何IO操作都及时避开去执行其他任务,但是这样服务器就会不停的运作这样就导致cpu的占用率过高,可能导致其他程序没有办法及时执行。并且程序代码如果处理的细节不够的话,如果有一方,意外出现强制下线的情况,可能会导致程序崩溃掉。

    这个时候可以借助列表来对连接到服务器的客服端进行一个存储,当对方往服务器发送消息时就通过遍历存储连接客户端的列表,来循环接收发来的消息,这里需要做一个异常处理,防止收消息时有客户端下线。同样的对于往客户端发送消息时也是一样的。也是借助列表,做异常处理的判断来对程序进行保护,但是,这样还是不能解决非IO模型情况下,cpu占用率过高的问题。

    新的解决办法就是从IO本身来入手,IO模式的两种模式:waitdata和copydata,首先客户端往服务器发送信息就是有一个copydata产生,但是copydata花费的时间非常少,可以忽略不计他是客户端将要发送的信息copy一份到操作系统内存中,不存在waitdata阶段,在服务器接收信息就是设计两个阶段了,这样就会导致运行的效率降低,我们可以利用好waitdata阶段来提高效率,使用select模块。首先也要基于模式是非阻塞状态下,这里使用select的好处是可以不过多的占用cpu的情况下,提高效率,实现协程。

    首先导入select模块,把列表作为参数放入select.select中它会返回一个可读或者可写的列表。将连接过来的socket对象。放入列表中监听。首先对装有socket对象的列表进行遍历,并循环接收客户端发来的数据,这里也要做一个异常判断。

    防止对方下线,将异常捕获到来保护程序,这里因为是单线程并发来执行任务,当意外发生以后不能使用break要让其他的热舞继续走下去。使用continue。将没有值或者下线的socket加入单独的一个列表中,等循环接收后删除。不能对正在遍历的列表进行删除操作。再将有值的scket加入可写的列表中,做数据处理,再返回给客户端,索要做的异常处理和接收数据时是一样的。

    from socket import *
    import select
    
    s = socket()
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    s.setblocking(False)
    # print(s)
    
    r_list=[s,]
    w_list=[]
    w_data={}
    while True:
        print('被检测r_list: ',len(r_list))
        print('被检测w_list: ',len(w_list))
        rl,wl,xl=select.select(r_list,w_list,[],) #r_list=[server,conn]
    
        # print('rl: ',len(rl)) #rl=[conn,]
        # print('wl: ',len(wl))
    
        # 收消息
        for r in rl: #r=conn
            if r == s:
                conn,addr=r.accept()
                r_list.append(conn)
            else:
                try:
                    data=r.recv(1024)
                    if not data:
                        r.close()
                        r_list.remove(r)
                        continue
                    # r.send(data.upper())
                    w_list.append(r)
                    w_data[r]=data.upper()
                except ConnectionResetError:
                    r.close()
                    r_list.remove(r)
                    continue
    
        # 发消息
        for w in wl:
            w.send(w_data[w])
            w_list.remove(w)
            w_data.pop(w)

    但是可以处理的并发量是有限的最大可以支持1024个并发。

    为了扩展并发量可以使用epoll模块。epoll是在linux中提供的多路复用io模型,仅在linux下可以实现。

    程序的阻塞过程

    假设系统目前运行了三个进程 A B C

    进程A正在运行一下socket程序

    server = socket.socket()
    server.bind(("127.0.0.1",1688))
    server.listen()
    server.accept()

    1.系统会创建文件描述符指向一个socket对象 ,其包含了读写缓冲区,已经进行等待队列

    2.当执行到accept / recv 时系统会讲进程A 从工作队列中移除

    3.将进程A的引用添加到 socket对象的等待队列中

     

     ![image-20190611012015309](https://ws1.sinaimg.cn/large/006tNc79gy1g3wkhb4sgmj30ke0dkgm9.jpg)

    select监控多个socket

    select的实现思路比较直接

    1.先将所有socket放到一个列表中,

    2.遍历这个列表将进程A 添加到每个socket的等待队列中 然后阻塞进程

    3.当数据到达时,cpu执行中断程序将数据copy给socket 同时唤醒处于等待队列中的进程A

    为了防止重复添加等待队列 还需要移除已经存在的进程A

    4.进程A唤醒后 由于不清楚那个socket有数据,所以需要遍历一遍所有socket列表


    从上面的过程中不难看出

     

    1.select,需要遍历socket列表,频繁的对等待队列进行添加移除操作,

    2.数据到达后还需要给遍历所有socket才能获知哪些socket有数据

    两个操作消耗的时间随着要监控的socket的数量增加而大大增加,

    处于效率考虑才规定了最大只能监视1024个socket

    epol l要解决的问题

    1.避免频繁的对等待队列进行操作
    2.避免遍历所有socket

     

    对于第一个问题我们先看select的处理方式
    ##### 
    
    ```python
    while True:
        r_list,w_list,x_list = select.select(rlist,wlist,xlist)
      
    ```
    
    每次处理完一次读写后,都需要将所用过冲重复一遍,包括移除进程,添加进程,默认就会将进程添加到等待队列,并阻塞住进程,然而等待队列的更新操作并不频繁,
    
    所以对于第一个问题epoll,采取的方案是,将对等待队列的维护和,阻塞进程这两个操作进行拆分,
    
    相关代码如下
    
    ```python
    import socket,select
    server = socket.socket()
    server.bind(("127.0.0.1",1688))
    server.listen(5)
    
    #创建epoll事件对象,后续要监控的事件添加到其中
    epoll = select.epoll()
    #注册服务器监听fd到等待读事件集合
    epoll.register(server.fileno(), select.EPOLLIN)   # 需要关注 server这个socket的可读事件
    
    # 等待事件发生
    while True:
        for sock,event in epoll.poll():
        pass
    ```
    
    在epoll中register 与 unregister函数用于维护等待队列
    
    register  是进程添加到等待队列中  unregister 把进程从等待队列中删除   
    
    使用这两个函数我们自己来控制等待队列的添加和删除 从而避免频繁操作等待队列  
    
    
    
    epoll.poll则用于阻塞进程
    
    
    
    ![image-20190611024356728](https://ws2.sinaimg.cn/large/006tNc79gy1g3wmwdiyhxj30q409egm3.jpg)
    
    这样一来就避免了 每次处理都需要重新操作等待队列的问题 
    第二个问题是select中进程无法获知哪些socket是有数据的所以需要遍历
    epol为了解决这个问题,在内核中维护了一个就绪列表,
    
    1.创建epoll对象,epoll也会对应一个文件,由文件系统管理
    
    2.执行register时,将epoll对象 添加到socket的等待队列中
    
    ![image-20190611032427955](https://ws3.sinaimg.cn/large/006tNc79gy1g3wo2jnv9vj30kr0du3yx.jpg)
    
    3.数据到达后,CPU执行中断程序,将数据copy给socket
    
    4.在epoll中,中断程序接下来会执行epoll对象中的回调函数,传入就绪的socket对象 
    
    5.将socket,添加到就绪列表中 
    
    6.唤醒epoll等待队列中的进程,
    
    进程唤醒后,由于存在就绪列表,所以不需要再遍历socket了,直接处理就绪列表即可 
    
    
    
    解决了这两个问题后,并发量得到大幅度提升,最大可同时维护上万级别的socket

    epoll相关函数

    ## 
    
    ```python
    import select 导入select模块
    
    epoll = select.epoll() 创建一个epoll对象
    
    epoll.register(文件句柄,事件类型) 注册要监控的文件句柄和事件
    
    事件类型:
    
      select.EPOLLIN    可读事件
    
      select.EPOLLOUT   可写事件
    
      select.EPOLLERR   错误事件
    
      select.EPOLLHUP   客户端断开事件
    
    epoll.unregister(文件句柄)   销毁文件句柄
    
    epoll.poll(timeout)  当文件句柄发生变化,则会以列表的形式主动报告给用户进程,timeout
    
                         为超时时间,默认为-1,即一直等待直到文件句柄发生变化,如果指定为1
    
                         那么epoll每1秒汇报一次当前文件句柄的变化情况,如果无变化则返回空
    
    epoll.fileno() 返回epoll的控制文件描述符(Return the epoll control file descriptor)
    
    epoll.modfiy(fineno,event) fineno为文件描述符 event为事件类型  作用是修改文件描述符所对应的事件
    
    epoll.fromfd(fileno) 从1个指定的文件描述符创建1个epoll对象
    
    epoll.close()   关闭epoll对象的控制文件描述符
    ```

    案例

    #coding:utf-8
    #客户端
    #创建客户端socket对象
    import socket
    clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #服务端IP地址和端口号元组
    server_address = ('127.0.0.1',1688)
    #客户端连接指定的IP地址和端口号
    clientsocket.connect(server_address)
    
    while True:
        #输入数据
        data = raw_input('please input:')
        if data == "q":
            break
        if not data:
          continue
        #客户端发送数据
        clientsocket.send(data.encode("utf-8"))
        #客户端接收数据
        server_data = clientsocket.recv(1024)
        print ('客户端收到的数据:',server_data)
    #关闭客户端socket
    clientsocket.close()

    服务器

    # coding:utf-8
    import socket, select
    
    server = socket.socket()
    server.bind(("127.0.0.1", 1688))
    server.listen(5)
    
    msgs = []
    
    
    fd_socket = {server.fileno(): server}
    epoll = select.epoll()
    # 注册服务器的 写就绪
    epoll.register(server.fileno(), select.EPOLLIN)
    
    while True:
        for fd, event in epoll.poll():
            sock = fd_socket[fd]
            print(fd, event)
            # 返回的是文件描述符 需要获取对应socket
            if sock == server:  # 如果是服务器 就接受请求
                client, addr = server.accept()
                # 注册客户端写就绪
                epoll.register(client.fileno(), select.EPOLLIN)
                # 添加对应关系
                fd_socket[client.fileno()] = client
    
            # 读就绪
            elif event == select.EPOLLIN:
                data = sock.recv(2018)
                if not data:
                    # 注销事件
                    epoll.unregister(fd)
                    # 关闭socket
                    sock.close()
                    # 删除socket对应关系
                    del fd_socket[fd]
                    print(" somebody fuck out...")
                    continue
    
                print(data.decode("utf-8"))
                # 读完数据 需要把数据发回去所以接下来更改为写就绪=事件
                epoll.modify(fd, select.EPOLLOUT)
                #记录数据
                msgs.append((sock,data.upper()))
            elif event == select.EPOLLOUT:
                for item in msgs[:]:
                    if item[0] == sock:
                        sock.send(item[1])
                        msgs.remove(item)
                # 切换关注事件为写就绪
                epoll.modify(fd,select.EPOLLIN)

     

  • 相关阅读:
    如何通过wlst部署应用程序到weblogic12c上
    Oracle数据库账户口令复杂度-等保测评之身份鉴别
    等保测评中与oracle有关的工作
    如何做好项目管理工作
    weblogic 安全漏洞 CVE-2017-5638
    中间件中文技术文档2018年3月-5月更新内容
    Admin Console 反应慢的相关bug
    HotSpot jdk 资料汇总
    weblogic patch log显示
    Where Can I Download Full Installers for WebLogic Server
  • 原文地址:https://www.cnblogs.com/1624413646hxy/p/11005877.html
Copyright © 2011-2022 走看看