zoukankan      html  css  js  c++  java
  • 9.2 服务端接收请求消息并发送响应消息源码

    一 总体流程图

    服务端接收请求消息
    NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    -->MultiMessageHandler.received(Channel channel, Object message)
      -->HeartbeatHandler.received(Channel channel, Object message)
        -->AllChannelHandler.received(Channel channel, Object message)
          -->ExecutorService cexecutor = getExecutorService()
          -->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
            -->ChannelEventRunnable.run()
              -->DecodeHandler.received(Channel channel, Object message)
                -->decode(Object message)
                -->HeaderExchangeHandler.received(Channel channel, Object message)
                  -->Response response = handleRequest(exchangeChannel, request)
                    -->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//这里的message就是上边的RpcInvocation
    		  //首先获取exporter,之后再获取invoker
    		  -->getInvoker(Channel channel, Invocation inv)//组装serviceKey=com.alibaba.dubbo.demo.DemoService:20880
    		    -->(DubboExporter<?>) exporterMap.get(serviceKey)//从Map<String, Exporter<?>> exporterMap中根据serviceKey获取DubboExport实例,
    		    -->exporter.getInvoker()//获取RegistryProtocol$InvokerDelegete实例
    		  //执行filter链
    		  -->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
    		    -->ClassLoaderFilter.nvoke(Invoker<?> invoker, Invocation invocation)
    		      -->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
    		        -->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
    			  -->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
    			    -->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
    			      -->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
    			        -->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
    			          //执行真正的invoker调用
    				  -->AbstractProxyInvoker.invoke(Invocation invocation)
    			            -->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
    				      -->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
    					-->DemoServiceImpl.sayHello(String name)
    			            -->new RpcResult(Object result)//将返回值result包装成RpcResult(最后该参数会被包装为Response)
    	      服务端发送响应消息
                  -->channel.send(response)//NettyChannel
                    -->NioAcceptedSocketChannel.write(Object message)//已经是netty的东西了,这里的message=Response实例:最重要的是RpcResult [result=Hello world, response form provider: 10.211.55.2:20880, exception=null]

    二 源码解析

    netty通信是在netty的handler中进行消息的接收处理和发送。来看一下NettyServer的handler。

     1     protected void doOpen() throws Throwable {
     2         ...
     3         final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
     4         ...
     5         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
     6             public ChannelPipeline getPipeline() {
     7                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
     8                 ChannelPipeline pipeline = Channels.pipeline();
     9                 pipeline.addLast("decoder", adapter.getDecoder());
    10                 pipeline.addLast("encoder", adapter.getEncoder());
    11                 pipeline.addLast("handler", nettyHandler);
    12                 return pipeline;
    13             }
    14         });
    15         ...
    16     }

    NettyHandler.messageReceived

     1     private final ChannelHandler handler;//NettyServer
     2 
     3     @Override
     4     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
     5         NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
     6         try {
     7             handler.received(channel, e.getMessage());
     8         } finally {
     9             NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    10         }
    11     }

    首先会执行NettyServer父类AbstractPeer的received方法,其调用MultiMessageHandler.received:

     1     protected ChannelHandler handler;//HeartbeatHandler
     2     public void received(Channel channel, Object message) throws RemotingException {
     3         if (message instanceof MultiMessage) {
     4             MultiMessage list = (MultiMessage) message;
     5             for (Object obj : list) {
     6                 handler.received(channel, obj);
     7             }
     8         } else {
     9             handler.received(channel, message);
    10         }
    11     }

    HeartbeatHandler.received(Channel channel, Object message)

     1     protected ChannelHandler handler;//AllChannelHandler
     2     public void received(Channel channel, Object message) throws RemotingException {
     3         setReadTimestamp(channel);
     4         if (isHeartbeatRequest(message)) {
     5             ...
     6             return;
     7         }
     8         if (isHeartbeatResponse(message)) {
     9            ...
    10             return;
    11         }
    12         handler.received(channel, message);
    13     }

    AllChannelHandler.received(Channel channel, Object message)

     1     protected final ExecutorService executor;//ThreadPoolExecutor
     2     protected final ChannelHandler handler;//DecodeHandler
     3 
     4     public void received(Channel channel, Object message) throws RemotingException {
     5         ExecutorService cexecutor = getExecutorService();
     6         try {
     7             cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
     8         } catch (Throwable t) {
     9             ...
    10             throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    11         }
    12     }
    13 
    14     private ExecutorService getExecutorService() {
    15         ExecutorService cexecutor = executor;
    16         if (cexecutor == null || cexecutor.isShutdown()) {
    17             cexecutor = SHARED_EXECUTOR;
    18         }
    19         return cexecutor;
    20     }

    这里首先创建了一个线程任务ChannelEventRunnable,之后丢入线程池进行执行。

    ChannelEventRunnable.run()

     1     private final ChannelHandler handler;//DecodeHandler
     2     public void run() {
     3         switch (state) {
     4             case CONNECTED:
     5                 ...
     6                 break;
     7             case DISCONNECTED:
     8                 ...
     9                 break;
    10             case SENT:
    11                 ...              
    12                 break;
    13             case RECEIVED:
    14                 try {
    15                     handler.received(channel, message);
    16                 } catch (Exception e) {
    17                     logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
    18                             + ", message is " + message, e);
    19                 }
    20                 break;
    21             case CAUGHT:
    22                 ...
    23                 break;
    24             default:
    25                 logger.warn("unknown state: " + state + ", message is " + message);
    26         }
    27     }

     DecodeHandler.received(Channel channel, Object message)

     1     protected ChannelHandler handler;//HeaderExchangeHandler
     2     public void received(Channel channel, Object message) throws RemotingException {
     3         if (message instanceof Decodeable) {
     4             decode(message);
     5         }
     6 
     7         if (message instanceof Request) {
     8             decode(((Request) message).getData());//解码
     9         }
    10 
    11         if (message instanceof Response) {
    12             decode(((Response) message).getResult());
    13         }
    14 
    15         handler.received(channel, message);
    16     }

    HeaderExchangeHandler.received(Channel channel, Object message)

     1     private final ExchangeHandler handler;//DubboProtocol$ExchangeHandler
     2 
     3     public void received(Channel channel, Object message) throws RemotingException {
     4         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
     5         ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
     6         try {
     7             if (message instanceof Request) {
     8                 // handle request.
     9                 Request request = (Request) message;
    10                 if (request.isEvent()) {
    11                     handlerEvent(channel, request);
    12                 } else {
    13                     if (request.isTwoWay()) {
    14                         Response response = handleRequest(exchangeChannel, request);
    15                         channel.send(response);
    16                     } else {
    17                         handler.received(exchangeChannel, request.getData());
    18                     }
    19                 }
    20             } else if (message instanceof Response) {
    21                 handleResponse(channel, (Response) message);
    22             } else if (message instanceof String) {
    23                 if (isClientSide(channel)) {
    24                     Exception e = new Exception(...);
    25                 } else {
    26                     String echo = handler.telnet(channel, (String) message);
    27                     if (echo != null && echo.length() > 0) {
    28                         channel.send(echo);
    29                     }
    30                 }
    31             } else {
    32                 handler.received(exchangeChannel, message);
    33             }
    34         } finally {
    35             HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    36         }
    37     }
    38 
    39     Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    40         Response res = new Response(req.getId(), req.getVersion());
    41         if (req.isBroken()) {
    42             Object data = req.getData();
    43 
    44             String msg;
    45             if (data == null) msg = null;
    46             else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
    47             else msg = data.toString();
    48             res.setErrorMessage("Fail to decode request due to: " + msg);
    49             res.setStatus(Response.BAD_REQUEST);
    50 
    51             return res;
    52         }
    53         // find handler by message class.
    54         Object msg = req.getData();
    55         try {
    56             // handle data.
    57             Object result = handler.reply(channel, msg);
    58             res.setStatus(Response.OK);
    59             res.setResult(result);
    60         } catch (Throwable e) {
    61             res.setStatus(Response.SERVICE_ERROR);
    62             res.setErrorMessage(StringUtils.toString(e));
    63         }
    64         return res;
    65     }

    DubboProtocol$ExchangeHandler.reply(ExchangeChannel channel, Object message)

    1         public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    2             if (message instanceof Invocation) {
    3                 Invocation inv = (Invocation) message;
    4                 Invoker<?> invoker = getInvoker(channel, inv);
    5                 ...
    6                 return invoker.invoke(inv);
    7             }
    8             throw new RemotingException(...);
    9         }

    首先是获取Invoker,之后使用该invoker执行真正调用。

     1     protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
     2 
     3     Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
     4         ...
     5         int port = channel.getLocalAddress().getPort();//20880
     6         String path = inv.getAttachments().get(Constants.PATH_KEY);
     7         ...
     8         String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
     9 
    10         DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    11 
    12         if (exporter == null)
    13             throw new RemotingException(...);
    14 
    15         return exporter.getInvoker();
    16     }

    这里serviceKey是:com.alibaba.dubbo.demo.DemoService:20880。实际上是group/serviceName:serviceVersion:port。

     1     public static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
     2         StringBuilder buf = new StringBuilder();
     3         if (serviceGroup != null && serviceGroup.length() > 0) {
     4             buf.append(serviceGroup);
     5             buf.append("/");
     6         }
     7         buf.append(serviceName);
     8         if (serviceVersion != null && serviceVersion.length() > 0 && !"0.0.0".equals(serviceVersion)) {
     9             buf.append(":");
    10             buf.append(serviceVersion);
    11         }
    12         buf.append(":");
    13         buf.append(port);
    14         return buf.toString();
    15     }

    Map<String, Exporter<?>> exporterMap在服务暴露时就已经初始化好了。"com.alibaba.dubbo.demo.DemoService:20880"->DubboExporter实例。该实例包含一个呗filter链包裹的Invoker实例:RegistryProtocol$InvokerDelegete实例。

    之后开始执行filter链了,直到最后执行到RegistryProtocol$InvokerDelegete.invoke,该方法实际上是在RegistryProtocol$InvokerDelegete的父类InvokerWrapper执行,InvokerWrapper调用AbstractProxyInvoker.invoke(Invocation invocation)。

     1     private final T proxy;//DemoServiceImpl实例
     2 
     3     public Result invoke(Invocation invocation) throws RpcException {
     4         try {
     5             return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
     6         } catch (InvocationTargetException e) {
     7             return new RpcResult(e.getTargetException());
     8         } catch (Throwable e) {
     9             throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    10         }
    11     }

    这里先调用子类JavassistProxyFactory$AbstractProxyInvoker.doInvoke,之后将返回结果封装为RpcResult返回。

    1 protected Object doInvoke(T proxy, String methodName,
    2                                       Class<?>[] parameterTypes,
    3                                       Object[] arguments) throws Throwable {
    4                 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    5             }

    这里调用了Wrapper类的invokeMethod方法,Wrapper是一个动态生成的类,笔者给出:

     1 import com.alibaba.dubbo.common.bytecode.Wrapper;
     2 import java.util.HashMap;
     3 
     4 public class Wrapper1 extends Wrapper {
     5 
     6     public static String[] pns;//property name array
     7     public static java.util.Map pts = new HashMap();//<property key, property value>
     8     public static String[] mns;//method names
     9     public static String[] dmns;//
    10     public static Class[] mts0;
    11     /**
    12      * @param o  实现类
    13      * @param n  方法名称
    14      * @param p  参数类型
    15      * @param v  参数名称
    16      * @return
    17      * @throws java.lang.reflect.InvocationTargetException
    18      */
    19     public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
    20         com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
    21         try {
    22             w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
    23         } catch (Throwable e) {
    24             throw new IllegalArgumentException(e);
    25         }
    26         try {
    27             if ("sayHello".equals(n) && p.length == 1) {
    28                 return ($w) w.sayHello((java.lang.String) v[0]);
    29             }
    30         } catch (Throwable e) {
    31             throw new java.lang.reflect.InvocationTargetException(e);
    32         }
    33         throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method "" + n + "" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
    34     }
    35 }

    这里距执行到了DemoServiceImpl的sayHello(String name)方法。之后将返回结果封装为RpcResult并返回,一直返回到HeaderExchangeHandler的received(Channel channel, Object message)

     1     public void received(Channel channel, Object message) throws RemotingException {
     2         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
     3         ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
     4         try {
     5             if (message instanceof Request) {
     6                 // handle request.
     7                 Request request = (Request) message;
     8                 if (request.isEvent()) {
     9                     handlerEvent(channel, request);
    10                 } else {
    11                     if (request.isTwoWay()) {
    12                         Response response = handleRequest(exchangeChannel, request);
    13                         channel.send(response);
    14                     } else {
    15                         handler.received(exchangeChannel, request.getData());
    16                     }
    17                 }
    18             } else if (message instanceof Response) {
    19                 handleResponse(channel, (Response) message);
    20             } else if (message instanceof String) {
    21                 if (isClientSide(channel)) {
    22                     Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
    23                     logger.error(e.getMessage(), e);
    24                 } else {
    25                     String echo = handler.telnet(channel, (String) message);
    26                     if (echo != null && echo.length() > 0) {
    27                         channel.send(echo);
    28                     }
    29                 }
    30             } else {
    31                 handler.received(exchangeChannel, message);
    32             }
    33         } finally {
    34             HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    35         }
    36     }

    之后将响应结果返回给客户端,这里的channel是NettyChannel,执行NettyChannel的send方法,其调用NioAcceptedSocketChannel.write(Object message)将消息写会给客户端,结束!

  • 相关阅读:
    【转】主从同步出现一下错误:Slave_IO_Running: Connecting
    解决Mysql的主从数据库没有同步的两种方法
    xargs命令详解,xargs与管道的区别
    常用的排序算法的时间复杂度和空间复杂度
    IP 地址分类(A、B、C、D、E类)
    SNMP 协议介绍 转载
    TCP首部的TimeStamp时间戳选项 转载
    TCP SACK 介绍 转载
    HTTP header 介绍 转载
    CRT小键盘输入乱码
  • 原文地址:https://www.cnblogs.com/java-zhao/p/7821908.html
Copyright © 2011-2022 走看看