zoukankan      html  css  js  c++  java
  • 你真的会websocket吗

    Websocket

    WebSocket协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。
    WebSocket通信协议于2011年被IETF定为标准RFC 6455,并被RFC7936所补充规范。
     
    WebSocket协议支持(在受控环境中运行不受信任的代码的)客户端与(选择加入该代码的通信的)远程主机之间进行全双工通信。用于此的安全模型是Web浏览器常用的基于原始的安全模式。 协议包括一个开放的握手以及随后的TCP层上的消息帧。 该技术的目标是为基于浏览器的、需要和服务器进行双向通信的(服务器不能依赖于打开多个HTTP连接(例如,使用XMLHttpRequest或<iframe>和长轮询))应用程序提供一种通信机制。
     
    这个协议目前仍是草案,只有最新的一些浏览器可以支持它。但是,它的好处是显而易见的,随着支持它的浏览器越来越多,我们将看到它越来越流行。(和以往的Web开发一样,必须谨慎地坚持依赖可用的新功能并能在必要时回滚到旧技术的务实策略。)
     

    Django用法

    在1.9版本之后,Django实现了对Channels的支持,他所使用的是WebSocket通信,解决了实时通信的问题,而且在使用WebSocket进行通信的同时依旧能够支持HTTP通信。

    1.1目录结构

    在此结构中必须有硬性要求,具体如下:

    新的目录如下:
    |-- channels_example
    |    |--channels_example
    |        |-- __init__.py
    |        |-- settings.py
    |        |-- urls.py
    |        |-- wsgi.py
    |        |-- routing.py   #必须
    |        |-- consumer.py  #必须
    |        |-- asgi.py
    |    |-- manage.py

    1.2配置settings.py文件

    1.2.1将其添加到APP列表里

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'channels',
    ]

    1.2.2然后,添加新的参数CHANNEL_LAYERS,如下:

    CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "asgiref.inmemory.ChannelLayer",
            "ROUTING": "channels_example.routing.channel_routing",
        },
    }

    需要注意的是 ROUTING 参数,他是用来指定WebSocket表单的位置,当有WebSocket请求访问时,就会根据这个路径找到相应表单,调用相应的函数进行处理。
    channels_example.routing 就是我们刚才建好的routing,py文件,里面的channel_routing我们下面会进行填充。

    1.3填写路由映射地址

    from channels.routing import route
    import consumers
    
    channel_routing = [
        route('websocket.connect', consumers.ws_connect),        
        route('websocket.disconnect', consumers.ws_disconnect),        
        # route('websocket.receive', consumers.ws_message),        
        route('websocket.receive', consumers.ws_message_uuid),        
    ]

    1.4路由映射到相对应的函数

    from django.http import HttpResponse
    from channels.handler import AsgiHandler
    
    #message.reply_channel    一个客户端通道的对象
    #message.reply_channel.send(chunk)  用来唯一返回这个客户端
    
    #一个管道大概会持续30s
    
    
    def ws_connect(message):
        auth = True
    
        if not auth:
            reply = json.dumps({'error': error})
            message.reply_channel.send({'text': reply, 'close': True})
        else:
            reply = "{}"
            message.reply_channel.send({'text': reply})
            print(">>> %s connected" % str(message))
    
    
    def ws_disconnect(message):
        print("<<< %s disconnected" % str(message))
        # with message_queue.mutex:
        #     message_queue.queue.clear()
        while not message_queue.empty():
            try:
                message_queue.get(False)
            except Empty:
                continue
    
            message_queue.task_done()
    
    
    def ws_message_uuid(message):
        task = Task.create(message)
    
        if task:
            message_queue.put(task)

     tornado用法

    1.1Tornado的WebSocket模块

    Tornado在websocket模块中提供了一个WebSocketHandler类。这个类提供了和已连接的客户端通信的WebSocket事件和方法的钩子。当一个新的WebSocket连接打开时,open方法被调用,而on_messageon_close方法分别在连接接收到新的消息和客户端关闭时被调用。

    此外,WebSocketHandler类还提供了write_message方法用于向客户端发送消息,close方法用于关闭连接。

    class EchoHandler(tornado.websocket.WebSocketHandler):
        def open(self):
            self.write_message('connected!')
    
        def on_message(self, message):
            self.write_message(message)

    正如你在我们的EchoHandler实现中所看到的,open方法只是使用WebSocketHandler基类提供的write_message方法向客户端发送字符串"connected!"。每次处理程序从客户端接收到一个新的消息时调用on_message方法,我们的实现中将客户端提供的消息原样返回给客户端。这就是全部!让我们通过一个完整的例子看看实现这个协议是如何简单的吧。

    WebSocketHandler.open()

    当一个WebSocket连接建立后被调用。

    WebSocketHandler.on_message(message)

    当客户端发送消息message过来时被调用,注意此方法必须被重写。

    WebSocketHandler.on_close()

    当WebSocket连接关闭后被调用。

    WebSocketHandler.write_message(message, binary=False)

    向客户端发送消息messagea,message可以是字符串或字典(字典会被转为json字符串)。若binary为False,则message以utf8编码发送;二进制模式(binary=True)时,可发送任何字节码。

    WebSocketHandler.close()

    关闭WebSocket连接。

    WebSocketHandler.check_origin(origin)

    判断源origin,对于符合条件(返回判断结果为True)的请求源origin允许其连接,否则返回403。可以重写此方法来解决WebSocket的跨域请求(如始终return True)。

    1.2实例--工作websocket实际应用

    #coding=utf-8
    
    import uuid
    import os
    from works.actions import work
    import hashlib
    import json
    import Queue
    from threading import Thread
    import numpy as np
    import cv2
    import base64
    import jwt
    import tornado.gen
    from handlers.base_handler import BaseWebSocket
    from config import MEDIA_ROOT
    import time
    
    message_queue = Queue.PriorityQueue()
    
    
    def work_loop():
        while True:
            task = message_queue.get()
    
            iuuid = task.uuid
            offset_top = task.offset_top
            image_data = task.image_data
            channel = task.channel
            zoom = task.zoom
            rType = task.rType
            responseType = task.responseType
    
            print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top))
    
            filename = str(uuid.uuid1()) + '.jpg'
            filepath = os.path.join(MEDIA_ROOT, filename)
    
            with open(filepath, 'wb') as f:
                f.write(image_data.decode("base64"))
    
            if zoom != 1.0:
                im = cv2.imread(filepath)
    
                if im is None:
                    continue
    
                osize = im.shape[1], im.shape[0]
                size = int(im.shape[1] * zoom), int(im.shape[0] * zoom)
                im = cv2.resize(im, size)
                cv2.imwrite(filepath, im)
    
            try:
                reply = work(filepath, use_crop=False, result=rType,responseType=responseType)
            except Exception as e:
                print("!!!!!! %s -> %s caused error" % (iuuid, filename))
                print(e)
                cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename))
                os.system(cmd.encode('utf-8'))
                continue
    
    
            if responseType == 'url':
                # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename
                rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename
                reply = {'url': rtn_url, 'uuid': iuuid}
    
            reply['uuid'] = iuuid
            channel.write_message({'text': json.dumps(reply)})
            print '%s end time:' % channel, time.time()
    
    
    class BrowserWebSocket(BaseWebSocket):
    
        '''浏览器websocket服务器'''
    
    
        def open(self):
            '''新的WebSocket连接打开时被调用'''
            # message = {}
            # remote_ip = self.request.remote_ip
            # message['query_string']=self.get_argument('query_string')
            # message['remote_ip']=remote_ip
            # auth, error = verify_auth_token(message)
            auth = True
            error = 'error'
    
            if not auth:
                reply = json.dumps({'error': error})
                self.write_message({'text': reply, 'close': True})
            else:
                reply = "{}"
                self.write_message({'text': reply})
                print(">>> %s connected" % self.request.remote_ip)
    
    
        def on_message(self, message):
            '''连接收到新消息时被调用'''
            print '%s start time:'%self,time.time()
            task = Task.create(message,self)
    
            if task:
                message_queue.put(task)
    
        @tornado.gen.coroutine
        def on_messages(self, message):
            '''连接收到新消息时被调用'''
            task = Task.create(message,self)
    
            if task:
                message_queue.put(task)
    
    
        def on_close(self):
            '''客户端关闭时被调用'''
            print("<<< %s disconnected" % str(self.request.remote_ip))
            # with message_queue.mutex:
            #     message_queue.queue.clear()
            while not message_queue.empty():
                try:
                    message_queue.get(False)
                except Queue.Empty:
                    continue
    
                message_queue.task_done()
    
    
        def check_origin(self, origin):
            '''允许WebSocket的跨域请求'''
    
            return True
    
    class Task(object):
        def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args):
            self.uuid = uuid
            self.offset_top = int(float(offset_top))
            self.image_data = image_data
            self.channel = channel
            self.zoom = zoom
            self.rType = rType
            self.responseType = responseType
    
        @classmethod
        def create(clz, message,sel):
            # data = message.get('text')
            data = message
    
            try:
                params = json.loads(data[:150])
    
                image_data = data[150:]
                image_data = image_data.replace(" ", "+")
    
                params['image_data'] = image_data
                params['channel'] = sel
    
                # add Type
                if params.get('responseType') is None:
                    params['responseType'] = 'url'
    
                # request type
                if params.get('rType') is None:
                    params['rType'] = 'rl'
    
                task = Task(**params)
    
    
            except ValueError as e:
                task = None
                print(">>>message data error!")
                print(e)
    
            return task
    
        def __cmp__(self, other):
            return cmp(self.offset_top, other.offset_top)
    
    
    
    def verify_auth_token(message):
        '''token 验证'''
    
        token = message.get('query_string')
        secret_key = 'aoiakai'
    
        try:
            payload = jwt.decode(token, secret_key, algorithms=['HS256'])
            if payload.get('ip') != message.get('remote_ip'):
                return False, 'ip mismatch'
        except jwt.ExpiredSignatureError as e:
            print(e)
            return False, 'token expired'
        except Exception as e:
            print(e)
            return False, 'enter correct token'
    
        return True, ''
    
    
    work_thread = Thread(target=work_loop)
    work_thread.daemon = True
    work_thread.start()
  • 相关阅读:
    可扩展设计的三个维度
    今天用批处理脚本遇到的两个问题
    响应式编程学习记录
    ThreadLocal使用注意
    JDK8 函数式接口
    Java多线程相关的常用接口
    java异步编程
    java多线程同步器
    paramiko获取远程主机的环境变量
    python为不同的对象如何分配内存的小知识
  • 原文地址:https://www.cnblogs.com/aylin/p/8831135.html
Copyright © 2011-2022 走看看