zoukankan      html  css  js  c++  java
  • [Python 网络编程] TCP编程/群聊服务端 (二)

    群聊服务端

    需求分析:

    1. 群聊服务端需支持启动和停止(清理资源);

    2. 可以接收客户端的连接; 接收客户端发来的数据

    3. 可以将每条信息分发到所有客户端

    1) 先搭架子:

    #TCP Server
    import threading,logging,time,random,datetime
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    class ChatServer:
        def __init__(self):
            pass
    
        def start(self):
            pass
    
        def stop(self):
            pass
    
        def _accept(self):
            pass
    
        def _recv(self):
            #接收数据,TODO 分发
            pass
    

      

    2)基础功能:

    #TCP Server
    import threading,logging,time,random,datetime,socket
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    class ChatServer:
        def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket
            self.addr = (ip,port)
            self.sock = socket.socket()
    
    
        def start(self): # 绑定ip地址/端口,启动监听
            self.sock.bind(self.addr)
            self.sock.listen()
            # accept默认阻塞
            threading.Thread(target=self._accept,name='accept').start()
    
        def stop(self):
            pass
    
        def _accept(self):# 接收传入的连接
            conn,client = self.sock.accept()
            # recv默认阻塞
            threading.Thread(target=self._recv, args=(conn,),name='recv').start()
    
        def _recv(self,conn): #接收数据,TODO 分发
            data = conn.recv(1024)
    
    
    cs = ChatServer()
    cs.start()
    

      

    3)功能完善

    3.1 循环接收所有连接,将接收数据原文分发给所有客户端

    #TCP Server
    import threading,logging,time,random,datetime,socket
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    class ChatServer:
        def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket
            self.addr = (ip,port)
            self.sock = socket.socket()
    
            self.clients = {} #
    
    
        def start(self): # 绑定ip地址/端口,启动监听
            self.sock.bind(self.addr)
            self.sock.listen()
            # accept默认阻塞
            threading.Thread(target=self._accept,name='accept').start()
    
        def stop(self):
            pass
    
        def _accept(self):# 接收传入的连接
            conn,client = self.sock.accept()
            self.clients[client] = conn # (ip,port)二元组
            # conn = <socket.socket fd=264, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999),raddr=('127.0.0.1', 11688)>
            # client = ('127.0.0.1', 11688)
            logging.info("{}-{}".format(conn,client))
            # recv 默认阻塞
            threading.Thread(target=self._recv, args=(conn,),name='recv').start()
    
        def _recv(self,conn): # 循环接收数据,TODO 分发
            while True:
                data = conn.recv(1024)
                logging.info(data.decode())
                msg = "ACK {}".format(data.decode())
                for c in self.clients.values():
                    c.send(msg.encode())
    
    
    
    cs = ChatServer()
    cs.start()
    
    e = threading.Event()
    def showthreads():
        while not e.wait(5):
            logging.info(threading.enumerate())
    
    showthreads()
    
    #运行结果:
    [15:55:39]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(accept, started 7776)>]
    

      

    #检查服务端状态
    C:Userszhangsan>netstat -an | find "9999"
      TCP    127.0.0.1:9999         0.0.0.0:0              LISTENING
    

      

    客户端连接:

    发送消息:"hello"

    #服务端运行状态变化
    [15:55:54]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(accept, started 7776)>]
    [15:55:58]	 [accept,7776] <socket.socket fd=404, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11863)>-('127.0.0.1', 11863)
    [15:55:59]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(recv, started 6788)>] #accept线程不见了,先不关心
    [15:56:04]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(recv, started 6788)>]
    [15:56:07]	 [recv,6788] hello
    

      

    客户端断开连接:

    客户端主动断开连接,服务器抛出了ConnectionAbortedError异常:

    #异常
    [15:58:57]	 [recv,6788] 
    Exception in thread recv:
    Traceback (most recent call last):
      File "C:UserszhangpengAppDataLocalProgramsPythonPython35lib	hreading.py", line 914, in _bootstrap_inner
        self.run()
      File "C:UserszhangpengAppDataLocalProgramsPythonPython35lib	hreading.py", line 862, in run
        self._target(*self._args, **self._kwargs)
      File "C:/python/test.py", line 35, in _recv
        data = conn.recv(1024)
    ConnectionAbortedError: [WinError 10053] 你的主机中的软件中止了一个已建立的连接。
    

      

    3.2 修复accept线程不能循环接收连接问题

    客户端连接:

    服务器代码:

    #TCP Server
    import threading,logging,time,random,datetime,socket
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    class ChatServer:
        def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket
            self.addr = (ip,port)
            self.sock = socket.socket()
    
            self.clients = {} #
    
    
        def start(self): # 绑定ip地址/端口,启动监听
            self.sock.bind(self.addr)
            self.sock.listen()
            # accept默认阻塞
            threading.Thread(target=self._accept,name='accept').start()
    
        def stop(self):
            pass
    
        def _accept(self):# 接收传入的连接
            while True: #修复accept循环接收数据
                conn,client = self.sock.accept()
                self.clients[client] = conn # (ip,port)二元组
                # conn = <socket.socket fd=264, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999),raddr=('127.0.0.1', 11688)>
                # client = ('127.0.0.1', 11688)
                logging.info("{}-{}".format(conn,client))
                # recv 默认阻塞
                threading.Thread(target=self._recv, args=(conn,),name='recv').start()
    
        def _recv(self,conn): # 循环接收数据,TODO 分发
            while True:
                data = conn.recv(1024)
                logging.info(data.decode())
                msg = "ACK {}".format(data.decode())
                for c in self.clients.values():
                    c.send(msg.encode())
    
    
    cs = ChatServer()
    cs.start()
    
    e = threading.Event()
    def showthreads():
        while not e.wait(5):
            logging.info(threading.enumerate())
    
    showthreads()
    
    
    
    #运行结果
    [16:03:56]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(accept, started 660)>]
    [16:04:01]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(accept, started 660)>]
    [16:04:04]	 [accept,660] <socket.socket fd=408, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11988)>-('127.0.0.1', 11988)
    [16:04:06]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>] #成功启动recv线程,接收数据
    [16:04:11]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>]
    [16:04:12]	 [recv,8320] client1
    [16:04:15]	 [accept,660] <socket.socket fd=248, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11991)>-('127.0.0.1', 11991)
    [16:04:16]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>] #又新增一个客户端和recv线程。
    [16:04:19]	 [recv,7200] client2
    [16:04:21]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>]
    [16:04:26]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>]
    

    3.3 完善清理资源:

    #TCP Server
    import threading,logging,time,random,datetime,socket
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    class ChatServer:
        def __init__(self,ip='127.0.0.1',port=9999):
            self.addr = (ip,port)
            self.sock = socket.socket()
            self.event = threading.Event()
    
            self.clients = {} #
    
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen()
            threading.Thread(target=self._accept,name='accept').start()
    
        def stop(self): # 完善清理工作
            for c in self.clients.values():
                c.close()
            self.sock.close()
            self.event.wait(1)
            self.event.set()
    
        def _accept(self):
            while not self.event.is_set():
                conn,client = self.sock.accept()
                self.clients[client] = conn
                logging.info("{}-{}".format(conn,client))
                
                threading.Thread(target=self._recv, args=(conn,),name='recv').start()
    
        def _recv(self,conn):
            while not self.event.is_set():
                data = conn.recv(1024)
                logging.info(data.decode())
                msg = "ACK {}".format(data.decode())
                for c in self.clients.values():
                    c.send(msg.encode())
    
    
    cs = ChatServer()
    cs.start()
    
    e = threading.Event()
    def showthreads():
        while not e.wait(5):
            logging.info(threading.enumerate())
    
    showthreads()
    
    
    e.wait(30)
    cs.stop()
    

      

    3.4 添加Server端主动断开和Client端通知断开机制,修复处理客户端主动断开引发的异常

    客户端发送"quit"测试主动断开功能:

    服务端代码:

    #TCP Server
    import threading,logging,time,random,datetime,socket
    DATEFMT="%H:%M:%S"
    FORMAT = "[%(asctime)s]	 [%(threadName)s,%(thread)d] %(message)s"
    logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)
    
    class ChatServer:
        def __init__(self,ip='127.0.0.1',port=9999):
            self.addr = (ip,port)
            self.sock = socket.socket()
            self.event = threading.Event()
    
            self.clients = {} #
    
    
        def start(self):
            self.sock.bind(self.addr)
            self.sock.listen()
            threading.Thread(target=self._accept,name='accept').start()
    
        def stop(self):
            for c in self.clients.values():
                c.close()
            self.sock.close()
            self.event.wait(3)
            self.event.set()
    
        def _accept(self):
            while not self.event.is_set():
                conn,client = self.sock.accept()
                self.clients[client] = conn
                logging.info("{}-{}".format(conn,client))
                # recv 默认阻塞
                threading.Thread(target=self._recv, args=(conn,client),name='recv').start()
    
        def _recv(self,conn,client):
            while not self.event.is_set():
                try:
                    data = conn.recv(1024)
                except Exception as e:
                    logging.info(e)
                    data = 'quit'.encode()
    
                logging.info(data.decode())
                # Client通知退出机制
                if data.decode() == 'quit' or data.decode == '':
                    # logging.info(data.decode())
                    conn.send('Disconnect!!!'.encode())
    
                    self.clients.pop(client)
                    conn.close()
                    break
    
                msg = "ACK {}".format(data.decode())
                for c in self.clients.values():
                    c.send(msg.encode())
    
    
    
    cs = ChatServer()
    cs.start()
    
    e = threading.Event()
    def showthreads():
        while not e.wait(5):
            logging.info(threading.enumerate())
    
    threading.Thread(target=showthreads,daemon=True,name='showthreads').start()
    
    while True: # Sever控制台退出方式
        cmd = input('>>> ').strip()
        if cmd == 'quit':
            cs.stop()
            break
    
    #运行结果:
    >>> [17:32:33]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]
    [17:32:38]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]
    [17:32:43]	 [accept,4388] <socket.socket fd=360, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 13415)>-('127.0.0.1', 13415)
    [17:32:43]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>]
    [17:32:47]	 [accept,4388] <socket.socket fd=384, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 13417)>-('127.0.0.1', 13417)
    [17:32:48]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>]
    [17:32:51]	 [recv,5556] test1
    [17:32:53]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>]
    [17:32:55]	 [recv,8248] test2
    [17:32:58]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>]
    [17:33:00]	 [recv,8248] quit
    [17:33:03]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>]
    [17:33:07]	 [recv,5556] quit
    [17:33:08]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]
    

      

    其它方法:

    socket.recv(bufsize[,flags])  获取数据。默认是阻塞的方式

    socket.recvfrom(bufsize[,flags])  获取数据,返回一个二元组(bytes,address)(可用于udp)

    socket.recv_into(buffer[,nbytes[,flags]])  获取到nbytes的数据后,存储到buffer中。如果nbytes没有指定或0,将buffer大小的数据存入buffer中。返回接收的字节数。

    socket.recvfrom_into(buffer[,nbytes[,flags]])  获取数据,返回一个二元组(bytes,address)到buffer中

    socket.send(bytes[,flags])  TCP发送数据

    socket.sendall(bytes[,flags])  TCP发送全部数据,成功返回None

    socket.sendto(string[,flag],address)  UDP发送数据

    socket.sendfile(file,offset=0,count=None)  发送一个文件直到EOF,使用高性能的os.sendfile机制,返回发送的字节数。如果win下不支持sendfile,或者不是普通文件,使用send()发送文件。offset告诉起始位置。3.5版本开始

    socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None) 

    创建一个与该套接字相关联的文件对象。

    socket.getpeername()  返回连接套接字的远程地址。返回值通常是元祖(ipaddr,port)

    socket.getsockname()  返回套接字自己的地址。通常是一个元祖(ipaddr,port)

    socket.setblocking(flag)  如果flag为0,则将套接字设为非阻塞模式,否则将套接字设为阻塞模式(默认值)。非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起socket.error异常。

    socket.settimeout(value)  设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如connect())

    socket.setsockopt(level,optname,value)  设置套接字选项的值。比如缓冲区大小。所有SO_* 开头的常量,不同系统、不同版本都不尽相同

    4) 总结

    从需求分析,到建立框架,完善基本功能,测试/修改,最终虽然完成了一个看似基本功能已经实现的群聊服务端,

    但以上的所有例子只是测试,练习底层的socket使用,生产环境中一般都是使用封装过的socket,且程序还有很多异常没有处理。

    conn.close() 服务端主动和客户端断开

    sock.close() 服务端主动关闭服务端socket

    recv,send,close,都可能在操作过程中出现异常,客户端主动断开服务端也会抛ConnectionAbortedError异常,如果服务端不处理这个异常,客户端下次连接,服务端就不能正常recv数据。

  • 相关阅读:
    ubuntu16.04安装配置nagios
    springboot+mybatis+springmvc整合实例
    网站性能优化小结和spring整合redis
    mybatis的批量更新实例
    安装webpack和webpack打包(此文转自Henery)
    微信扫描二维码下载软件
    ubuntu16.04设置tomcat自启动
    无意中在sql日志中发现如下内容,
    实现虚拟模式的动态数据加载Windows窗体DataGridView控件 .net 4.5 (一)
    (C#)WinForm窗体间传值
  • 原文地址:https://www.cnblogs.com/i-honey/p/8093691.html
Copyright © 2011-2022 走看看