客户端发送tcp请求给服务端,最终所有服务端的逻辑都存在于服务端的netty中,进一步说就是channelHandler中
public class NettyServerPipelineFactory implements ChannelPipelineFactory { private NettyServer server; private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig(); public NettyServerPipelineFactory(NettyServer server) { this.server = server; } public ChannelPipeline getPipeline() { ChannelPipeline pipeline = pipeline(); pipeline.addLast("framePrepender", new FramePrepender()); pipeline.addLast("frameDecoder", new FrameDecoder()); pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig)); pipeline.addLast("compressHandler", new CompressHandler(codecConfig)); pipeline.addLast("providerDecoder", new ProviderDecoder()); pipeline.addLast("providerEncoder", new ProviderEncoder()); pipeline.addLast("serverHandler", new NettyServerHandler(server)); return pipeline; } }
经过解码,crc校验,解压缩,反序列化之后,最后到达 NettyServerHandler
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) { CodecEvent codecEvent = (CodecEvent) (message.getMessage()); if (!codecEvent.isValid() || codecEvent.getInvocation() == null) { return; } InvocationRequest request = (InvocationRequest) codecEvent.getInvocation(); ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel())); try { this.server.processRequest(request, invocationContext); } catch (Throwable e) { String msg = "process request failed:" + request; // 心跳消息只返回正常的, 异常不返回 if (request.getCallType() == Constants.CALLTYPE_REPLY && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) { ctx.getChannel().write(ProviderUtils.createFailResponse(request, e)); } log.error(msg, e); } }
AbstractServer # processRequest
public Future<InvocationResponse> processRequest(InvocationRequest request, ProviderContext providerContext) { return requestProcessor.processRequest(request, providerContext); }
AbstractRequestProcessor # processRequest
public Future<InvocationResponse> processRequest(final InvocationRequest request, final ProviderContext providerContext) { if (request.getCreateMillisTime() == 0) { request.setCreateMillisTime(System.currentTimeMillis()); } Future<InvocationResponse> invocationResponse = null; try { invocationResponse = doProcessRequest(request, providerContext); } catch (Throwable e) { String msg = "process request failed:" + request; if (request.getCallType() == Constants.CALLTYPE_REPLY && request.getMessageType() != Constants.MESSAGE_TYPE_HEART) { providerContext.getChannel().write(providerContext, ProviderUtils.createFailResponse(request, e)); } // logger.error(msg, e); } providerContext.setFuture(invocationResponse); return invocationResponse; }
RequestThreadPoolProcessor#doProcessRequest
public Future<InvocationResponse> doProcessRequest(final InvocationRequest request, final ProviderContext providerContext) { requestContextMap.put(request, providerContext); startMonitorData(request, providerContext); Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() { @Override public InvocationResponse call() throws Exception { providerContext.getTimeline().add(new TimePoint(TimePhase.T)); try { ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory .selectInvocationHandler(providerContext.getRequest().getMessageType()); if (invocationHandler != null) { providerContext.setThread(Thread.currentThread()); return invocationHandler.handle(providerContext); } } catch (Throwable t) { logger.error("Process request failed with invocation handler, you should never be here.", t); } finally { requestContextMap.remove(request); } return null; } }; final ThreadPool pool = selectThreadPool(request);//选择执行线程池部分 不关注,因为一般也不会设置都是用默认的 try { checkRequest(pool, request); providerContext.getTimeline().add(new TimePoint(TimePhase.T)); return pool.submit(requestExecutor); } catch (RejectedExecutionException e) { requestContextMap.remove(request); endMonitorData(request, providerContext); throw new RejectedException(getProcessorStatistics(pool), e); } }
如果不设置 就是
public static final int DEFAULT_PROVIDER_COREPOOLSIZE = 60;//coresize
public static final int DEFAULT_PROVIDER_MAXPOOLSIZE = 500; // maxsize
重点看这段逻辑
ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory .selectInvocationHandler(providerContext.getRequest().getMessageType()); if (invocationHandler != null) { providerContext.setThread(Thread.currentThread()); return invocationHandler.handle(providerContext);
又是熟悉的责任链模式,看来作者真的很擅长责任链
ProviderProcessHandlerFactory #
public static void init() { registerBizProcessFilter(new TraceFilter()); if (Constants.MONITOR_ENABLE) { registerBizProcessFilter(new MonitorProcessFilter()); } registerBizProcessFilter(new WriteResponseProcessFilter()); registerBizProcessFilter(new ContextTransferProcessFilter()); registerBizProcessFilter(new ExceptionProcessFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new GatewayProcessFilter()); registerBizProcessFilter(new BusinessProcessFilter()); bizInvocationHandler = createInvocationHandler(bizProcessFilters);
上面这几个filter比较重要的是
1 WriteResponseProcessFilter 负责把结果写回给客户端
2 GatewayProcessFilter 限流相关
3 BusinessProcessFilter 业务执行相关
GatewayProcessFilter 后续讲降级限流会具体展开。
先看看WriteResponseProcessFilter 的原理
WriteResponseProcessFilter# invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)
public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext) throws Throwable { try { ProviderChannel channel = invocationContext.getChannel(); InvocationRequest request = invocationContext.getRequest(); InvocationResponse response = handler.handle(invocationContext);//执行后续的handler,也就是业务handler if (request.getCallType() == Constants.CALLTYPE_REPLY) { invocationContext.getTimeline().add(new TimePoint(TimePhase.P)); channel.write(invocationContext, response);//将执行结果写回到客户端 invocationContext.getTimeline().add(new TimePoint(TimePhase.P)); } if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE) { List<ProviderProcessInterceptor> interceptors = ProviderProcessInterceptorFactory.getInterceptors(); for (ProviderProcessInterceptor interceptor : interceptors) { interceptor.postInvoke(request, response); } List<ProviderInterceptor> contextInterceptors = ProviderInterceptorFactory.getInterceptors(); for (ProviderInterceptor interceptor : contextInterceptors) { interceptor.postInvoke(invocationContext); } } return response; } finally { // ContextUtils.clearContext(); // ContextUtils.clearLocalContext(); } }
BusinessProcessFilter
public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext) throws Throwable { invocationContext.getTimeline().add(new TimePoint(TimePhase.U)); InvocationRequest request = invocationContext.getRequest(); if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE) { // 重置超时时间 if (ConfigManagerLoader.getConfigManager().getBooleanValue(KEY_TIMEOUT_RESET, true) && request.getTimeout() > 0) { ContextUtils.putLocalContext(Constants.REQUEST_TIMEOUT, request.getTimeout()); } // 有特定的定时线程监听调用超时时间,如果超时会进行打断 if (Thread.currentThread().isInterrupted()) { StringBuilder msg = new StringBuilder(); msg.append("the request has been canceled by timeout checking processor:").append(request); throw new RequestAbortedException(msg.toString()); } // 执行自定义拦截器的前置调用 List<ProviderProcessInterceptor> interceptors = ProviderProcessInterceptorFactory.getInterceptors(); for (ProviderProcessInterceptor interceptor : interceptors) { interceptor.preInvoke(request); } List<ProviderInterceptor> contextInterceptors = ProviderInterceptorFactory.getInterceptors(); for (ProviderInterceptor interceptor : contextInterceptors) { interceptor.preInvoke(invocationContext); } InvocationResponse response = null; // 类似于缓存,之前获取过会放入 ServiceMethod method = invocationContext.getServiceMethod(); if (method == null) { // 缓存不存在,重新获取,根据服务名、服务版本、方法名、方法参数等信息找到最佳匹配方法 method = ServiceMethodFactory.getMethod(request); } if (Constants.REPLY_MANUAL) { if (request.getCallType() == Constants.CALLTYPE_REPLY) { request.setCallType(Constants.CALLTYPE_MANUAL); } } ProviderHelper.setContext(invocationContext); invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis())); Object returnObj = null; try { // 反射调用实际业务方法 returnObj = method.invoke(request.getParameters()); } finally { // 清空上下文数据 ProviderHelper.clearContext(); } invocationContext.getTimeline().add(new TimePoint(TimePhase.M, System.currentTimeMillis())); if (request.getCallType() == Constants.CALLTYPE_REPLY) { // 需要响应,传入方法调用返回结果,创建成功响应 response = ProviderUtils.createSuccessResponse(request, returnObj); } return response; } // 非MESSAGE_TYPE_SERVICE走到这一步报错 throw new BadRequestException("message type[" + request.getMessageType() + "] is not supported!"); }