zoukankan      html  css  js  c++  java
  • nginx+flask+gevent+uwsgi实现websocket

    Websocket简介

    WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。在WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。浏览器通过 JavaScript向服务器发出建立 WebSocket 连接的请求,连接建立以后,客户端和服务器端就可以通过 TCP 连接直接交换数据。

    Websocket本身是一个持久化的新协议,和http没有很大的关系,但websocket是基于http协议的,借用http协议完成了部分握手。

    我们在某个linux服务器上部署了一个websocket,然后我们发起连接请求,看下请求发送的内容,对比下http请求和websocket请求的不同:

          image.png

    可以看到,主要的不同点,Connection: Upgrade;Upgrade: websocket,还有3个Sec开头的参数。前两个参数是用来告诉nginx服务,我发起的是websocket协议,而不是http,后三个参数是用来校验websocket服务是否是websocket的,以及校验websocket协议的版本。

    在消息发出后,websocket服务端返回一个response:

    Connection:upgrade

    Date:Thu, 26 Oct 2017 06:01:51 GMT

    Sec-WebSocket-Accept:3P+uxAGozJzj3vcr89gxFzydOu8=

    Sec-WebSocket-Origin:http://10.43.35.31:2327

    Server:nginx/1.12.0

    Upgrade:WebSocket

    当收到服务端返回,证明协议已经转换成功,后面合http再无关系了,而Sec开头的是证明我是websocket。

    为什么使用websocket

    没有websocket的时候,常用的实时同步数据的方式一般有两种:1、轮询,即每隔一定的时间客户端就向服务端发起一次请求,客户端根据返回数据更新界面,这需要服务器有很快的处理速度和资源。2、长链接,即客户端向服务端发送查询请求,如果服务端无数据,则一直阻塞,直到有数据返回,返回后客户端再重复此过程,这种需要很高的并发。

    这两种方式都需要不断的建立http连接,服务端不能主动推送数据给客户端,比较被动,比较耗资源,比如多次请求的HTTP头部,TCP连接复用会导致的线头阻塞(即由于前面数据丢失导致的后续数据堆积)。

    Websocket很好的解决了上面的问题:只需要在建立连接的时候发送一次http请求,后面的交互都不需要再发了;WebSocket的连接是双向通信的连接,在同一个TCP连接上,既可以发送,也可以接收;几个不同的URI可以复用同一个WebSocket连接。

    Websocket基础架构及实现

     本次websocket的搭建考虑性能问题,采取的是flask+python+uwsgi+gevent+nginx的基础架构。采用uwsgi+gevent的模式启动websocket进程,为gevent指定并发为100,web框架使用小而轻的Flask,使用virtualenv创建虚拟环境,保证websocket不影响主工程,且可以单独发布部署,本进程的依赖为Flask,gevent和uwsgi。

    Nginx需要分别为http和https增加配置

    image.png

    上面是nginx的配置,nginx是从1.3开始支持websocket的,upgrade将http连接升级到websocket连接,Upgrade机制使用了Upgrade协议头和Connection协议头,因此当代理服务器拦截到来自客户端的Upgrade请求时,代理服务器需要将自己的Upgrade请求发送给后端服务器,包括适合的请求头。Nginx通过在客户端和后端服务器之间建立隧道来支持WebSockets通信。为了让Nginx可以将来自客户端的Upgrade请求发送到后端服务器,Upgrade和Connection的头信息必须被显式的设置。

    Webserver采用的是Flask,下面简单介绍下本工程代码实现,包含几个部分:

    1、客户端部分,客户端采用

    ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/websocket');或

    ws = new WebSocket('wss://' + document.domain + ':' + location.port + '/websocket');来建立连接,并监听onopen,onmessage,onclose事件。

    2、服务端部分

    首先创建一个flask的app,然后传入GeventWebSocket,得到一个websocket的对象。

    app = Flask(__name__)
    ws = GeventWebSocket(app)

    GeventWebSocket继承了websocket,重写了init_app,因为要使用gevent,重写主要是为了打上猴子补丁,还替换了websocket中间件,也是为了适配gevent。

    class GeventWebSocket(WebSocket):
        middleware = GeventWebSocketMiddleware

        def init_app(self, app):
            logger.info("init_app")
            aggressive = app.config.get('UWSGI_WEBSOCKET_AGGRESSIVE_PATCH', True)
            patch_all(aggressive=aggressive)
            super(GeventWebSocket, self).init_app(app)

    Websocket模块主要为了按要求启动websocket服务端,主要的3个函数:

    route:主要是为view添加url规则;

    def route(self, rule, **options):
        def decorator(f):
            endpoint = options.pop('endpoint', None)
            self.add_url_rule(rule, endpoint, f, **options)
            return f
        return decorator

    init_app:修改flask的app的属性,使在启动app时能以uwsgi启动,并重写wsgi_app,供uwsgi进行调用。

    def init_app(self, app):
        self.app = app
        app.wsgi_app = self.middleware(app.wsgi_app, self)
        app.run = lambda **kwargs: self.run(**kwargs)

    run:以uwsgi方式启动app,并指定相关启动函数。

    def run(self, app=None, debug=False, host='127.0.0.1', port=5000, uwsgi_binary=None, **kwargs):
        if not app:
            app = self.app.name + ':app'

        if self.app.debug:
            debug = True
        run_uwsgi(app, debug, host, port, uwsgi_binary, **kwargs)

    中间件替换为了子类的中间件,主要是提供了wsgi_app供uwsgi进行调用。wsgi server工作流程如下:服务器创建socket,监听端口,等待客户端连接;当有请求来时,服务器解析客户端信息放到环境变量environ中,并调用绑定的handler来处理请求;handler解析这个http请求,将请求信息例如method,path等放到environ中;wsgi handler再将一些服务器端信息也放到environ中,最后服务器信息,客户端信息,本次请求信息全部都保存到了环境变量environ中;wsgi handler 调用注册的wsgi app,并将environ和回调函数传给wsgi app;wsgi app 将reponse header/status/body 回传给wsgi handler;handler还是通过socket将response信息塞回给客户端。

    Wsgi app先通过environ获取到请求的url,然后定位到具体调用的函数,

    urls = self.websocket.url_map.bind_to_environ(environ)
    try:
        endpoint, args = urls.match()
        handler = self.websocket.view_functions[endpoint]
    except HTTPException:
        handler = None

    然后,根据environ获取websocket_key,之后与客户端完成握手,

    uwsgi.websocket_handshake(environ['HTTP_SEC_WEBSOCKET_KEY'],
                              environ.get('HTTP_ORIGIN', ''))

    然后创建基于gevent的发送事件、队列和监听事件、队列;然后创建client来操控这两个队列完成发送和接收。

    send_event = Event()
    send_queue = Queue()
    recv_event = Event()
    recv_queue = Queue()

    client = self.client(environ, uwsgi.connection_fd(), send_event,
                         send_queue, recv_event, recv_queue,
                         self.websocket.timeout)

    然后启动协程,创建发送消息的tcp链路,

    handler = spawn(handler, client, **args)
    def listener(client):
        select([client.fd], [], [], client.timeout)
        recv_event.set()
    listening = spawn(listener, client)

    进入等待,一旦监听到发送或接收事件,则调用uwsgi方法对消息进行处理,如果handler完成或者被杀死,则终止监听,

    wait([handler, send_event, recv_event], None, 1)
    # handle send events
    if send_event.is_set():
        try:
            while True:
                uwsgi.websocket_send(send_queue.get_nowait())
        except Empty:
            send_event.clear()
        except IOError:
            client.connected = False

    # handle receive events
    elif recv_event.is_set():
        recv_event.clear()
        try:
            message = True
            while message:
                message = uwsgi.websocket_recv_nb()
                recv_queue.put(message)
            listening = spawn(listener, client)
        except IOError:
            client.connected = False

    # handler done, we're outta here
    elif handler.ready():
        listening.kill()
        return ''

    这里面使用的client实例主要就实现了对于队列的操作和事件状态的变更,

    def send(self, msg, binary=True):
        if binary:
            return self.send_binary(msg)
        self.send_queue.put(msg)
        self.send_event.set()

    def receive(self):
        return self.recv_queue.get()

    def close(self):
        self.connected = False

    然后就是app里使用websocket的方法了,我在全局生命了一个client的list,用来存储已连接的wensocket对象,每次建立连接,都会将新的连接对象存进去,

    client = []

    @ws.route('/websocket')
    def start(channel):
        global client
        client.append(channel)
        channel.send('websocket is running')
        while True:
            channel.receive()

    又起了一个view,定义为post,接受我们自己工程的消息上报,每次消息上报只上报正在连接的对象,已经不连接的从列表删除,

    @app.route('/send', methods=['POST'])
    def send_msg():
        message = request.data
        send_all_client(message)
        delete_close_client()
        return 'SUCCESS'

    所有需要上报的消息,统一走这个rest接口通知就可以,app会将消息上报到每个连接的客户端,最后为app指定gevent线程数和监听端口,

    if __name__ == '__main__':
        app.run(gevent=100, port=5000)

    每次通过python app.py启动工程即可,代码会自动拼装启动命令,如下:

    /home/ngomm/websocket/ENV/bin/uwsgi --http 127.0.0.1:5000 --http-websockets --virtualenv /home/ngomm/websocket/ENV --gevent 100 --master --wsgi app:app

    因为希望可以独立部署,所以为工程创建了虚拟环境,该工程依赖的包为Flask (0.12.2),gevent (1.1.2),uWSGI (2.0.11.2)。

  • 相关阅读:
    记录s标签范例
    链表问题总结
    Hibernate学习总结
    HDU2460-Network
    CF464C-Substitutes in Number
    CF666E-Forensic Examination
    CF373C-Counting Kangaroos is Fun
    CF558E-A Simple Task
    HDU5669-Road
    CF341D-Iahub and Xors
  • 原文地址:https://www.cnblogs.com/small-office/p/8682872.html
Copyright © 2011-2022 走看看