zoukankan      html  css  js  c++  java
  • Tornado实现一个消息墙。

    Tornado对asynchronous http有很好的支持。 所以跟着demo,总结下一个消息墙要怎么做。

    思路: 首先查了下有两种思路,一种是client pull 一种是server push。 

    这里使用的是server pull,技术就是挺流行的comet技术。

    comet大概就是说:我客户端发送一个请求到服务器端,然后服务器端启动一个无线循环,将clinet需要的数据放到response中,并继续刷新,直到整个clinet与server的连接断开。 所以他是一个基于长连接的技术。

    1.第一步 就是要做发送新消息的处理,即一个client发送一个消息后,要广播通知到所有的client

     1 class NewMessage(tornado.web.RequestHandler):
     2     #overwrite post method
     3     def post(self):
     4         #define messages to send
     5         message = {
     6              "id": str(uuid.uuid4()),
     7              "body",self.get_argument("body"),
     8         }
     9         message["html"] =  tornado.escape.to_basestring(
    10              self.render_string("message.html",message=message))
    11         if self.get_argument("next",None):
    12             self.redirect(self.get_arugment("next"))
    13         else:
    14             self.write(message)
    15        #global_message_buffer IS a global var
    16  
    17         global_message_buffer.new_message([message])

      解释: message[html] 这部分 是将message传入模板,然后返回html代码。

      最关键的是,将这个mesage广播出去。 使用的global_message_buffer. 

    global_message_buffer = MessageBuffer()

    class MessageBuffe():
        self.__init__(self):
            
            self.waiters = set()
            self.cache = []
            self.cache_size = 10 #define max message cache
        
        self.new_message(self,message):
               #send message to waiters
                for future in self.waiters:
                    future.set_result(message)
                #update waiters to empty
                self.waiters = set()
                #update cache
                self.cache.extend(message)
                #check cache size
                if len(self.cache)>self.cache_size:
                    self.cache = self.cache[-self.cache_size:]

    回来再写。。

    继续。

    MessagBuffer做的事情,waiters存储了等待消息的Future.  What is future? Placeholder for an asynchronous result.

    cache存储了已经发送了消息,当新的用户连接进来后,会把cache里面的消息推送给他。 后面写的update

    new_message做的事情也是很清楚,将新发送的消息给每一个在waiters里的future。然后更新waiters为空。 (这个地方还没有明天哪里的原因)

    然后更新cache,如果新加消息后超出了size,那么就取最新的。

    2. 新进来的用户怎么拿到cache里的东西呢?

    这个还是简单,讲所有的cache里的东西,render到主页就好了

    1 class MainHandler(object):
    2     def get():
    3         self.render("index.html", messages=global_message_buffer.cache)

    3. 难点是,如何持续接受新发布的消息呢?也就是说,我在第一步中广播出去后,所有的client怎么接受?

    在前端的,有个showMessage的方法。这个方法应该是这样被调用:每从服务器接受到一个response(也就是服务器有消息了),被调用一次,在这个area中添加一个message.html中render后的东西,也就是在第一步中render_string中做的结果。然后调用下滑动的效果-_- 前端这个不会。。

    谁调用这个showMessage呢。应该是这样的流程。 client 进入页面后,拿到cache中的message后,调用function poll(). 

    Poll() 使用ajax向后端请求update我当前的消息,当返回success后,调用showMessage。

    挑战下js

    1 function poll(){
    2     $.ajax({url:"/message/update,type:"post",datatype:"text",
    3                data:"",onSuccess:showMessage,error:error})   
    4 }            

    前端部分暂时这样。

    那么这并没有解决刚才的问题,需要的是,我要传一个东西,让服务器知道,有了消息要给这个client回复response。

    class UpdateMessage(tornado.web.RequestHanlder):
        @gen.coroutine
        def post(self):
            client_id = self.get_arguments("client_id")
            #将这个clinet加入到message_wait中去,等有了消息就返回给这个Future。
            self.future = global_message_buffer.wait_for_message(client_id)
        
            messages = yield self.future
            self.write(dict(messages=message))

     这个wait_for_message

     1 wait_for_message(self,cursor=None):
     2     #考虑如果要给新来的client返回消息的话,需要知道从cache哪个地方开始返回
     3     #因为是异步的,所以要知道你请求的时候的位置。也就是当前message的id
     4     future = Future()
     5     
     6     if cursor:
     7     
     8         count = 0
     9         
    10         for m in reversed(self.cache):
    11             #如果到了当前消息了,就不再超后面拿。数据。
    12             if m["id"] == cursor:
    13                 break
    14             
    15             count+=1
    16         if count:
    17            future.set_result(self.cache[-count:])
    18             return future
    19      #将这个future加入到需要push到的集合中去。
    20      self.waiter.add(future)
    21      return future

      到这步,我们总结下整个流程与调用。 首先是用户载入,载入后调用updateMessage。 这一步参数为 client_id,初始为None 直接将其加入waiters。到 self.messages = yield self.future这部等待future被更新,然后调用self.write(). 何时被更新呢? 是在MessageBuffer中  future.set_result(message) 这一步,此时被更新了,然后立即调用self.write().

    当一次ajax请求结束后,js中设置timeout来进行下一次连接。 直到有response,此次ajax请求才结束。

    需要注意的是,只有UpdateMessage的post方法需要@gen.coroutine

    基本流程就是这样了。

  • 相关阅读:
    第三章 Jenkins参数及web项目
    第二章 Jenkins的详细介绍
    第一章 Git+Gitlab介绍和安装
    第二章 Centos7下Airflow2.1.0安装
    第一章 Airflow基本原理
    第五章 Pinpoint-Apm常见报错
    第四章 Docker方式安装 Pinpoint
    数论练习
    CF练习
    矩阵乘法
  • 原文地址:https://www.cnblogs.com/-Doraemon/p/4878006.html
Copyright © 2011-2022 走看看