zoukankan      html  css  js  c++  java
  • 使用jetty的continuations实现"服务器推"

     在实际的开发中,我们可能会有这样的场景:许多客户端都连接到服务器端,当有某个客户端的消息的时候,服务器端会主动"推"消息给客户端,手机app的推送是一个典型的场景(IOS的推送都是要经过苹果的服务器的,一般是通过苹果的APNS服务来实现,不需要做过多的开发,安卓的推送就需要我们自己来实现了)

    我们可选的技术方案实际上是很多的,使用netty这样的异步的网络通信框架或者servlet容器提供的异步的方案都是可以实现的,它们的理念都是一样的,异步和事件驱动,客户端请求服务器,当服务器没有需要推送的数据(或者是需要执行很长时间的IO操作)的时候,请求会被挂起,当服务器端的数据准备好的时候(例如需要向客户端推送一个消息的时候,或者是服务器端IO操作执行完毕了)请求会被重新激活,数据返回客户端.

    使用jetty的continuations或者是netty来实现这两种是我觉得比较好的实现方案,今天介绍一下如何使用jetty的continuations来实现一个服务器推的原型,和正式环境中向安卓手机的推送的实现方法是完全一样的

    continuations介绍:jetty的continuations是jetty实现的实现异步请求和事件驱动的组件,从jetty7起,continuations不止在jetty中可以使用,任何支持servlet3.0规范的servlet容器都可以使用continuations来实现异步和事件驱动,相比servlet3.0规范中的异步servlet,continuations提供了更加简化的编程模型.

    目标:用浏览器请求服务器的一个URL(用浏览器来模拟我们的客户端),实现任何时候当服务器需要推送数据的时候,浏览器能够立即显示出来

    我们需要提供两个接口:提供给客户端做长连接的接口,向客户端发送数据的接口

    提供给客户端连接的servlet:

    package com.jiaoyiping.websample.asyncServlet.jetty;
     /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/23
      * Time: 23:52
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    
    import org.eclipse.jetty.continuation.Continuation;
    import org.eclipse.jetty.continuation.ContinuationListener;
    import org.eclipse.jetty.continuation.ContinuationSupport;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.util.Map;
    
    @WebServlet(urlPatterns = "/pull", asyncSupported = true)
    public class ContinuationServlet extends HttpServlet {
        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            String user = req.getParameter("user");
            Map<String, PushAgent> pushAgentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");
            if (pushAgentMap.containsKey(user)) {
                PushAgent pushAgent = pushAgentMap.get(user);
                Continuation continuation = ContinuationSupport.getContinuation(req);
                continuation.setTimeout(90000000);
                //第一次请求进来
                if (continuation.isInitial()) {
                    resp.setContentType("text/evf;charset=utf-8");
                    resp.setHeader("Connection", "keep-alive");
                    resp.setHeader("Keep-Alive", "timeout=2000");
                    PushAdapter pushAdapter = new PushAdapter(continuation, pushAgent);
                    continuation.setAttribute("adapter", pushAdapter);
                    continuation.addContinuationListener(new ContinuationListener() {
                        @Override
                        public void onComplete(Continuation continuation) {
                            PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");
                            if (null != adapter) {
                                continuation.setAttribute("adapter", null);
                            }
                        }
    
                        @Override
                        public void onTimeout(Continuation continuation) {
                            onComplete(continuation);
                        }
    
                    });
                    resp.flushBuffer();
                }
                if (continuation.isExpired()) {
                    return;
                }
                Writer writer = getWriter(resp);
                PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");
                Message message;
                while (true) {
                    message = adapter.getPushAgent().pull();
                    if (null == message)
                        break;
                    try {
                        writer.write(message.getContent());
                        writer.flush();
                        writer.write("
    ");
                        writer.flush();
                        resp.flushBuffer();
                    } catch (Exception e) {
                        throw e;
    
                    }
                }
                //若没有该客户端的消息,则请求被挂起
                continuation.suspend();
            }
    
        }
    
        private Writer getWriter(HttpServletResponse response) throws IOException {
            OutputStream os = response.getOutputStream();
            return new OutputStreamWriter(os, "UTF-8");
        }
    
    
    }

    向客户端推送消息的servlet:

    package com.jiaoyiping.websample.asyncServlet.jetty;
    
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.util.Map;
    
    /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/25
      * Time: 23:46
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    @WebServlet(urlPatterns = "/send")
    public class MesssageSendServlet extends HttpServlet {
        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            //不要在自己实现的servlet中调用 super.doGet(0)或者是super.doPost()
            //因为在tomcat它们的默认实现是报405(HTTP1.1)或者400(其他版本的HTTP)
            //        super.doPost(req, resp);
    
            String target = req.getParameter("target");
            String messageStr = req.getParameter("message");
    
            Map<String, PushAgent> agentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");
            if (agentMap.keySet().contains(target)) {
                Message message = new Message();
                message.setTarget(target);
                message.setContent(messageStr);
                if (agentMap.get(target).isInited()) {
                    agentMap.get(target).onEvent(message);
                }
                agentMap.get(target).send(message);
                PrintWriter out = resp.getWriter();
                out.print("发送成功");
                out.flush();
                out.close();
            }
    
    
        }
    
        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
            this.doPost(req, resp);
        }
    }

    推送代理:就是可以拿到客户端相关信息,并且维护客户端消息队列的类,在这个推送代理中,我们可以加入一个监听器,当有数据需要推送的时候,激活请求

    package com.jiaoyiping.websample.asyncServlet.jetty;
     /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/25
      * Time: 22:56
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    
    public interface PushAgent {
    
        Terminal getTerminal();
    
        String getAddress();
    
        String getToken();
    
        Message send(Message message);
    
        Message pull();
    
        void addListener(MessageListener messageListener);
    
        void onEvent(Message message);
    
        boolean isInited();
    }

    默认实现(每个需要接受推送的用户对应一个PushAgent,和用户端保持长连接的线程从queue里读取mesage对象,向某个用户推送的时候将message对象放到该用户对应的PushAgent的queue里,这里是一个生产者-消费者模式):

    package com.jiaoyiping.websample.asyncServlet.jetty;
     /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/25
      * Time: 23:17
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    
    import java.util.PriorityQueue;
    import java.util.Queue;
    
    public class DefaultPushAgent implements PushAgent {
    
        private Terminal terminal;
        //客户端通过长连接连接到服务器时,服务器不断地从该队列poll(),若果拿到新的消息,则返回给客户端
        private Queue<Message> messages = new PriorityQueue<>();
        private MessageListener messageListener;
    
        @Override
        public Terminal getTerminal() {
            return this.terminal;
        }
    
        @Override
        public String getAddress() {
            return null;
        }
    
        @Override
        public String getToken() {
            return null;
        }
    
        @Override
        public Message send(Message message) {
            synchronized (message) {
                messages.add(message);
            }
    
            return message;
        }
    
        @Override
        public Message pull() {
            synchronized (messages) {
                return messages.poll();
            }
    
        }
    
        @Override
        public void addListener(MessageListener messageListener) {
            this.messageListener = messageListener;
        }
    
        @Override
        public void onEvent(Message message) {
            this.messageListener.onMessage(message);
        }
    
        @Override
        public boolean isInited() {
            return this.messageListener != null;
        }
    
    
        public DefaultPushAgent(Terminal terminal) {
            this.terminal = terminal;
        }
    }
    PushAdapter的实现(用户将Continuation和PushAgent关联起来):
    package com.jiaoyiping.websample.asyncServlet.jetty;
     /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/25
      * Time: 23:37
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    
    import org.eclipse.jetty.continuation.Continuation;
    
    public class PushAdapter {
        private Continuation continuation;
        private PushAgent pushAgent;
    
        public PushAdapter(Continuation continuation, PushAgent pushAgent) {
            this.continuation = continuation;
            this.pushAgent = pushAgent;
            this.pushAgent.addListener(message -> {
                if (PushAdapter.this.continuation.isSuspended()) {
                    PushAdapter.this.continuation.resume();
                }
            });
        }
    
        public Continuation getContinuation() {
            return continuation;
        }
    
        public void setContinuation(Continuation continuation) {
            this.continuation = continuation;
        }
    
        public PushAgent getPushAgent() {
            return pushAgent;
        }
    
    
        public void setPushAgent(PushAgent pushAgent) {
            this.pushAgent = pushAgent;
        }
    }

    MessageListener的实现(监听需要推送消息的事件,这里为了做演示,并没有实现一个完整的观察者模式,只是在需要推送消息的时候,手工调用 onMessage()):

    package com.jiaoyiping.websample.asyncServlet.jetty;
     /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/26
      * Time: 2:03
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    
    public interface MessageListener {
        void onMessage(Message message);
    }    

    测试数据:使用一个listener在应用初始化的时候,初始化一些数据做为测试数据

    package com.jiaoyiping.websample.asyncServlet.jetty;
    
    import javax.servlet.ServletContextEvent;
    import javax.servlet.ServletContextListener;
    import javax.servlet.annotation.WebListener;
    import java.util.HashMap;
    import java.util.Map;
    
    /*
      * Created with Intellij IDEA
      * USER: 焦一平
      * Mail: jiaoyiping@gmail.com
      * Date: 2016/10/25
      * Time: 23:55
      * To change this template use File | Settings | Editor | File and Code Templates
     */
    @WebListener
    public class PushListener implements ServletContextListener {
        @Override
        public void contextInitialized(ServletContextEvent sce) {
            Map<String, PushAgent> agentMap = new HashMap<>();
            agentMap.put("zhangsan", new DefaultPushAgent(new Terminal() {{
                setAddress("zhangsan");
                setToken("zhangsan_token");
            }}));
            agentMap.put("lisi", new DefaultPushAgent(new Terminal() {{
                setAddress("lisi");
                setToken("lisi_token");
            }}));
    
            sce.getServletContext().setAttribute("agentmap",agentMap);
        }
    
        @Override
        public void contextDestroyed(ServletContextEvent sce) {
    
        }
    }

    最终的效果是这样的,我截了一个git图:

  • 相关阅读:
    记录按钮点击次数,点击三次之后跳转页面
    HTML拖放
    .Net实现发送邮件功能
    HTTP 400 错误
    方法(参数的传递)
    方法
    c# 属性 (get、set)
    Python和C++交互
    从Windows远程Ubuntu
    Eclipse+Tomcat WEB开发配置
  • 原文地址:https://www.cnblogs.com/jiaoyiping/p/5979298.html
Copyright © 2011-2022 走看看