zoukankan      html  css  js  c++  java
  • motan源码分析十一:部分特性

    本章将描述motan部分的特性并对源码进行分析。

    1.requestid的维护,使用了当前时间左移20位,再和一个自增变量组合

    public class RequestIdGenerator {
        protected static final AtomicLong offset = new AtomicLong(0);
        protected static final int BITS = 20;
        protected static final long MAX_COUNT_PER_MILLIS = 1 << BITS;
    
    
        /**
         * 获取 requestId
         * 
         * @return
         */
        public static long getRequestId() {
            long currentTime = System.currentTimeMillis();
            long count = offset.incrementAndGet();
            while(count >= MAX_COUNT_PER_MILLIS){
                synchronized (RequestIdGenerator.class){
                    if(offset.get() >= MAX_COUNT_PER_MILLIS){
                        offset.set(0);
                    }
                }
                count = offset.incrementAndGet();
            }
            return (currentTime << BITS) + count;
        }
    
        public static long getRequestIdFromClient() {
            // TODO 上下文 requestid
            return 0;
    
        }
    
    }

    2.限流,motan支持简单的限流,是利用filter来实现的

    @SpiMeta(name = "active")
    @Activation(sequence = 1)
    public class ActiveLimitFilter implements Filter {
    
        @Override
        public Response filter(Caller<?> caller, Request request) {
            int maxAcvitivyCount = caller.getUrl().getIntParameter(URLParamType.actives.getName(), URLParamType.actives.getIntValue());
            if (maxAcvitivyCount > 0) {
                int activeCount = RpcStats.getServiceStat(caller.getUrl()).getActiveCount();
                if (activeCount >= maxAcvitivyCount) {
                    throw new MotanServiceException(String.format("Request(%s) active count exceed the limit (%s), referer:%s", request,
                            maxAcvitivyCount, caller.getUrl()), MotanErrorMsgConstant.SERVICE_REJECT);
                }
            }
    
            long startTime = System.currentTimeMillis();
            RpcStats.beforeCall(caller.getUrl(), request);
            try {
                Response rs = caller.call(request);
                RpcStats.afterCall(caller.getUrl(), request, true, System.currentTimeMillis() - startTime);
                return rs;
            } catch (RuntimeException re) {
                RpcStats.afterCall(caller.getUrl(), request, false, System.currentTimeMillis() - startTime);
                throw re;
            }
    
        }
    
    }

    3.对于连续失败的client进行不可用操作

        void incrErrorCount() {
            long count = errorCount.incrementAndGet();
    
            // 如果节点是可用状态,同时当前连续失败的次数超过限制maxClientConnection次,那么把该节点标示为不可用
            if (count >= maxClientConnection && state.isAliveState()) {
                synchronized (this) {
                    count = errorCount.longValue();
    
                    if (count >= maxClientConnection && state.isAliveState()) {
                        LoggerUtil.error("NettyClient unavailable Error: url=" + url.getIdentity() + " "
                                + url.getServerPortStr());
                        state = ChannelState.UNALIVE;
                    }
                }
            }
        }
    
        void resetErrorCount() {
            errorCount.set(0);
    
            if (state.isAliveState()) {
                return;
            }
    
            synchronized (this) {
                if (state.isAliveState()) {
                    return;
                }
    
                // 如果节点是unalive才进行设置,而如果是 close 或者 uninit,那么直接忽略
                if (state.isUnAliveState()) {
                    long count = errorCount.longValue();
    
                    // 过程中有其他并发更新errorCount的,因此这里需要进行一次判断
                    if (count < maxClientConnection) {
                        state = ChannelState.ALIVE;
                        LoggerUtil.info("NettyClient recover available: url=" + url.getIdentity() + " "
                                + url.getServerPortStr());
                    }
                }
            }
        }

    4.支持多注册中心,因此cluster的refer集合是所有注册中心包含服务器的集合,如果同一个服务器在不同的注册中心注册,则cluster中当作两个服务器

    5.服务端的采用boss线程池+工作线程池+业务线程池的处理方式

    	private final static ChannelFactory channelFactory = new NioServerSocketChannelFactory(//boss线程池和工作线程池,主要负责接收消息
    			Executors.newCachedThreadPool(new DefaultThreadFactory("nettyServerBoss", true)),
    			Executors.newCachedThreadPool(new DefaultThreadFactory("nettyServerWorker", true)));
    
            private StandardThreadExecutor standardThreadExecutor = null;//业务线程池,负责具体的业务处理
    
            standardThreadExecutor = (standardThreadExecutor != null && !standardThreadExecutor.isShutdown()) ? standardThreadExecutor
    				: new StandardThreadExecutor(minWorkerThread, maxWorkerThread, workerQueueSize,
    						new DefaultThreadFactory("NettyServer-" + url.getServerPortStr(), true));
    		standardThreadExecutor.prestartAllCoreThreads();
    
           final NettyChannelHandler handler = new NettyChannelHandler(NettyServer.this, messageHandler,
    				standardThreadExecutor);//handler使用业务线程池今天处理具体的业务
    

      

  • 相关阅读:
    蓝桥杯训练 2n皇后问题
    AtCoder Grand Contest #026 B
    AtCoder Grand Contest #026 A
    AtCoder Beginner Contest 103
    7-26 单词长度(15 分)
    uva 10006 Carmichael Numbers
    java并发带返回结果的批量任务执行(CompletionService:Executor + BlockingQueue)
    并发容器之CopyOnWriteArrayList
    模板模式
    静态工具类中使用注解注入service
  • 原文地址:https://www.cnblogs.com/mantu/p/5887224.html
Copyright © 2011-2022 走看看