zoukankan      html  css  js  c++  java
  • python select

    server

      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 """
      4 @author: zengchunyun
      5 """
      6 import socket
      7 import select
      8 import queue
      9 import sys
     10 import time
     11 
     12 
     13 class MyServer(object):
     14     def __init__(self, server_address):
     15         """
     16         初始化服务器配置
     17         :param server_address:
     18         :return:
     19         """
     20         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建TCP socket
     21         self.socket.setblocking(False)  # 设置非阻塞
     22         self.server_address = server_address  # 设置服务器IP端口
     23 
     24         self.readlist = []  # 生成可读列表,当有可接收消息时,说明有可连接请求发送消息到此服务器
     25         self.writelist = []  # 生成可写列表,当该队列含有对象时,说明可以向该对象发送消息,
     26         self.message_queue = {}  # 生成消息队列字典,以socket:queue.Queue形式存储接收的请求信息
     27         self.recv_buffer = 1024  # 设置接收的缓冲区大小
     28         self.bind()  # 服务器绑定IP端口
     29 
     30     def bind(self):
     31         """
     32         绑定服务器IP端口,最大监听5个队列
     33         :return:
     34         """
     35         self.socket.bind(self.server_address)
     36         sys.stdout.write("starting up on {} port {}
    ".format(*self.server_address))
     37         sys.stdout.flush()
     38         self.socket.listen(5)
     39         self.readlist.append(self.socket)  # 将服务器socket实例添加到可读事件列表
     40 
     41     def serve_forever(self, interval=0.5):
     42         """
     43         开始轮询事件
     44         :param interval: 轮询超时时间,单位s
     45         :return:
     46         """
     47         while self.readlist:  # 由于绑定服务器端口时已加入元素,所以该条件成立
     48             try:
     49                 #  每次都轮询下面事件列表,当有事件触发时,则继续执行,否则一直阻塞,如果设置了超时时间,则超时后,继续执行
     50                 readlist, writelist, exceptionlist = select.select(self.readlist, self.writelist, self.readlist, interval)
     51             except ValueError:  # 出现该错误 filedescriptor out of range in select(),说明文件句柄已耗尽
     52                 time.sleep(10)
     53                 continue
     54             if not (readlist, writelist, exceptionlist):  # 如果没有事件被触发,三个列表都是空的
     55                 continue  # 当三个事件列表都没有被触发都为空,则不继续往下执行
     56             for sock in readlist:  # 轮询可读列表,开始接收客户端发来对消息
     57                 if sock is self.socket:
     58                     request, client_address = sock.accept()  # 当服务器本身实例可读时,说明有新连接请求接入
     59                     sys.stdout.write("new connection from {} port {}
    ".format(*client_address))
     60                     sys.stdout.flush()
     61                     request.setblocking(False)  # 设置非阻塞模式
     62                     self.readlist.append(request)  # 将新的socket连接请求实例加入到可读列表,下次该客户端发送消息时,由select轮询处理
     63                     self.message_queue[request] = queue.Queue()  # 以socket实例命名生成一个队列实例,存储该客户端发来的消息
     64                 else:  # 只有之前建立过连接的客户端才不会触发服务器自身的socket对象,即该对象不是服务器自身socket对象,而是新连接生成对的对象
     65                     data = sock.recv(self.recv_buffer)  # 如果可读事件不等于服务器本身socket实例,则说明有客户端发送消息过来了
     66                     if data:  # 如果接收到新消息,则说明客户端发送消息过来了
     67                         sys.stdout.write("received [{}] from {} port {}
    ".format(data, *sock.getpeername()))
     68                         sys.stdout.flush()
     69                         self.message_queue[sock].put(data)  # 将客户端发来的消息放入它对应的队列里
     70                         if sock not in self.writelist:  # 并且,如果它没有被放进可写列表,则先添加到该列表,然后接下来统一处理该列表
     71                             self.writelist.append(sock)  # 当收到该客户端消息,不进行立即回复,先加入到可写事件列表
     72                     else:  # 如果没有消息,说明客户端断开连接了
     73                         sys.stdout.write("closing client {} port {}
    ".format(*sock.getpeername()))  # 由于收到空消息,说明客户端已断开
     74                         sys.stdout.flush()
     75                         if sock in self.writelist:  # 由于客户端断开连接,则需要清除该socket实例,避免发送异常
     76                             self.writelist.remove(sock)  # 将该客户端从可写列表移除,避免回复客户端时由于断开了,造成阻塞
     77                         self.readlist.remove(sock)  # 从可读事件列表移除不存在的客户端
     78                         sock.close()  # 关闭该连接
     79                         del self.message_queue[sock]  # 删除该客户端的消息队列
     80 
     81             for sock in writelist:  # 轮询可写列表,该列表仅存储还没有对客户端请求回复的对象
     82                 try:
     83                     get_msg = self.message_queue[sock].get_nowait()  # 开始获取客户端发来的数据,由于数据队列可能为空,避免阻塞使用nowait()方法
     84                 except queue.Empty:  # 如果队列为空,可能会触发队列空异常,需要处理该异常,避免影响其他客户端连接
     85                     sys.stdout.write("queue is empty
    ")
     86                     sys.stdout.flush()
     87                     self.writelist.remove(sock)  # 将该客户端从可写事件移除,即不需要对该客户端发送消息了
     88                 except KeyError:  # 并发时,可能出现此问题
     89                     pass
     90                 else:  # 表示没有异常,则说明获取到队列消息了
     91                     sys.stdout.write("beginning send message to client {} port {}
    ".format(*sock.getpeername()))
     92                     sys.stdout.flush()
     93                     sock.send(get_msg)  # 直接将用户发来的消息返回给客户端
     94 
     95             for sock in exceptionlist:  # 轮询异常事件列表
     96                 sys.stdout.write("handling exception condition from {} port {}
    ".format(*sock.getpeername()))
     97                 sys.stdout.flush()
     98                 self.readlist.remove(sock)  # 移除异常列表对象
     99                 if sock in self.writelist:  # 由于客户端异常,所以如果还未对客户端回复消息,则不需要再进行回复了,直接移除该客户端
    100                     self.writelist.remove(sock)
    101                 sock.close()  # 关闭该客户端连接
    102                 del self.message_queue[sock]  # 删除该客户端的消息队列
    103 
    104 
    105 if __name__ == "__main__":
    106     server = ("0.0.0.0", 9999)
    107     servermq = MyServer(server)
    108     servermq.serve_forever()

    client

     1 #!/usr/bin/env python3
     2 # -*- coding: utf-8 -*-
     3 """
     4 @author: zengchunyun
     5 """
     6 
     7 import socket
     8 import sys
     9 import threading
    10 
    11 
    12 class MyClient(object):
    13     def __init__(self, server_address):
    14         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    15         self.server_address = server_address
    16         self.recv_buffer = 1024
    17         self.connect()
    18 
    19     def connect(self):
    20         self.socket.connect(self.server_address)
    21         sys.stdout.write("connecting to {} port {}
    ".format(*self.socket.getpeername()))
    22         sys.stdout.flush()
    23 
    24     def client_forever(self, data=b""):
    25         while True:
    26             # data = bytes(input("请输入: "), "utf8")
    27             if type(data) is not bytes:
    28                 data = bytes(str(data), "utf8")
    29             self.socket.send(data)
    30             received_data = self.socket.recv(self.recv_buffer)
    31             if received_data:
    32                 sys.stdout.write("received {} from {} port {}
    ".format(received_data, *self.socket.getpeername()))
    33                 sys.stdout.flush()
    34                 break
    35             else:
    36                 sys.stdout.write("closing socket {} port {}
    ".format(*self.socket.getpeername()))
    37                 self.socket.close()
    38 
    39 
    40 def run(data):
    41     server = ("127.0.0.1", 9999)
    42     clientmq = MyClient(server)
    43     clientmq.client_forever(data)
    44 
    45 
    46 if __name__ == "__main__":
    47     for i in range(50000):
    48         t = threading.Thread(target=run, args=(i,))
    49         t.start()
    50         print("has been send {} times".format(i))
  • 相关阅读:
    字符编码与函数
    linux打印彩色字
    企业级docker仓库Harbor部署
    PyPI使用国内源
    CentOS 7.2 升级内核支持 Docker overlay 网络模式
    购物车2
    购物车
    定制 cobbler TITLE 信息
    06.密码错误3次锁定
    05.for循环语句
  • 原文地址:https://www.cnblogs.com/zengchunyun/p/5291480.html
Copyright © 2011-2022 走看看