zoukankan      html  css  js  c++  java
  • 用Jetty和redis实现接入服务器adapter

    传统的服务器端为若干个客户端提供服务,一般需要开启多个服务器端进程。为了进一步提升服务器端的处理能力,可以如下图所示将服务解耦为两部分(adapter与workers),它们之间通过消息队列传输数据,其中workers处理具体业务,adapter负责接入请求以及反馈结果,具体包含下面两个工作。

    1,将所有客户端的请求发送到消息队列(进而传给后台处理)

    2,后台处理完毕后将结果返回响应队列,client adapter获取到结果后返回给相应客户端。

    流程图如下:

    我们选择用Jetty(8),redis以及php简单实现这个场景,主要用到Jetty的continuation机制和redis的list先进先出数据结构

    接入服务器

    A.1,先配置一个服务器如下,同时开启一个守护线程阻塞监听response queue(用到json lib库以及jedis库)(使用专用线程处理响应可以避免“惊群”现象,不影响业务线程)

    package test;
    
    import java.util.HashMap;
    import java.util.List;
    
    import org.eclipse.jetty.continuation.Continuation;
    import org.eclipse.jetty.server.*;
    import org.eclipse.jetty.server.nio.SelectChannelConnector;
    import org.eclipse.jetty.servlet.ServletContextHandler;
    import org.eclipse.jetty.servlet.ServletHolder;
    import org.eclipse.jetty.util.thread.QueuedThreadPool;
    
    import org.json.simple.*;
    
    import redis.clients.jedis.Jedis;
    
    public class PJetty{
        
        public static HashMap<String,Continuation>globalMap = new HashMap<String,Continuation>();
        
        // 用一个守护线程阻塞等待结果队列返回数据
        public static class DaemonThread extends Thread{
            
            private JSONObject obj = new JSONObject();
    
            private Jedis jedis = new Jedis("127.0.0.1",6379);
            private List<String> res;
            
            public void run(){
                
                while(true){
                    // 阻塞等待响应队列
                    res = jedis.brpop(0, "response_queue");
                    
                    // 获取结果信息
                    String response = res.get(1);
    
                    obj=(JSONObject) JSONValue.parse(response);
                    String request_sid = obj.get("request_sid").toString();
                    String result = obj.get("results").toString();
                    
                    if(request_sid == null){
                        continue;
                    }
                    
                    // 通过消息中的连接的sessonid获取到响应的continuation实例,然后设置结果信息再唤醒实例
                    Continuation con = globalMap.get(request_sid);
                    if(con == null){continue;}
                    globalMap.remove(request_sid);
                    
                    //客户端异常断开这里会抛错
                    try{
                        con.setAttribute("results", result);
                        con.resume();
                    } catch(Exception e){
                        continue;
                    }
                }
                
            }
        }
        
        public static void main(String[] args) throws Exception {
            
            //开启守护线程去阻塞等待响应结果队列,唤醒请求
            DaemonThread dt = new DaemonThread();
            dt.start();
            
            //设置connectors
            SelectChannelConnector connector1 = new SelectChannelConnector();
            connector1.setPort(1987);
            connector1.setThreadPool(new QueuedThreadPool(5));
            
            Server server = new Server();
            server.setConnectors(new Connector[]{connector1});
    
            //使用servlet处理请求
            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
            context.setContextPath("/");
            context.addServlet(new ServletHolder(new NonBlockingServlet()), "/fetch");
            server.setHandler(context);
            
            server.start();
            server.join();
        }
    }

    A.2,实现自定义的servlets接受前端client连接,将请求信息传入队列request queue

    package test;
    
    import java.io.IOException;
    
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import javax.servlet.http.HttpSession;
    
    import org.eclipse.jetty.continuation.Continuation;
    import org.eclipse.jetty.continuation.ContinuationSupport;
    import org.json.simple.JSONObject;
    
    import redis.clients.jedis.Jedis;
    
    public class NonBlockingServlet extends HttpServlet {
    
        /**
         * generated serialize number
         */
        private static final long serialVersionUID = 3313258432391586994L;
        
        
        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
        {
            // 用sleeptime来模拟后台工作量
             String sleepTime = request.getParameter("st");
             if(sleepTime == null){
                 sleepTime = "0";
             }
             
             // 查看结果队列是否返回此连接的请求结果
             Object results = request.getAttribute("results");
             if (results==null) // 如果异步处理尚未返回结果
              {
               Continuation continuation = ContinuationSupport.getContinuation(request);
         
               if(continuation.isInitial()){
                   // 设置连接超时时间
                    continuation.setTimeout(10000);
                       response.setContentType("text/plain");
                    response.getWriter().flush();
                    
                    HttpSession session=request.getSession();
                    String sid = session.getId();
                    
                    Jedis jedis = new Jedis("127.0.0.1",6379);
                    //将请求连接sessionid以及请求内容encode后传到处理队列中
                    JSONObject obj=new JSONObject();
                    obj.put("request_sid",sid);
                    obj.put("params",sleepTime);
                    
                    jedis.lpush("request_queue", obj.toJSONString());
                    
                    //将连接和continuation实例做个映射关系存到全局hashmap中,不确定这里是否应该加锁
                    PJetty.globalMap.put(sid, continuation);
               }
               
               // 判断是否超时
               if (continuation.isExpired())
               {
                 // 返回超时Response
                 response.getWriter().println("timeout");
                    response.getWriter().flush();  
                 return;
               }
         
               // 挂起HTTP连接
               continuation.suspend(); 
         
               return; // or continuation.undispatch();
             }
         
             // 连接恢复后返回结果
             response.getWriter().println("Got Result:	" + results);
             response.getWriter().flush();  
        }
    }

    业务服务器

    B,实现后端worker.php(可以自定义worker进程数,我这里设置为5个php进程,进程数多能获取更好的并发)(用到predis库)

    #!/root/bin/php
    <?php
    
    require_once("lib/Predis/Autoloader.php");
    
    function worker_thread()
    {
            PredisAutoloader::register();
            $redis = new PredisClient('tcp://127.0.0.1:6379');
    
                    while(true){
                            try{
                                    $request = $redis->brpop("request_queue", 0);
                            } catch(Exception $e){
                                    continue;
                            }
                            /** demo
                             array(2) {
                             [0]=>
                             string(13) "request_queue"
                             [1]=>
                             string(55) "{"request_sid":"q0muxazo8k1h1k3uw85wuayh","params":"4"}"
                             }
                             */
                            $request = json_decode($request[1], true);
                            // sleep represents work loads
                            sleep(intval($request["params"]));
                            $results = array("request_sid"=>$request["request_sid"], "results"=>$request["params"]);
                            $response = json_encode($results);
                            $redis->lpush("response_queue",$response);
                    }
    }
    
    //开启多个worker进程提供服务
    for ($worker_nbr = 0; $worker_nbr < 5; $worker_nbr++) {
            $pid = pcntl_fork();
            if ($pid == 0) {
                    worker_thread();
    
                    return;
            }
    }
    
    ?>

    运行结果如下

     这只是一个简单的demo,为了防止redis,workers进程挂掉或者客户端异常断开,还需要做些异常处理,比如设置请求超时,捕获一些空指针等,超时需要将continuation从globalMap中剔除,防止内存得不到释放。

    root # for((i=10;i>=1;i--)) ; do lynx -dump http://127.0.0.1:1987/fetch?st=$i & done
    [1] 14112
    [2] 14113
    [3] 14114
    [4] 14115
    [5] 14116
    [6] 14117
    [7] 14118
    [8] 14119
    [9] 14120
    [10] 14121
    root # Got Result:     3
    Got Result:     4
    Got Result:     2
    Got Result:     7
    Got Result:     1
    Got Result:     9
    Got Result:     6
    timeout
    timeout
    timeout
    
    [1]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [2]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [3]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [4]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [5]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [6]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [7]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [8]   Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [9]-  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i
    [10]+  Done                    lynx -dump http://127.0.0.1:1987/fetch?st=$i

    redis数据库中存储的内容如下,可以看出虽然经后台处理后顺序变化了,但是对应关系正确,接入服务器能够根据request_sid把结果返回给相应的用户:

    redis 127.0.0.1:6379> lrange request_queue 0 15
     1) "{"request_sid":"igiwkwnb715aphw8uvtfa6rj","params":"3"}"
     2) "{"request_sid":"wsrglxa3h6ef19ik5i0nbiiys","params":"2"}"
     3) "{"request_sid":"tyiqoj6awj5t16ddpqusftwc8","params":"6"}"
     4) "{"request_sid":"1052tgkiyy7c31bmxjtbom7ca","params":"5"}"
     5) "{"request_sid":"17jo1xwnnkd3h1mhcqcfplrl5k","params":"8"}"
     6) "{"request_sid":"1xk521sq6vmmf6enxauwzduj9","params":"4"}"
     7) "{"request_sid":"1cxnir1slgjiq1o2n3xwznh0kk","params":"9"}"
     8) "{"request_sid":"961vf8hao3stsv4vt1qif3ws","params":"7"}"
     9) "{"request_sid":"35pfn5au6p8qdbri17p636si","params":"10"}"
    10) "{"request_sid":"1ca4wy8qsfr7av0hwk8xtlqhp","params":"1"}"
    
    redis 127.0.0.1:6379> lrange response_queue 0 15
     1) "{"request_sid":"tyiqoj6awj5t16ddpqusftwc8","results":"6"}"
     2) "{"request_sid":"igiwkwnb715aphw8uvtfa6rj","results":"3"}"
     3) "{"request_sid":"wsrglxa3h6ef19ik5i0nbiiys","results":"2"}"
     4) "{"request_sid":"35pfn5au6p8qdbri17p636si","results":"10"}"
     5) "{"request_sid":"1052tgkiyy7c31bmxjtbom7ca","results":"5"}"
     6) "{"request_sid":"1cxnir1slgjiq1o2n3xwznh0kk","results":"9"}"
     7) "{"request_sid":"17jo1xwnnkd3h1mhcqcfplrl5k","results":"8"}"
     8) "{"request_sid":"961vf8hao3stsv4vt1qif3ws","results":"7"}"
     9) "{"request_sid":"1xk521sq6vmmf6enxauwzduj9","results":"4"}"
    10) "{"request_sid":"1ca4wy8qsfr7av0hwk8xtlqhp","results":"1"}"
  • 相关阅读:
    c++ time_t
    sql 一些题目
    vc 找到一个或多个多重定义的符号
    c++ json 详解
    c++ json cpp
    C++ string(转)
    java web 复选框checked
    20_采用ContentProvider对外共享数据
    16_采用SharedPreferences保存用户偏好设置参数
    android开发 eclipse alt+”/”自动提示失效
  • 原文地址:https://www.cnblogs.com/ciaos/p/3913701.html
Copyright © 2011-2022 走看看