zoukankan      html  css  js  c++  java
  • Dubbo源码解析(六)之provider调用篇

    在之前的源码分析文章中,我们看到了dubbo用netty作为底层的网络通信框架,熟悉netty的同学应该知道,使用netty时我们会使用它的各种Handler作为处理一些网络事件的处理器,在开启netty服务时,dubbo添加了NettyHandler作为处理器,pipeline.addLast("handler", nettyHandler);,我们在Dubbo源码解析之provider初始化这篇文章中详细的介绍了这个过程,同样也详细说明了handler的整个包装过程,我们就以NettyHandler作为入口,来分析dubbo的服务调用的过程。当然在NettyHandler调用之前,请求首先会经过编码器和解码器进行编码和解码,编解码的过程我们会用单独的文章进行分析。NettyHandler继承了netty的SimpleChannelHandler,通过覆盖SimpleChannelHandler的相关方法来处理相关事件,provider在收到服务调用请求时会触发messageReceived事件:

    NettyHandler:

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    // 连接成功后添加netty的Channel和dubbo的NettyChannel之间的映射关系
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
    handler.received(channel, e.getMessage());
    } finally {
    // 如果连接断开,移除netty的Channel和dubbo的NettyChannel之间的映射关系
    NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
    }

    方法里我们看到messageReceived事件委托给了内部维护的另外一个handler(ChannelHandler类型)对象,NettyServer和NettyClient在构造NettyHandler时都传入了this作为handler,它们都实现了ChannelHandler,我们首先来看NettyServer的相关实现逻辑:
    AbstractPeer:

    public void received(Channel ch, Object msg) throws RemotingException {
    if (closed) {
    return;
    }
    /* 服务调用请求处理 */
    handler.received(ch, msg);
    }

    这里同样把连接事件委托给内部维护的一个handler(ChannelHandler类型)对象来处理,这里的handler是在构造NettyServer时传入的,我们在Dubbo源码解析之provider初始化这篇文章中看到了handler的包装过程,每次包装都有不同的功能,我们来逐个分析:
    MultiMessageHandler:

    public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
    MultiMessage list = (MultiMessage) message;
    for (Object obj: list) {
    handler.received(channel, obj);
    }
    } else {
    handler.received(channel, message);
    }
    }
    MultiMessageHandler很好理解,就是为多条的这种消息做一个遍历。
    HeartbeatHandler:
    public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    // 心跳请求消息
    if (isHeartbeatRequest(message)) {
    Request req = (Request) message;
    if (req.isTwoWay()) {
    Response res = new Response(req.getId(), req.getVersion());
    res.setEvent(Response.HEARTBEAT_EVENT);
    // 发送心跳影响事件消息
    channel.send(res);
    if (logger.isInfoEnabled()) {
    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    if (logger.isDebugEnabled()) {
    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() +
    ", cause: The channel has no data-transmission exceeds a heartbeat period" +
    (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
    }
    }
    }
    return;
    }
    // 心跳响应消息
    if (isHeartbeatResponse(message)) {
    if (logger.isDebugEnabled()) {
    logger.debug(
    new StringBuilder(32)
    .append("Receive heartbeat response in thread ")
    .append(Thread.currentThread().getName())
    .toString());
    }
    return;
    }
    handler.received(channel, message);
    }
    HeartbeatHandler也很简单,就是对心跳消息的处理。
    AllChannelHandler:
    public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
    /* 封装任务提交到共享线程池执行 */
    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
    // 拒绝执行的请求处理
    if (message instanceof Request && t instanceof RejectedExecutionException) {
    Request request = (Request) message;
    if (request.isTwoWay()) {
    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
    Response response = new Response(request.getId(), request.getVersion());
    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
    response.setErrorMessage(msg);
    channel.send(response);
    return;
    }
    }
    throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
    }
    ChannelEventRunnable实现了Runnable,我们来看run方法的实现:
    public void run() {
    switch (state) {
    case CONNECTED:
    try {
    handler.connected(channel);
    } catch (Exception e) {
    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
    }
    break;
    case DISCONNECTED:
    try {
    handler.disconnected(channel);
    } catch (Exception e) {
    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
    }
    break;
    case SENT:
    try {
    handler.sent(channel, message);
    } catch (Exception e) {
    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel +
    ", message is " + message, e);
    }
    break;
    case RECEIVED:
    try {
    handler.received(channel, message);
    } catch (Exception e) {
    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel +
    ", message is " + message, e);
    }
    break;
    case CAUGHT:
    try {
    handler.caught(channel, exception);
    } catch (Exception e) {
    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel +
    ", message is: " + message + ", exception is " + exception, e);
    }
    break;
    default:
    logger.warn("unknown state: " + state + ", message is " + message);
    }
    }
    很简单,就是根据不同的事件类型做不同的处理,我们跟着received方法继续分析。接下来请求消息会经过DecodeHandler进行解码,这部分内容我们会在Dubbo编解码的相关文章中进行分析,接下来消息会经过HeaderExchangeHandler:
    public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
    if (message instanceof Request) {
    // 处理请求
    Request request = (Request) message;
    if (request.isEvent()) {
    // 处理事件,判断是否是只读请求并设置只读标记
    handlerEvent(channel, request);
    } else {
    if (request.isTwoWay()) {
    /* 处理请求,获得响应 */
    Response response = handleRequest(exchangeChannel, request);
    /* 回复响应 */
    channel.send(response);
    } else {
    /* 无需回复的请求处理 */
    handler.received(exchangeChannel, request.getData());
    }
    }
    } else if (message instanceof Response) {
    /* 响应消息处理 */
    handleResponse(channel, (Response) message);
    } else if (message instanceof String) {
    if (isClientSide(channel)) {
    // dubbo客户端不支持String类型的消息
    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
    logger.error(e.getMessage(), e);
    } else {
    // 命令响应
    String echo = handler.telnet(channel, (String) message);
    if (echo != null && echo.length() > 0) {
    channel.send(echo);
    }
    }
    } else {
    handler.received(exchangeChannel, message);
    }
    } finally {
    HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
    }

    HeaderExchangeHandler:
    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // 解码失败处理
    if (req.isBroken()) {
    Object data = req.getData();
    String msg;
    if (data == null) msg = null;
    else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
    else msg = data.toString();
    res.setErrorMessage("Fail to decode request due to: " + msg);
    res.setStatus(Response.BAD_REQUEST);
    return res;
    }
    Object msg = req.getData();
    try {
    /* 请求处理应答 */
    Object result = handler.reply(channel, msg);
    res.setStatus(Response.OK);
    res.setResult(result);
    } catch (Throwable e) {
    res.setStatus(Response.SERVICE_ERROR);
    res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
    }
    这里的handler,是由DubboProtocol维护的requestHandler对象,它是ExchangeHandlerAdapter的匿名内部类对象,我们来看相关实现:
    DubboProtocol.requestHandler
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
    Invocation inv = (Invocation) message;
    /* 匹配Invoker */
    Invoker<?> invoker = getInvoker(channel, inv);
    // 如果它是回调,则需要考虑向后兼容性
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
    String methodsStr = invoker.getUrl().getParameters().get("methods");
    boolean hasMethod = false;
    // 搜索是否有相关方法的存在
    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
    hasMethod = inv.getMethodName().equals(methodsStr);
    } else {
    String[] methods = methodsStr.split(",");
    for (String method : methods) {
    if (inv.getMethodName().equals(method)) {
    hasMethod = true;
    break;
    }
    }
    }
    if (!hasMethod) {
    // 没有搜索到相关方法直接返回null
    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
    return null;
    }
    }
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    /* Invoker调用 */
    return invoker.invoke(inv);
    }
    throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }
    DubboProtocol:
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);
    isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
    if (isStubServiceInvoke) {
    port = channel.getRemoteAddress().getPort();
    }
    // 回调
    isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
    if (isCallBackServiceInvoke) {
    path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
    inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
    }
    // 格式(分组/接口全称:服务版本:端口),接口和端口是一定存在的,分组和服务版本不一定
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    // 获取Exporter
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    if (exporter == null)
    throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
    // 返回Invoker
    return exporter.getInvoker();
    }

    方法通过exporterMap获取Exporter,exporterMap我们在provider暴露篇中分析过为其添加serviceKey和Exporter映射的过程。

    通过Exporter获取到Invoker后,接下来就是Invoker的调用,同样在provider暴露篇中,我们也看到了在Protocol的包装类ProtocolFilterWrapper中,对Invoker进行了包装,目的是在Invoker调用时先调用一些拦截器进行过滤,具体有哪些拦截器我们在provider暴露篇中也进行了介绍,这里不再赘述。

    经过了拦截器之后,接下来会分别调用RegistryProtocol.InvokerDelegete和DelegateProviderMetaDataInvoker的invoke方法,同样的也是在provider暴露过程中,使用两者对Invoker进行了包装,两者的invoke方法都是直接调用Invoker的invoke方法,并没有什么特殊的处理。

    最后会调用JavassistProxyFactory生成的Wrapper包装类的invokeMethod方法,来真正调用我们提供的服务并获取返回结果,这整个的过程我们在provider暴露篇中都有提及,接下来当然就是将返回结果发送给调用方:
    AbstractPeer:

    public void send(Object message) throws RemotingException {
    /* 发送消息 */
    send(message, url.getParameter(Constants.SENT_KEY, false));
    }
    NettyChannel:
    public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent); // 校验channel是否已经关闭
    boolean success = true;
    int timeout = 0;
    try {
    // 调用Netty的Channel的write方法将消息写入缓冲区,发送给调用方
    ChannelFuture future = channel.write(message);
    if (sent) {
    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    success = future.await(timeout);
    }
    Throwable cause = future.getCause();
    if (cause != null) {
    throw cause;
    }
    } catch (Throwable e) {
    throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
    throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() +
    "in timeout(" + timeout + "ms) limit");
    }
    }
    这样整个接受请求、调用服务、返回响应的过程就分析完了,如果request的isTwoway是false,这个时候是不需要返回响应的,会直接调用received方法接受并处理请求:
    DubboProtocol.requestHandler:
    public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
    // 请求处理应答
    reply((ExchangeChannel) channel, message);
    } else {
    super.received(channel, message);
    }
    }
    到这里,provider的调用流程就分析完了。
    
    
     
    
    
  • 相关阅读:
    python os.path.dirname()
    python os.path.basename()方法
    python mmap对象
    python 读取二进制数据到可变缓冲区中
    sklearn常见分类器的效果比较
    用matplotlib获取雅虎股票数据并作图
    使用 lxml 中的 xpath 高效提取文本与标签属性值
    如何用 Python 爬取需要登录的网站
    python 线程及线程池
    使用Python代码处理Excel
  • 原文地址:https://www.cnblogs.com/lanblogs/p/15262240.html
Copyright © 2011-2022 走看看