zoukankan      html  css  js  c++  java
  • Python Select模型(程序流程)(转)

    缘由 
    之前写socket的CS模型代码,都是利用最原始的多线程方式。服务端是主线程,接到客户端的连接请求就从线程池中获取一个线程去处理整个socket连接的所有操作,虽然在连接数较短的情况下没有什么影响,但是当连接数很大的情况下,线程的切换和线程池的大小问题就明显起来了。

    问题 
    应该存在一种方式可以让一个线程去处理多个连接,在连接有事情做的时候才过去处理,不然的话线程就挂起,让线程的利用率更高,于是后来学习了select以及epoll。在这里我重点总结一下select

    select的原理 
    select是监听触发机制,监听可读和可写队列。当有事件发生时,进行遍历轮询事件发生的对象然后返回 
    比如在服务端启动之后把服务端添加到select的可读监听队列中,当有客户端请求连接服务端时,select函数会返回一个可读事件再让服务端接受客户端的连接。 
    select的返回方式可以是阻塞或者是非阻塞,非阻塞式的select处理方式是轮询的,会不断询问占用Cpu太多的资源和时间,所以建议使用阻塞等待返回的方式去使用select

    优点: 
    没有了多线程的创建、销毁、切换带来的效率和内存上的消耗 
    缺点 
    select存在一个最大可监听文件描述符数量,所以会收到最大连接监听的限制 
    select在事件发生以后也是需要遍历监听对象,并不能直接定位到哪个对象,这个操作在对象数量庞大的情况下是个效率的瓶颈。所以后来有了epoll

    程序流程描述: 
    1. 在创建了server的socket以后,通过无限循环监听select事件执行相应的操作 
    2. 客户端请求连接服务端,inputs非空,outputs为空,为queue增添客户端对象 ,readable非空,writeable为空 
    3. 服务端接收客户端发送的数据,为outputs增添消息数据,readable不为空,writeable为空,如果接收失败说明客户端断开了连接,则终止监听此客户端的消息,回到第1步等待客户端连接 
    4. outputs非空触发select事件,readable为空,writeable不为空,将queue中的数据发送到客户端,如果queue为空触发清除outputs操作 
    5. 等待客户端发送数据,回到第2步

    过程流程图

    服务端代码:

    #!/usr/bin/env python
    #*-* coding:utf-8 *-*
    import select
    import socket
    import Queue
    import time
    import os
    
    class Server():
        #创建socket 套接字
        def __init__(self):
            self.server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            self.server.setblocking(False)
            #配置参数
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_address= ('127.0.0.1',8008)
            self.server.bind(self.server_address)
            self.server.listen(10)
            self.inputs = [self.server]
            self.outputs = []
            self.message_queues = {}
            #timeout = 20
    
        def run(self):
            while self.inputs:
                print "=================================="
                print "waiting for next event"
                print "inputs",self.inputs
                print "outputs",self.outputs
                print "queue", self.message_queues
            #readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)  最后一个是超时,当前连接要是超过这个时间的话,就会kill
                readable , writable , exceptional = select.select(self.inputs, self.outputs, self.inputs)
                print "readable , writable , exceptional",readable , writable , exceptional
                # When timeout reached , select return three empty lists
                if not (readable or writable or exceptional) :
                    print "Time out ! "
                    break;
                for s in readable :
                    if s is self.server:
                        #通过self.inputs查看是否有客户端来
                        connection, client_address = s.accept()
                        print "    connection from ", client_address
                        connection.setblocking(0)
                        self.inputs.append(connection)
                        self.message_queues[connection] = Queue.Queue()
                    else:
                        try:
                            data = s.recv(1024)
                        except:
                            print "  closing", client_address
                            if s in self.outputs :
                                self.outputs.remove(s)
                            self.inputs.remove(s)
                            s.close()
                            #清除队列信息
                            del self.message_queues[s]
                        else:
                            if data :
                                print " received " , data , "from ",s.getpeername()
                                self.message_queues[s].put(data)
                                # Add output channel for response
                                if s not in self.outputs:
                                    self.outputs.append(s)
                for s in writable:
                    try:
                        next_msg = self.message_queues[s].get_nowait()
                    except Queue.Empty:
                        print " " , s.getpeername() , 'queue empty'
                        self.outputs.remove(s)
                    else:
                        print " sending " , next_msg , " to ", s.getpeername()
                        os.popen('sleep 5').read()
                        s.send(next_msg)
    
                for s in exceptional:
                    print " exception condition on ", s.getpeername()
                    #stop listening for input on the connection
                    self.inputs.remove(s)
                    if s in self.outputs:
                        self.outputs.remove(s)
                    s.close()
                    #清除队列信息
                    del self.message_queues[s]
    
    if __name__ == "__main__":
        s = Server()
        s.run()
    

    客户端代码:

    # -*- coding:UTF-8 -*-
    import socket
    import os
    import signal
    import time, threading
    def SendDataToServer(sock):
        while True:
            put = str(raw_input())
            sock.send(put)
    
    def ReceiveData(sock):
        while True:
            try:
                data = sock.recv(1024)
                time.sleep(1)
            except:
                print "Server Down!"
                os._exit(0)
            else:
                if data:
                    print "RECEIVE:",data
    
    if __name__ == "__main__":
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(('127.0.0.1', 8008))
        send_tick = threading.Thread(target=SendDataToServer, args=(s,))
        rec_tick = threading.Thread(target=ReceiveData, args=(s,))
        send_tick.start()
        rec_tick.start()
  • 相关阅读:
    LeetCode 25 Reverse Nodes in k-Group
    圆桌派:家世背景对人的影响有多大
    BibTex 学习笔记
    R parallel包实现多线程1
    IIS学习笔记
    高效完成R代码
    圆桌派 :我们,朋友一生一起走
    高文欣个人简介
    R语言函数话学习笔记5
    git学习笔记1
  • 原文地址:https://www.cnblogs.com/Justin-Tim/p/9645567.html
Copyright © 2011-2022 走看看