zoukankan      html  css  js  c++  java
  • socket实现并发之socketserver模块的使用

    socket实现并发:

    基于tcp的套接字,关键就是两个循环,一个链接循环,一个通信循环

    socketserver模块中分两大类:server类(解决链接问题)和request类(解决通信问题)

    server类:

     request类:

     继承关系:

     

     

    以下述代码为例,分析socketserver源码:

    ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
    ftpserver.serve_forever()

    查找属性的顺序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer

    1. 实例化得到ftpserver,先找类ThreadingTCPServer的init,在TCPServer中找到,进而执行server_bind,server_active
    2. 找ftpserver下的serve_forever,在BaseServer中找到,进而执行self._handle_request_noblock(),该方法同样是在BaseServer中
    3. 执行self._handle_request_noblock()进而执行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然后执行self.process_request(request, client_address)
    4. 在ThreadingMixIn中找到process_request,开启多线程应对并发,进而执行process_request_thread,执行self.finish_request(request, client_address)
    5. 上述四部分完成了链接循环,本部分开始进入处理通讯部分,在BaseServer中找到finish_request,触发我们自己定义的类的实例化,去找init方法,而我们自己定义的类没有该方法,则去它的父类也就是BaseRequestHandler中找....

    源码分析总结:

    基于tcp的socketserver我们自己定义的类中的

    1. self.server即套接字对象
    2. self.request即一个链接
    3. self.client_address即客户端地址

    基于udp的socketserver我们自己定义的类中的

    1. self.request是一个元组(第一个元素是客户端发来的数据,第二部分是服务端的udp套接字对象),如(b'adsf', )
    2. self.client_address即客户端地址

    一、基于tcp的

    服务端:

    import socketserver
    
    class MyRequestHandle(socketserver.BaseRequestHandler):
        def handle(self):
            # 如果tcp协议,self.request=>conn
            print(self.client_address)
            while True:
                try:
                    msg = self.request.recv(1024)
                    if len(msg) == 0: break
                    self.request.send(msg.upper())
                except Exception:
                    break
            self.request.close()
    
    
    
    #  服务端应该做两件事
    # 第一件事:循环地从半连接池中取出链接请求与其建立双向链接,拿到链接对象
    s=socketserver.ThreadingTCPServer(('127.0.0.1',8889),MyRequestHandle)
    s.serve_forever()
    # 等同于
    # while True:
    #     conn,client_addr=server.accept()
    #     启动一个线程(conn,client_addr)
    
    # 第二件事:拿到链接对象,与其进行通信循环===>handle
    服务端

    客户端:

    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8889))
    
    while True:
        msg=input('请输入命令>>:').strip()
        if len(msg) == 0:continue
        client.send(msg.encode('utf-8'))
    
        res=client.recv(1024)
        print(res.decode('utf-8'))
    客户端1
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8889))
    
    while True:
        msg=input('请输入命令>>:').strip()
        if len(msg) == 0:continue
        client.send(msg.encode('utf-8'))
    
        res=client.recv(1024)
        print(res.decode('utf-8'))
    客户端2
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8889))
    
    while True:
        msg=input('请输入命令>>:').strip()
        if len(msg) == 0:continue
        client.send(msg.encode('utf-8'))
    
        res=client.recv(1024)
        print(res.decode('utf-8'))
    客户端3

     示例:(远程执行命令,并发版)

    import socketserver
    import subprocess
    import struct
    
    class MyRequestHandle(socketserver.BaseRequestHandler):
        def handle(self):
            # 如果tcp协议,self.request=>conn
            print(self.client_address)
            print("接到来自:{}的连接".format(self.client_address[0]))
            # 循环通信
            while True:
                try:
                    cmd = self.request.recv(1024)
                    if not cmd:
                        print("客户端:{} 已经断开连接".format(self.client_address[0]))
                        break
                    print("命令名称:%s" % cmd.decode("utf-8"), )
                    obj = subprocess.Popen(cmd.decode("utf-8"), shell=True,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           )
                    obj.wait()
                    cmd_stat = subprocess.Popen.poll(obj)
                    if not cmd_stat:
                        cmd_data = obj.stdout.read()
                        # print("cmd_data", cmd_data)
                        # print(len(cmd_data))
                        if not cmd_data:
                            header = struct.pack("i",0)
                            self.request.send(header)
                            continue
                        cmd_data_size = len(cmd_data)
                        header = struct.pack("i", cmd_data_size)
                        self.request.send(header)
                        self.request.send(cmd_data)
                    else:
                        err_data = obj.stderr.read()
                        err_data_size=len(err_data)
                        header = struct.pack("i", err_data_size)
                        self.request.send(header)
                        self.request.send(err_data)
                except Exception:
                    print("客户端:{} 非正常退出".format(self.client_address[0]))
                    break
    
    
    #  服务端应该做两件事
    # 第一件事:循环地从半连接池中取出链接请求与其建立双向链接,拿到链接对象
    s=socketserver.ThreadingTCPServer(('127.0.0.1',9000),MyRequestHandle)
    s.serve_forever()
    # 等同于
    # while True:
    #     conn,client_addr=server.accept()
    #     启动一个线程(conn,client_addr)
    
    # 第二件事:拿到链接对象,与其进行通信循环===>handle
    server
    import socket
    import struct
    
    phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    
    phone.connect(("127.0.0.1",9000))
    
    #循环通信
    while True:
        cmd=input("请输入命令:>>>").strip()
        if not cmd:continue
        if cmd == "exit()":
            phone.close()
            break
        phone.send(cmd.encode("utf-8"))
        header = phone.recv(4)
        cmd_datasize=struct.unpack("i",header)[0]
        if not cmd_datasize:continue
        recv_size=0
    
        while recv_size<cmd_datasize:
            cmd_data=phone.recv(1024)
            recv_size+=len(cmd_data)
            print(cmd_data.decode("gbk"))
        # else:
        #     print("end")
    
    phone.close()
    client

    二、基于UDP的

    服务端:

    import socketserver
    
    class MyRequestHanlde(socketserver.BaseRequestHandler):
        def handle(self):
            client_data=self.request[0]
            server=self.request[1]
            client_address=self.client_address
            print('客户端发来的数据%s' %client_data)
            server.sendto(client_data.upper(),client_address)
    
    
    s=socketserver.ThreadingUDPServer(("127.0.0.1",8888),MyRequestHanlde)
    s.serve_forever()
    # 相当于:只负责循环地收
    # while True:
    #     data,client_addr=server.recvfrom(1024)
    #     启动一个线程处理后续的事情(data,client_addr)
    服务端

    客户端:

    import socket
    
    client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议
    
    while True:
        msg=input('>>>: ').strip()
        client.sendto(msg.encode('utf-8'),('115.29.65.16',8888))
        res=client.recvfrom(1024)
        print(res)
    
    client.close()
    客户端1
    import socket
    
    client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议
    
    while True:
        msg=input('>>>: ').strip()
        client.sendto(msg.encode('utf-8'),('115.29.65.16',8888))
        res=client.recvfrom(1024)
        print(res)
    
    client.close()
    客户端2

     

    实例:ftp

    import socketserver
    import struct
    import json
    import os
    class FtpServer(socketserver.BaseRequestHandler):
        coding='utf-8'
        server_dir='file_upload'
        max_packet_size=1024
        BASE_DIR=os.path.dirname(os.path.abspath(__file__))
        def handle(self):
            print(self.request)
            while True:
                data=self.request.recv(4)
                data_len=struct.unpack('i',data)[0]
                head_json=self.request.recv(data_len).decode(self.coding)
                head_dic=json.loads(head_json)
                # print(head_dic)
                cmd=head_dic['cmd']
                if hasattr(self,cmd):
                    func=getattr(self,cmd)
                    func(head_dic)
        def put(self,args):
            file_path = os.path.normpath(os.path.join(
                self.BASE_DIR,
                self.server_dir,
                args['filename']
            ))
    
            filesize = args['filesize']
            recv_size = 0
            print('----->', file_path)
            with open(file_path, 'wb') as f:
                while recv_size < filesize:
                    recv_data = self.request.recv(self.max_packet_size)
                    f.write(recv_data)
                    recv_size += len(recv_data)
                    print('recvsize:%s filesize:%s' % (recv_size, filesize))
    
    
    ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
    ftpserver.serve_forever()
    ftpserver
    import socket
    import struct
    import json
    import os
    
    
    
    class MYTCPClient:
        address_family = socket.AF_INET
    
        socket_type = socket.SOCK_STREAM
    
        allow_reuse_address = False
    
        max_packet_size = 8192
    
        coding='utf-8'
    
        request_queue_size = 5
    
        def __init__(self, server_address, connect=True):
            self.server_address=server_address
            self.socket = socket.socket(self.address_family,
                                        self.socket_type)
            if connect:
                try:
                    self.client_connect()
                except:
                    self.client_close()
                    raise
    
        def client_connect(self):
            self.socket.connect(self.server_address)
    
        def client_close(self):
            self.socket.close()
    
        def run(self):
            while True:
                inp=input(">>: ").strip()
                if not inp:continue
                l=inp.split()
                cmd=l[0]
                if hasattr(self,cmd):
                    func=getattr(self,cmd)
                    func(l)
    
    
        def put(self,args):
            cmd=args[0]
            filename=args[1]
            if not os.path.isfile(filename):
                print('file:%s is not exists' %filename)
                return
            else:
                filesize=os.path.getsize(filename)
    
            head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
            print(head_dic)
            head_json=json.dumps(head_dic)
            head_json_bytes=bytes(head_json,encoding=self.coding)
    
            head_struct=struct.pack('i',len(head_json_bytes))
            self.socket.send(head_struct)
            self.socket.send(head_json_bytes)
            send_size=0
            with open(filename,'rb') as f:
                for line in f:
                    self.socket.send(line)
                    send_size+=len(line)
                    print(send_size)
                else:
                    print('upload successful')
    
    
    
    
    client=MYTCPClient(('127.0.0.1',8080))
    
    client.run()
    ftpclient
  • 相关阅读:
    Oracle 按一行里某个字段里的值分割成多行进行展示
    Property or method "openPageOffice" is not defined on the instance but referenced during render. Make sure that this property is reactive, either in the data option, or for class-based components, by
    SpringBoot 项目启动 Failed to convert value of type 'java.lang.String' to required type 'cn.com.goldenwater.dcproj.dao.TacPageOfficePblmListDao';
    Maven 设置阿里镜像
    JS 日期格式化,留作参考
    JS 过滤数组里对象的某个属性
    原生JS实现简单富文本编辑器2
    Chrome控制台使用详解
    android权限(permission)大全
    不借助第三方网站四步实现手机网站转安卓APP
  • 原文地址:https://www.cnblogs.com/baicai37/p/12745585.html
Copyright © 2011-2022 走看看