zoukankan      html  css  js  c++  java
  • python-kafka源码解析之socketpair

    socket基本操作包括:
    socket()函数创建socket文件描述符,唯一标识一个socket。
    bind()函数,将ip:port和socket绑定
    listen()函数来监听这个socket,假如客户端connect这个套接字,服务器端就回接收到这个连接请求。
    connect()函数用于和服务端建立连接
    accept()函数,服务端经过bind和listen,并且客户端connect后,服务端用accept接收这个建立连接的请求。
    read()、write()等函数,用于建立连接后的信息交互。
    详情参考:Linux Socket编程(不限Linux)

    python-kafak中vendor包下socketpair源码如下,返回在127.0.0.1本地连接的两端socket,ssock表示服务端socket,csock表示客户端socket。

     1 # pylint: skip-file
     2 # vendored from https://github.com/mhils/backports.socketpair
     3 from __future__ import absolute_import
     4 
     5 import sys
     6 import socket
     7 import errno
     8 
     9 _LOCALHOST    = '127.0.0.1'
    10 _LOCALHOST_V6 = '::1'
    11 
    12 #socketpair返回本地建立连接的两端socket
    13 if not hasattr(socket, "socketpair"):
    14     # Origin: https://gist.github.com/4325783, by Geert Jansen.  Public domain.
    15     def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
    16         if family == socket.AF_INET: #ipv4
    17             host = _LOCALHOST
    18         elif family == socket.AF_INET6: #ipv6
    19             host = _LOCALHOST_V6
    20         else:
    21             raise ValueError("Only AF_INET and AF_INET6 socket address families "
    22                              "are supported")
    23         if type != socket.SOCK_STREAM:
    24             raise ValueError("Only SOCK_STREAM socket type is supported")
    25         if proto != 0:
    26             raise ValueError("Only protocol zero is supported")
    27 
    28         # We create a connected TCP socket. Note the trick with
    29         # setblocking(False) that prevents us from having to create a thread.
    30         lsock = socket.socket(family, type, proto)
    31         try:
    32             lsock.bind((host, 0)) #端口0说明由系统动态创建监听端口
    33             lsock.listen(min(socket.SOMAXCONN, 128))#SOMAXCONN定义了系统中每一个端口最大的监听队列的长度,这是个全局的参数,默认值为128,https://jaminzhang.github.io/linux/understand-Linux-backlog-and-somaxconn-kernel-arguments/
    34             # On IPv6, ignore flow_info and scope_id
    35             addr, port = lsock.getsockname()[:2]#返回这个socket的地址
    36             csock = socket.socket(family, type, proto)
    37             try:
    38                 csock.setblocking(False)#设置socket为非阻塞模式,非阻塞模式下recv没有收到数据或send数据没有立即发送出去,则会抛出异常,等价于s.settimeout(0.0)。
    39                 # 阻塞模式,会等待直到有数据或数据发送出去,等价于s.settimeout(None)
    40                 if sys.version_info >= (3, 0):
    41                     try:
    42                         csock.connect((addr, port))
    43                     except (BlockingIOError, InterruptedError):
    44                         pass
    45                 else:
    46                     try:
    47                         csock.connect((addr, port))##连接本地创建的lsock
    48                     except socket.error as e:
    49                         if e.errno != errno.WSAEWOULDBLOCK:
    50                             raise
    51                 csock.setblocking(True)#重新设置socket为阻塞模式
    52                 ssock, _ = lsock.accept()#服务端socket接收连接返回和特定客户端建立连接后的新socket ssock
    53             except Exception:
    54                 csock.close()
    55                 raise
    56         finally:
    57             lsock.close()#关闭监听套接字lsock
    58         return (ssock, csock)#返回本地互相连接的两端socket
    59 
    60     socket.socketpair = socketpair

       其中23行参数为(ip:port),端口0表示端口由操作系统自动选取监听端口;33行参数为backlog,表示服务端连接队列的大小。服务端分为两个队列,一个存放 SYN 的队列(半连接队列)、一个存放已经完成连接的队列(全连接队列)。backlog不能超过内核参数somaxconn,高并发情况下加大连接队列长度需调内核参数somaxconn

  • 相关阅读:
    字符个数统计
    面试题——字符的左右移动
    5. Longest Palindromic Substring
    Linux- AWS之EC2大数据集群定时开关机
    Openldap- 大机群身份验证服务
    Linux- 自动备份MySQL数据库脚本
    Linux- 运维
    JAVA- 切换默认的Java
    HIVE- 新建UDF范例
    Hadoop- 集群启动详解
  • 原文地址:https://www.cnblogs.com/killianxu/p/11075192.html
Copyright © 2011-2022 走看看