zoukankan      html  css  js  c++  java
  • Pigeon源码分析(四) -- 服务端接收请求过程

    客户端通过 TCP 向服务端发送请求,服务端的全部处理逻辑都位于其 Netty 组件中,更具体地说,位于 pipeline 中注册的各个 ChannelHandler 里。

    public class NettyServerPipelineFactory implements ChannelPipelineFactory {
    
        private NettyServer server;
    
        private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();
    
        public NettyServerPipelineFactory(NettyServer server) {
            this.server = server;
        }
    
        public ChannelPipeline getPipeline() {
            ChannelPipeline pipeline = pipeline();
            pipeline.addLast("framePrepender", new FramePrepender());
            pipeline.addLast("frameDecoder", new FrameDecoder());
            pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
            pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
            pipeline.addLast("providerDecoder", new ProviderDecoder());
            pipeline.addLast("providerEncoder", new ProviderEncoder());
            pipeline.addLast("serverHandler", new NettyServerHandler(server));
            return pipeline;
        }
    
    }

      经过解码,crc校验,解压缩,反序列化之后,最后到达 NettyServerHandler

     @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
            // Receives a decoded CodecEvent, wraps its invocation in a provider context and
            // hands it to the server for processing.
            final CodecEvent event = (CodecEvent) message.getMessage();

            // Ignore events that are invalid or carry no invocation.
            if (!event.isValid()) {
                return;
            }
            if (event.getInvocation() == null) {
                return;
            }

            final InvocationRequest request = (InvocationRequest) event.getInvocation();
            final NettyServerChannel replyChannel = new NettyServerChannel(ctx.getChannel());
            final ProviderContext providerContext = new DefaultProviderContext(request, replyChannel);

            try {
                this.server.processRequest(request, providerContext);
            } catch (Throwable failure) {
                final String description = "process request failed:" + request;
                // Heartbeat messages only get normal replies; a failure is never sent back for them.
                final boolean replyExpected = request.getCallType() == Constants.CALLTYPE_REPLY
                        && request.getMessageType() != Constants.MESSAGE_TYPE_HEART;
                if (replyExpected) {
                    ctx.getChannel().write(ProviderUtils.createFailResponse(request, failure));
                }
                log.error(description, failure);
            }
        }

      AbstractServer # processRequest

    /**
     * Delegates the request to this server's request processor.
     *
     * @param request         the invocation request to process
     * @param providerContext the provider context that accompanies the request
     * @return the future returned by the request processor
     */
    public Future<InvocationResponse> processRequest(InvocationRequest request, ProviderContext providerContext) {
            final Future<InvocationResponse> pending = requestProcessor.processRequest(request, providerContext);
            return pending;
        }

      AbstractRequestProcessor # processRequest

    /**
     * Processes one request: stamps it with a creation time when it has none, delegates to
     * {@code doProcessRequest}, and stores the resulting future on the provider context.
     *
     * <p>Any {@link Throwable} raised by {@code doProcessRequest} is caught here and not
     * propagated. In that case the returned future (and the one stored on the context) is
     * {@code null}; a failure response is written to the context's channel only for
     * reply-type calls that are not heartbeats.
     *
     * @param request         the invocation request to process
     * @param providerContext the provider context that accompanies the request
     * @return the future produced by {@code doProcessRequest}, or {@code null} if it threw
     */
    public Future<InvocationResponse> processRequest(final InvocationRequest request,
                final ProviderContext providerContext) {
            if (request.getCreateMillisTime() == 0) {
                request.setCreateMillisTime(System.currentTimeMillis());
            }
            Future<InvocationResponse> invocationResponse = null;
            try {
                invocationResponse = doProcessRequest(request, providerContext);
            } catch (Throwable e) {
                // Only reply-type, non-heartbeat calls are told about the failure.
                if (request.getCallType() == Constants.CALLTYPE_REPLY
                        && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) {
                    providerContext.getChannel().write(providerContext, ProviderUtils.createFailResponse(request, e));
                }
                // The failure is deliberately not logged at this level; the previously unused
                // message string (which forced a request.toString() on every failure) was removed.
            }
            providerContext.setFuture(invocationResponse);
            return invocationResponse;
        }

    RequestThreadPoolProcessor#doProcessRequest

    /**
     * Registers the request in {@code requestContextMap}, starts its monitor data, wraps the
     * handler invocation in a {@link Callable} and submits that task to the selected thread pool.
     *
     * <p>The task removes the request from {@code requestContextMap} in its {@code finally}
     * block. If submission is rejected, the mapping is removed and the monitor data ended here
     * before a {@code RejectedException} is thrown.
     *
     * @param request         the invocation request to process
     * @param providerContext the provider context that accompanies the request
     * @return the future returned by {@code pool.submit}
     */
    public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
                                                           final ProviderContext providerContext) {
            requestContextMap.put(request, providerContext);
    
            startMonitorData(request, providerContext);
    
            Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {
    
                // Returns the handler's response, or null when no handler is selected for the
                // message type or when the handler throws (the throwable is logged, not rethrown).
                @Override
                public InvocationResponse call() throws Exception {
                    providerContext.getTimeline().add(new TimePoint(TimePhase.T));
                    try {
                        ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
                                .selectInvocationHandler(providerContext.getRequest().getMessageType());
                        if (invocationHandler != null) {
                            // Record the executing thread on the context before running the handler chain.
                            providerContext.setThread(Thread.currentThread());
                            return invocationHandler.handle(providerContext);
                        }
                    } catch (Throwable t) {
                        logger.error("Process request failed with invocation handler, you should never be here.", t);
                    } finally {
                        // Always drop the request-to-context mapping once the task has run.
                        requestContextMap.remove(request);
                    }
                    return null;
                }
            };
            final ThreadPool pool = selectThreadPool(request);// thread-pool selection is not covered here; it is usually left unconfigured, so the default pool is used
    
            try {
                checkRequest(pool, request);
                providerContext.getTimeline().add(new TimePoint(TimePhase.T));
                return pool.submit(requestExecutor);
            } catch (RejectedExecutionException e) {
                // NOTE(review): cleanup only happens for RejectedExecutionException; any other
                // exception from checkRequest/submit would leave the map entry in place -- confirm.
                requestContextMap.remove(request);
                endMonitorData(request, providerContext);
                throw new RejectedException(getProcessorStatistics(pool), e);
            }
    
        }

    如果不单独设置线程池,使用的默认值就是:

    // Default provider core pool size (presumably for the default request thread pool -- confirm against the pool setup).
    public static final int DEFAULT_PROVIDER_COREPOOLSIZE = 60;//coresize

    // Default provider maximum pool size (presumably for the same pool -- confirm against the pool setup).
    public static final int DEFAULT_PROVIDER_MAXPOOLSIZE = 500; // maxsize

    重点看这段逻辑

    ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
                                .selectInvocationHandler(providerContext.getRequest().getMessageType());
                        if (invocationHandler != null) {
                            providerContext.setThread(Thread.currentThread());
                            return invocationHandler.handle(providerContext);

    又是熟悉的责任链模式,看来作者真的很擅长责任链

       ProviderProcessHandlerFactory # init

    public static void init() {
            registerBizProcessFilter(new TraceFilter());
            if (Constants.MONITOR_ENABLE) {
                registerBizProcessFilter(new MonitorProcessFilter());
            }
            registerBizProcessFilter(new WriteResponseProcessFilter());
            registerBizProcessFilter(new ContextTransferProcessFilter());
            registerBizProcessFilter(new ExceptionProcessFilter());
            registerBizProcessFilter(new SecurityFilter());
            registerBizProcessFilter(new GatewayProcessFilter());
            registerBizProcessFilter(new BusinessProcessFilter());
            bizInvocationHandler = createInvocationHandler(bizProcessFilters);

    上面这几个filter比较重要的是 

    1  WriteResponseProcessFilter 负责把结果写回给客户端

    2  GatewayProcessFilter 限流相关

    3  BusinessProcessFilter 业务执行相关

    GatewayProcessFilter 后续讲降级限流会具体展开。

    先看看WriteResponseProcessFilter 的原理 

    WriteResponseProcessFilter# invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)

    /**
     * Runs the rest of the handler chain, writes the response back on the context's channel for
     * reply-type calls, and then fires the post-invoke interceptors for service messages.
     *
     * @param handler           the next handler in the chain
     * @param invocationContext the provider context of the current request
     * @return the response produced by the downstream handler (returned unchanged)
     * @throws Throwable anything thrown by the downstream handler, the channel write or an interceptor
     */
    public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)
                throws Throwable {
            final ProviderChannel replyChannel = invocationContext.getChannel();
            final InvocationRequest request = invocationContext.getRequest();

            // Execute the downstream handlers (i.e. the business handler) first.
            final InvocationResponse response = handler.handle(invocationContext);

            if (request.getCallType() == Constants.CALLTYPE_REPLY) {
                // A P time point is recorded immediately before and after the write.
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
                // Send the execution result back to the client.
                replyChannel.write(invocationContext, response);
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
            }

            // Post-invoke interceptors only apply to service messages.
            if (request.getMessageType() != Constants.MESSAGE_TYPE_SERVICE) {
                return response;
            }
            for (ProviderProcessInterceptor requestInterceptor : ProviderProcessInterceptorFactory.getInterceptors()) {
                requestInterceptor.postInvoke(request, response);
            }
            for (ProviderInterceptor contextInterceptor : ProviderInterceptorFactory.getInterceptors()) {
                contextInterceptor.postInvoke(invocationContext);
            }
            return response;
        }

    BusinessProcessFilter

    /**
     * Invokes the target service method for a service-type request.
     *
     * <p>The {@code handler} argument is not called by this filter. Before the invocation it
     * optionally publishes the request timeout to the local context, aborts if the current
     * thread has been interrupted, and runs the pre-invoke interceptors.
     *
     * @param handler           the next handler in the chain (unused here)
     * @param invocationContext the provider context of the current request
     * @return a success response for reply-type calls, otherwise {@code null}
     * @throws RequestAbortedException if the current thread is already interrupted
     * @throws BadRequestException     if the message type is not {@code MESSAGE_TYPE_SERVICE}
     * @throws Throwable               anything thrown by an interceptor or the service method
     */
    public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext) throws Throwable {
        invocationContext.getTimeline().add(new TimePoint(TimePhase.U));
        final InvocationRequest request = invocationContext.getRequest();

        // Anything other than MESSAGE_TYPE_SERVICE is rejected.
        if (request.getMessageType() != Constants.MESSAGE_TYPE_SERVICE) {
            throw new BadRequestException("message type[" + request.getMessageType() + "] is not supported!");
        }

        // Reset the timeout: expose the request's timeout through the local context.
        final boolean timeoutResetEnabled =
                ConfigManagerLoader.getConfigManager().getBooleanValue(KEY_TIMEOUT_RESET, true);
        if (timeoutResetEnabled && request.getTimeout() > 0) {
            ContextUtils.putLocalContext(Constants.REQUEST_TIMEOUT, request.getTimeout());
        }

        // A dedicated timer thread watches call timeouts and interrupts this thread on timeout.
        if (Thread.currentThread().isInterrupted()) {
            throw new RequestAbortedException(
                    "the request has been canceled by timeout checking processor:" + request);
        }

        // Run the pre-invoke hooks of the custom interceptors.
        for (ProviderProcessInterceptor requestInterceptor : ProviderProcessInterceptorFactory.getInterceptors()) {
            requestInterceptor.preInvoke(request);
        }
        for (ProviderInterceptor contextInterceptor : ProviderInterceptorFactory.getInterceptors()) {
            contextInterceptor.preInvoke(invocationContext);
        }

        // The context acts like a cache: a previously resolved method is stored on it.
        ServiceMethod target = invocationContext.getServiceMethod();
        if (target == null) {
            // Not cached: look it up again, finding the best match from the service name,
            // service version, method name and method parameters.
            target = ServiceMethodFactory.getMethod(request);
        }

        if (Constants.REPLY_MANUAL && request.getCallType() == Constants.CALLTYPE_REPLY) {
            request.setCallType(Constants.CALLTYPE_MANUAL);
        }

        ProviderHelper.setContext(invocationContext);
        invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis()));
        final Object result;
        try {
            // Call the actual business method.
            result = target.invoke(request.getParameters());
        } finally {
            // Clear the context data whether or not the call succeeded.
            ProviderHelper.clearContext();
        }
        invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis()));

        if (request.getCallType() != Constants.CALLTYPE_REPLY) {
            return null;
        }
        // A reply is required: wrap the method's return value in a success response.
        return ProviderUtils.createSuccessResponse(request, result);
    }
  • 相关阅读:
    python入门之函数及其方法
    Python入门知识点2---字符串
    Python列表 元组 字典 以及函数
    Python入门知识
    Autofac使用代码
    优化EF以及登录验证
    CRM框架小知识以及增删查改逻辑代码
    分页SQL
    触发器SQL
    动态生成lambda表达式
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14846559.html
Copyright © 2011-2022 走看看