zoukankan      html  css  js  c++  java
  • 将socket通信变成并发的方式

    一 利用multiprocessing模块,开启多进程,实现socket通信并发

    1. 开启子进程的两种方式

    import time
    import random
    from multiprocessing import Process
    def piao(name):
        print('%s piaoing' %name)
        time.sleep(random.randrange(1,5))
        print('%s piao end' %name)
    
    
    
    p1=Process(target=piao,args=('egon',)) #必须加,号
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))
    
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('主线程')
    定义函数的方式
    #开进程的方法二:
    import time
    import random
    from multiprocessing import Process
    
    
    class Piao(Process):  #注意一定要继承Process
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            print('%s piaoing' %self.name)
    
            time.sleep(random.randrange(1,5))
            print('%s piao end' %self.name)
    
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')
    
    p1.start() #start会自动调用run
    p2.start()
    p3.start()
    p4.start()
    print('主进程')
    定义类的方式

    2.多进程实现socket并发通信

    服务端

    from socket import *
    from multiprocessing import Process
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    
    def talk(conn,client_addr):
        while True:
            try:     #若不用此句,客户端关闭时,服务端会因报错,停止
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__': #windows下start进程一定要写到这下面
        while True:
            conn,client_addr=server.accept()
            p=Process(target=talk,args=(conn,client_addr))
            p.start()

    客户端

    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))

    存在的问题:每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。

    解决办法:进程池3 进程池实现并发通信

    3 进程池实现并发通信

    使用进程池维护固定数目的进程

    #Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
    #开启6个客户端,会发现2个客户端处于等待状态
    #在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
    from socket import *
    from multiprocessing import Pool
    import os
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    
    def talk(conn,client_addr):
        print('进程pid: %s' %os.getpid())
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        p=Pool()
        while True:
            conn,client_addr=server.accept()
            p.apply_async(talk,args=(conn,client_addr))
            # p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问
    服务端
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    客户端

    发现:并发开启多个客户端,服务端同一时间只有4个不同的pid,干掉一个客户端,另外一个客户端才会进来,被3个进程之一处理

    二 利用threading模块,开启多线程,实现socket通信并发

    1. 开启多线程的两种方式

    #方式一
    from threading import Thread
    import time
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' %name)
    
    if __name__ == '__main__':
        t=Thread(target=sayhi,args=('egon',))
        t.start()
        print('主线程')
    定义函数的方式
    #方式二
    from threading import Thread
    import time
    class Sayhi(Thread):   #注意继承Thread类
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            time.sleep(2)
            print('%s say hello' % self.name)
    
    
    if __name__ == '__main__':
        t = Sayhi('egon')
        t.start()
        print('主线程')
    定义类的方式

    2.多进程实现socket并发通信

    服务端

    from threading import Thread
    from socket import *
    
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    
    def talk(conn,addr):
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:
                    break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        while True:
            conn,addr=server.accept()
            t=Thread(target=talk,args=(conn,addr))
            t.start()

     客户端

    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))

    三 利用concurrent模块,开启进程池、线程池实现socket通信并发

     1. 进程池

    服务端

    from concurrent.futures import ProcessPoolExecutor
    from socket import *
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    
    def talk(conn,addr):
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:
                    break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor()  #不填则默认为cpu的个数
        while True:
            conn,addr=server.accept()
            p.submit(talk,conn,addr)

    客户端

    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    客户端

    2. 线程池

    服务端

    from concurrent.futures import ThreadPoolExecutor
    from socket import *
    
    server=socket(AF_INET,SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1',8081))
    server.listen(5)
    
    def talk(conn,addr):
        while True:
            try:
                msg=conn.recv(1024)
                if not msg:
                    break
                conn.send(msg.upper())
            except Exception:
                break
    
    if __name__ == '__main__':
        p=ThreadPoolExecutor()  #不填则默认为cpu的个数*5
        while True:
            conn,addr=server.accept()
            p.submit(talk,conn,addr)

    客户端

    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    客户端

    四 利用gevent模块,协程实现单线程下的socket通信并发

     通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)

    from gevent import monkey;monkey.patch_all()
    from socket import *
    import gevent
    
    #如果不想用money.patch_all()打补丁,可以用gevent自带的socket
    # from gevent import socket
    # s=socket.socket()
    
    def server(server_ip,port):
        s=socket(AF_INET,SOCK_STREAM)
        s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
        s.bind((server_ip,port))
        s.listen(5)
        while True:
            conn,addr=s.accept()
            gevent.spawn(talk,conn,addr)
    
    def talk(conn,addr):
        try:
            while True:
                res=conn.recv(1024)
                print('client %s:%s msg: %s' %(addr[0],addr[1],res))
                conn.send(res.upper())
        except Exception as e:
            print(e)
        finally:
            conn.close()
    
    if __name__ == '__main__':
        server('127.0.0.1',8080)
    服务端
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8080))
    
    
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
        msg=client.recv(1024)
        print(msg.decode('utf-8'))
    客户端
    from threading import Thread
    from socket import *
    import threading
    
    def client(server_ip,port):
        c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
        c.connect((server_ip,port))
    
        count=0
        while True:
            c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
            msg=c.recv(1024)
            print(msg.decode('utf-8'))
            count+=1
    if __name__ == '__main__':
        for i in range(500):
            t=Thread(target=client,args=('127.0.0.1',8080))
            t.start()
    多线程并发多个客户端

     五 利用selectors模块,实现socket并发通信

    selectors模块,帮我们默认选择当前平台下最合适的IO多路复用模型(select、poll和epoll)

    #服务端
    from socket import *
    import selectors
    
    sel=selectors.DefaultSelector()
    def accept(server_fileobj,mask):
        conn,addr=server_fileobj.accept()
        sel.register(conn,selectors.EVENT_READ,read)
    
    def read(conn,mask):
        try:
            data=conn.recv(1024)
            if not data:
                print('closing',conn)
                sel.unregister(conn)
                conn.close()
                return
            conn.send(data.upper()+b'_SB')
        except Exception:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
    
    
    
    server_fileobj=socket(AF_INET,SOCK_STREAM)
    server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server_fileobj.bind(('127.0.0.1',8088))
    server_fileobj.listen(5)
    server_fileobj.setblocking(False) #设置socket的接口为非阻塞
    sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
    
    while True:
        events=sel.select() #检测所有的fileobj,是否有完成wait data的
        for sel_obj,mask in events:
            callback=sel_obj.data #callback=accpet
            callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
    服务端
    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8088))
    
    while True:
        msg=input('>>: ')
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))
    客户端


  • 相关阅读:
    文件的上传下载
    HttpServletResponse
    HttpServletRequest
    web工程中URL地址的推荐写法
    servlet二
    Servlet
    HTTP-崔希凡笔记
    HTTP协议-引自孤傲苍狼博客
    浏览器与服务器交互的过程
    Tomcat 配置
  • 原文地址:https://www.cnblogs.com/huchong/p/7505314.html
Copyright © 2011-2022 走看看