zoukankan      html  css  js  c++  java
  • 实时大规模数据的订阅和推送服务

    1. 背景  

         服务后台实时收集千万级别在线终端、全国近400个城市的出租车、手机和pad等移动终端的位置点gps信息,然后根据gps所在城市区域,持久化并推送分发给不同的订阅用户。

         其业务逻辑图如下:

                                   

     

     1.1 需求特征

      a 实时性(gps点本身具有实时性的特征,例如打车服务,需要周边实时出租车位置信息)

      b 数据量大(全国实时gps点数据规模 T级别/per day,高峰期时达到1G/min) 

     1.2 推送方式选择    

      数据推送方式通常有两种类型:  

      a Pull方式,这种方式服务端开发相对简单,可以采用缓存+httpserver的方式解决;

      b Push方式,这种方式通常满足实时性的需求,对服务端而言逻辑相对复杂,需要维持大并发的连接和发送

      由于实时性和大数据量需求的特征,所以系统采用"Push+长连接方式"进行推送。当然实现一套支持实时海量数据和客户推送的系统,需要解决的关键技术问题有很多: 如分布式集群,集群的failover和balancer能力,集群节点的配置管理等等。本系统借鉴hadoop的RPC模块,实现一套订阅发布实时推送服务,下面主要说说为了提高单节点的并发和吞吐量的些trick. 

    2. 架构图    

    3. 性能优化

     3.1 异步数据发送

       异步发送逻辑如下:  

    private int channelIO(WritableByteChannel writeCh, ByteBuffer buf)
                throws IOException {
            int originalLimit = buf.limit();
            int initialRemaining = buf.remaining();
            int ret = 0;
            while (buf.remaining() > 0) {
                try {
                    int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                    buf.limit(buf.position() + ioSize);
                    ret = writeCh.write(buf);
                    if (ret < ioSize) {
                        break;
                    }
                } finally {
                    buf.limit(originalLimit);
                }
            }
            int nBytes = initialRemaining - buf.remaining(); 
            return (nBytes > 0) ? nBytes : ret;
        }
    

       这里主要有两优化点:①为防止待写的数据量过大导致独占线程时间片过长,在8行代码对ByteBuffer进行了分片发送(尽管tcp尽量避免数据分片和组包),②在11行代码,通道没法写完数据时,应让出线程,立刻返回注册到selector,待下次writeCh通道变成writable可写状态时,再进行channelIO写操作(这是niobio的最大区别)。 

     3.2 用多selector机制,分离网络读写操作

          ReadSelector负责监听用户的请求和鉴权响应,若用户请求为合法,则把相应连接注册给WriteSelector;WriteSelector负责将接收的实时gps点数据推送给已鉴权成功的注册用户连接         

     3.3 使用多selector机制,进行异步写数据

         可以根据客户端的端口hash到不同的Selector上去执行写的操作,如下: 

        private Responder selectResponder(int remotePort){
            int index = Math.abs(remotePort % responderCount);
            return responders[index];
        }
    

    4. 容错健壮性

          最后还得考虑实时数据流大和频率高的特征,当存在网络不好或带宽不足时,服务会存在数据发送不赢而导致堆积的潜在风险。所以为每个连接增加个队列ResponseQueue,来维护待发送数据集。只有数据队列中存在数据时,就将相应连接注册到WriteSelector。如下图:

                                           

            这里主要用到两个trick:  

       4.1 避免服务数据堆积

         当网络状况不好对方接收较慢或发送数据量比较大时,这两种情况下,都会造成服务数据堆积。因此,引入参数连接的缓冲数据队列大小限制maxAllowedQueueSize如果数据批次队列大于maxAllowedQueueSize,则直接丢弃,避免数据无上限增长,如下代码: 

    void doRespond(Call call) throws IOException {
            try {
                        synchronized (call.connection.responseQueue) {
                            if (call.connection.responseQueue.size() < maxAllowedQueueSize) {
                                call.connection.responseQueue.addLast(call);
                                if (call.connection.responseQueue.size() == 1) {
                                    processResponse(call.connection.responseQueue, true);
                                }
                            } else {
                                logger.warn(
                                        "incoming data discarded from connection {}",
                                        call.connection);
                            }
                        }
                    } catch (NullPointerException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
    

       4.2 定期扫描和关闭坏掉的连接资源

         这里的坏掉是指数据在一段时间内一直停留在连接connection的数据队列里,则认为该连接已失效而直接清理队列数据和关闭相应连接。代码如下: 

                private void doPurge(Call call, long now) throws IOException {
                  if(call.connection == null || call.connection.responseQueue == null){
                      return ;
                  }
                  LinkedList<Call> responseQueue = call.connection.responseQueue;
                  synchronized (responseQueue) {
                    Iterator<Call> iter = responseQueue.listIterator(0);
                    while (iter.hasNext()) {
                      call = iter.next();
                      if (now > call.timestamp + PURGE_INTERVAL) {
                        logger.info("dalay of current connection {}  exceeds 10 mins",call.connection);
                        closeConnection(call.connection);
                        
                      }
                    }
                  }
                }
    

         希望对有类似需求的网友能提供些参考和讨论。

  • 相关阅读:
    解剖Nginx·自动脚本篇(7)类型相关脚本系列
    解剖Nginx·自动脚本篇(6)编译器名称变量脚本 auto/cc/name
    解剖Nginx·自动脚本篇(5)编译器相关主脚本
    解剖Nginx·自动脚本篇(4)工具型脚本系列
    解剖Nginx·自动脚本篇(3)源码相关变量脚本 auto/sources
    解剖Nginx·自动脚本篇(2)设置初始变量脚本 auto/init
    解剖Nginx·自动脚本篇(1)解析配置选项脚本 auto/options
    解剖Nginx·模块开发篇(5)解读内置非默认模块 ngx_http_stub_status_module
    jupyter nb + scite 混合搭建成我的最爱IDE
    md语法之行内代码和代码片续集
  • 原文地址:https://www.cnblogs.com/gisorange/p/3581493.html
Copyright © 2011-2022 走看看