zoukankan      html  css  js  c++  java
  • IO多路复用深入浅出

    http://www.jianshu.com/p/1020c11f016c

    Java程序员进阶三条必经之路:数据库、虚拟机、异步通信。

    前言

    从零单排高性能问题,这次轮到异步通信了。这个领域入门有点难,需要了解UNIX五种IO模型和TCP协议,熟练使用三大异步通信框架:Netty、NodeJS、Tornado。目前所有标榜异步的通信框架用的都不是异步IO模型,而是IO多路复用中的epoll。因为Python提供了对Linux内核API的友好封装,所以我选择Python来学习IO多路复用。

    IO多路复用

    1. select

      举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      '''
      Created on Feb 16, 2016
      
      @author: mountain
      '''
      import socket
      import select
      from Queue import Queue
      
      #AF_INET指定使用IPv4协议,如果要用更先进的IPv6,就指定为AF_INET6。
      #SOCK_STREAM指定使用面向流的TCP协议,如果要使用面向数据包的UCP协议,就指定SOCK_DGRAM。
      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server.setblocking(False)
      #设置监听的ip和port
      server_address = ('localhost', 1234)
      server.bind(server_address)
      #设置backlog为5,client向server发起connect,server accept后建立长连接,
      #backlog指定排队等待server accept的连接数量,超过这个数量,server将拒绝连接。
      server.listen(5)
      #注册在socket上的读事件
      inputs = [server]
      #注册在socket上的写事件
      outputs = []
      #注册在socket上的异常事件
      exceptions = []
      #每个socket有一个发送消息的队列
      msg_queues = {}
      print "server is listening on %s:%s." % server_address
      while inputs:
          #第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码
          readable, writable, exceptional = select.select(inputs, outputs, exceptions)
          for sock in readable:
              #client向server发起connect也是读事件,server accept后产生socket加入读队列中
              if sock is server:
                  conn, addr = sock.accept()
                  conn.setblocking(False)
                  inputs.append(conn)
                  msg_queues[conn] = Queue()
                  print "server accepts a conn."
              else:
                  #读取client发过来的数据,最多读取1k byte。
                  data = sock.recv(1024)
                  #将收到的数据返回给client
                  if data:
                      msg_queues[sock].put(data)
                      if sock not in outputs:
                          #下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写
                          outputs.append(sock)
                  else:
                      #client传过来的消息为空,说明已断开连接
                      print "server closes a conn."
                      if sock in outputs:
                          outputs.remove(sock)
                      inputs.remove(sock)
                      sock.close()
                      del msg_queues[sock]
          for sock in writable:
              if not msg_queues[sock].empty():
                  sock.send(msg_queues[sock].get_nowait())
              if msg_queues[sock].empty():
                  outputs.remove(sock)
          for sock in exceptional:
              inputs.remove(sock)
              if sock in outputs:
                  outputs.remove(sock)
              sock.close()
              del msg_queues[sock]
      [mountain@king ~/workspace/wire]$ telnet localhost 1234
      Trying 127.0.0.1...
      Connected to localhost.
      Escape character is '^]'.
      1
      1

      select有3个缺点:

      1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
      2. 每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
        这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的是就绪socket数目:
        int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
      3. fd数量有限,默认1024。
    2. poll

      采用poll重新实现EchoServer,只要搞懂了select,poll也不难,只是api的参数不太一样而已。

      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      '''
      Created on Feb 27, 2016
      
      @author: mountain
      '''
      import select
      import socket
      import sys
      import Queue
      
      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server.setblocking(False)
      server_address = ('localhost', 1234)
      server.bind(server_address)
      server.listen(5)
      print 'server is listening on %s port %s' % server_address
      msg_queues = {}
      timeout = 1000 * 60
      #POLLIN: There is data to read
      #POLLPRI: There is urgent data to read
      #POLLOUT: Ready for output
      #POLLERR: Error condition of some sort
      #POLLHUP: Hung up
      #POLLNVAL: Invalid request: descriptor not open
      READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
      READ_WRITE = READ_ONLY | select.POLLOUT
      poller = select.poll()
      #注册需要监听的事件
      poller.register(server, READ_ONLY)
      #文件描述符和socket映射
      fd_to_socket = { server.fileno(): server}
      while True:
          events = poller.poll(timeout)
          for fd, flag in events:
              sock = fd_to_socket[fd]
              if flag & (select.POLLIN | select.POLLPRI):
                  if sock is server:
                      conn, client_address = sock.accept()
                      conn.setblocking(False)
                      fd_to_socket[conn.fileno()] = conn
                      poller.register(conn, READ_ONLY)
                      msg_queues[conn] = Queue.Queue()
                  else:
                      data = sock.recv(1024)
                      if data:
                          msg_queues[sock].put(data)
                          poller.modify(sock, READ_WRITE)
                      else:
                          poller.unregister(sock)
                          sock.close()
                          del msg_queues[sock]
              elif flag & select.POLLHUP:
                  poller.unregister(sock)
                  sock.close()
                  del msg_queues[sock]
              elif flag & select.POLLOUT:
                  if not msg_queues[sock].empty():
                      msg = msg_queues[sock].get_nowait()
                      sock.send(msg)
                  else:
                      poller.modify(sock, READ_ONLY)
              elif flag & select.POLLERR:
                  poller.unregister(sock)
                  sock.close()
                  del msg_queues[sock]

      poll解决了select的第三个缺点,fd数量不受限制,但是失去了select的跨平台特性,它的linux内核api是这样的:

      int poll (struct pollfd *fds, unsigned int nfds, int timeout);
      struct pollfd { 
          int fd; /* file descriptor */
          short events; /* requested events to watch */
          short revents; /* returned events witnessed */
      };
    3. epoll

      用法与poll几乎一样。

      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      '''
      Created on Feb 28, 2016
      
      @author: mountain
      '''
      import select
      import socket
      import Queue
      
      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      server.setblocking(False)
      server_address = ('localhost', 1234)
      server.bind(server_address)
      server.listen(5)
      print 'server is listening on %s port %s' % server_address
      msg_queues = {}
      timeout = 60
      READ_ONLY = select.EPOLLIN | select.EPOLLPRI
      READ_WRITE = READ_ONLY | select.EPOLLOUT
      epoll = select.epoll()
      #注册需要监听的事件
      epoll.register(server, READ_ONLY)
      #文件描述符和socket映射
      fd_to_socket = { server.fileno(): server}
      while True:
          events = epoll.poll(timeout)
          for fd, flag in events:
              sock = fd_to_socket[fd]
              if flag & READ_ONLY:
                  if sock is server:
                      conn, client_address = sock.accept()
                      conn.setblocking(False)
                      fd_to_socket[conn.fileno()] = conn
                      epoll.register(conn, READ_ONLY)
                      msg_queues[conn] = Queue.Queue()
                  else:
                      data = sock.recv(1024)
                      if data:
                          msg_queues[sock].put(data)
                          epoll.modify(sock, READ_WRITE)
                      else:
                          epoll.unregister(sock)
                          sock.close()
                          del msg_queues[sock]
              elif flag & select.EPOLLHUP:
                  epoll.unregister(sock)
                  sock.close()
                  del msg_queues[sock]
              elif flag & select.EPOLLOUT:
                  if not msg_queues[sock].empty():
                      msg = msg_queues[sock].get_nowait()
                      sock.send(msg)
                  else:
                      epoll.modify(sock, READ_ONLY)
              elif flag & select.EPOLLERR:
                  epoll.unregister(sock)
                  sock.close()
                  del msg_queues[sock]

      epoll解决了select的三个缺点,是目前最好的IO多路复用解决方案。为了更好地理解epoll,我们来看一下linux内核api的用法。

      int epoll_create(int size)//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
      int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)//注册事件,每个fd只拷贝一次。
      int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)/*等待IO事件,事件发生时,
      内核调用回调函数,把就绪fd放入就绪链表中,并唤醒epoll_wait,epoll_wait只需要遍历就绪链表即可,
      而select和poll都是遍历所有fd,这效率高下立判。*/
  • 相关阅读:
    参数_门店
    实现百分比和百分比的累加以及A、B、C类别的标识
    参数范围的选择
    栏目数据合并表达式
    父子维度转化为组
    从参数中获得特定字符串
    多参与多轴
    数据库链接字符串大集合
    闰年2月29天
    sum函数按照类别的值进行取值
  • 原文地址:https://www.cnblogs.com/zengkefu/p/5720850.html
Copyright © 2011-2022 走看看