zoukankan      html  css  js  c++  java
  • dubbo

    dubbo原理结构

    组件角色:

    Registry注册中心

      通过将服务统一管理起来,可以有效地优化内部应用对服务发布/使用的流程和管理。服务注册中心可以通过特定协议来完成服务对外的统一。Dubbo提供的注册中心有如下几种类型可供选择:

    • Multicast注册中心
    • Zookeeper注册中心
    • Redis注册中心
    • Simple注册中心

    Provider服务提供者:

      提供具体服务的提供方。

    Consumer消费者:

      服务的消费方。

    Monitor监控中心:

      统计服务的调用次调和调用时间的监控中心。
    Container: 服务运行容器。

    具体的框架

    Dubbo缺省协议采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。

    Dubbo缺省协议,使用基于mina1.1.7+hessian3.2.1的tbremoting交互。

    • 连接个数:单连接
    • 连接方式:长连接
    • 传输协议:TCP
    • 传输方式:NIO异步传输
    • 序列化:Hessian二进制序列化
    • 适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
    • 适用场景:常规远程服务方法调用

    主要原理就是消费方通过服务方提供接口的动态代理实现远程调用,动态代理中将接口名称、方法名称、参数、进行打包,将其序列化再通过底层网络通信组件mina/netty发送到服务方。服务方将请求的结果返回给服务端。

    大致步骤如下:

    1. client一个线程调用远程接口,生成一个唯一的ID(比如一段随机字符串,UUID等),Dubbo是使用AtomicLong从0开始累计数字的
    2. 将打包的方法调用信息(如调用的接口名称,方法名称,参数值列表等),和处理结果的回调对象callback,全部封装在一起,组成一个对象object
    3. 向专门存放调用信息的全局ConcurrentHashMap里面put(ID, object)
    4. ID和打包的方法调用信息封装成一对象connRequest,使用IoSession.write(connRequest)异步发送出去
    5. 当前线程再使用callback的get()方法试图获取远程返回的结果,在get()内部,则使用synchronized获取回调对象callback的锁, 再先检测是否已经获取到结果,如果没有,然后调用callback的wait()方法,释放callback上的锁,让当前线程处于等待状态。
    6. 服务端接收到请求并处理后,将结果(此结果中包含了前面的ID,即回传)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到ID,再从前面的ConcurrentHashMap里面get(ID),从而找到callback,将方法调用结果设置到callback对象里。
    7. 监听线程接着使用synchronized获取回调对象callback的锁(因为前面调用过wait(),那个线程已释放callback的锁了),再notifyAll(),唤醒前面处于等待状态的线程继续执行(callback的get()方法继续执行就能拿到调用结果了),至此,整个过程结束。

    思考:

    • 问:当前线程怎么让它“暂停”,等结果回来后,再向后执行?

         答:先生成一个对象obj,在一个全局map里put(ID,obj)存放起来,再用synchronized获取obj锁,再调用obj.wait()让当前线程处于等待状态,然后另一消息监听线程等到服务端结果来了后,再map.get(ID)找到obj,再用synchronized获取obj锁,再调用obj.notifyAll()唤醒前面处于等待状态的线程。

    • 问:正如前面所说,Socket通信是一个全双工的方式,如果有多个线程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是乱七八糟的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个线程调用的?
         答:使用一个ID,让其唯一,然后传递给服务端,再服务端又回传回来,这样就知道结果是原先哪个线程的了。

    关于回调对象

    客户端代码:

     1 //同步调用远程接口
     2 public Object invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException {
     3         byte protocol = getProtocol(control);
     4         if (!TRConstants.isValidProtocol(protocol)) {
     5             throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync.");
     6         }
     7         ResponseFuture future = invokeWithFuture(appRequest, control);
     8         return future.get();  //获取结果时让当前线程等待,ResponseFuture其实就是前面说的callback
     9 }
    10 public ResponseFuture invokeWithFuture(Object appRequest, RequestControl control) {
    11          byte protocol = getProtocol(control);
    12          long timeout = getTimeout(control);
    13          ConnectionRequest request = new ConnectionRequest(appRequest);
    14          request.setSerializeProtocol(protocol);
    15          Callback2FutureAdapter adapter = new Callback2FutureAdapter(request);
    16          connection.sendRequestWithCallback(request, adapter, timeout);
    17          return adapter;
    18 }
     1 Callback2FutureAdapter implements ResponseFuture
     2 public Object get() throws RemotingException, InterruptedException {
     3     synchronized (this) {  // 旋锁
     4         while (!isDone) {  // 是否有结果了
     5             wait(); //没结果是释放锁,让当前线程处于等待状态
     6         }
     7     }
     8     if (errorCode == TRConstants.RESULT_TIMEOUT) {
     9          throw new TimeoutException("Wait response timeout, request["
    10          + connectionRequest.getAppRequest() + "].");
    11     }
    12     else if (errorCode > 0) {
    13         throw new RemotingException(errorMsg);
    14     }
    15     else {
    16          return appResp;
    17     }
    18 }
    19 客户端收到服务端结果后,回调时相关方法,即设置isDone = true并notifyAll()
    20 public void handleResponse(Object _appResponse) {
    21          appResp = _appResponse; //将远程调用结果设置到callback中来
    22          setDone();
    23 }
    24 public void onRemotingException(int _errorType, String _errorMsg) {
    25          errorCode = _errorType;
    26          errorMsg = _errorMsg;
    27          setDone();
    28 }
    29 private void setDone() {
    30          isDone = true;
    31          synchronized (this) { //获取锁,因为前面wait()已经释放了callback的锁了
    32              notifyAll(); // 唤醒处于等待的线程
    33          }
    34 }

    通信部分源码

     1  
     2 // 用来存放请求和回调的MAP
     3 private final ConcurrentHashMap<Long, Object[]> requestResidents;
     4  
     5 //发送消息出去
     6 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) {
     7          long requestId = connRequest.getId();
     8          long waitBegin = System.currentTimeMillis();
     9          long waitEnd = waitBegin + timeoutMs;
    10          Object[] queue = new Object[4];
    11          int idx = 0;
    12          queue[idx++] = waitEnd;
    13          queue[idx++] = waitBegin;   //用于记录日志
    14          queue[idx++] = connRequest; //用于记录日志
    15          queue[idx++] = callback;
    16          requestResidents.put(requestId, queue); // 记录响应队列
    17          write(connRequest);
    18  
    19          // 埋点记录等待响应的Map的大小
    20          StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(),
    21                    1L);
    22 }
    23 public void write(final Object connectionMsg) {
    24 //mina里的IoSession.write()发送消息
    25          WriteFuture writeFuture = ioSession.write(connectionMsg);
    26          // 注册FutureListener,当请求发送失败后,能够立即做出响应
    27          writeFuture.addListener(new MsgWrittenListener(this, connectionMsg));
    28 }
    29  
    30 /**
    31 * 在得到响应后,删除对应的请求队列,并执行回调
    32 * 调用者:MINA线程
    33 */
    34 public void putResponse(final ConnectionResponse connResp) {
    35          final long requestId = connResp.getRequestId();
    36          Object[] queue = requestResidents.remove(requestId);
    37          if (null == queue) {
    38              Object appResp = connResp.getAppResponse();
    39              String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName();
    40              StringBuilder sb = new StringBuilder();
    41              sb.append("Not found response receiver for requestId=[").append(requestId).append("],");
    42              sb.append("from [").append(connResp.getHost()).append("],");
    43              sb.append("response type [").append(appRespClazz).append("].");
    44              LOGGER.warn(sb.toString());
    45              return;
    46          }
    47          int idx = 0;
    48          idx++;
    49          long waitBegin = (Long) queue[idx++];
    50          ConnectionRequest connRequest = (ConnectionRequest) queue[idx++];
    51          ResponseCallback callback = (ResponseCallback) queue[idx++];
    52          // ** 把回调任务交给业务提供的线程池执行 **
    53          Executor callbackExecutor = callback.getExecutor();
    54          callbackExecutor.execute(new CallbackExecutorTask(connResp, callback));
    55  
    56          long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间
    57          logIfResponseError(connResp, duration, connRequest.getAppRequest());
    58 }
  • 相关阅读:
    对websoceket进行压力测试(一)
    学习springboot的一个网站
    重装mysql数据库
    websocket扫盲:基础知识(二)
    json-lib 之jsonConfig详细使用
    hibernate的like用法(用占位符解决)
    【转载】hibernate查询参数绑定
    Struts2 Anotation action
    PLSQL怎样导出oracle表结构
    从命令行启动oracle服务
  • 原文地址:https://www.cnblogs.com/parkin/p/7653068.html
Copyright © 2011-2022 走看看