zoukankan      html  css  js  c++  java
  • rocketmq netty底层设计

    rocketmq底层网络使用的netty框架,类图如下

      RecketMQ通信模块的顶层结构是RemotingServer和RemotingClient,分别对应通信的服务端和客户端

    首先看看RemotingServer

     1 public interface RemotingServer extends RemotingService {
     2 
     3     void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
     4         final ExecutorService executor);
     5 
     6     void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
     7 
     8     int localListenPort();
     9 
    10     Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    11 
    12     RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
    13         final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
    14         RemotingTimeoutException;
    15 
    16     void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    17         final InvokeCallback invokeCallback) throws InterruptedException,
    18         RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    19 
    20     void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    21         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
    22         RemotingSendRequestException;
    23 
    24 }
    RemotingServer

      RemotingServer类中比较重要的是:localListenPort、registerProcessor和registerDefaultProcessor,

    registerDefaultProcesor用来设置接收到消息后的处理方法。

      RemotingClient类和RemotingServer类相对应,比较重要的方法是updateNameServerAddressList、

    invokeSync和invokeOneway,updateNameServerAddresList用来获取有效的NameServer地址,invoke-

    Sync与invokeOneway用来向Server端发送请求,如下。

     1 public interface RemotingClient extends RemotingService {
     2 
     3     void updateNameServerAddressList(final List<String> addrs);
     4 
     5     List<String> getNameServerAddressList();
     6 
     7     RemotingCommand invokeSync(final String addr, final RemotingCommand request,
     8         final long timeoutMillis) throws InterruptedException, RemotingConnectException,
     9         RemotingSendRequestException, RemotingTimeoutException;
    10 
    11     void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
    12         final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
    13         RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    14 
    15     void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
    16         throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
    17         RemotingTimeoutException, RemotingSendRequestException;
    18 
    19     void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
    20         final ExecutorService executor);
    21 
    22     void setCallbackExecutor(final ExecutorService callbackExecutor);
    23 
    24     ExecutorService getCallbackExecutor();
    25 
    26     boolean isChannelWritable(final String addr);
    27 }
    RemotingClient

    二、自定义协议

      NettyRemotingServer和NettyRemotingClient分别实现了RemotingServer和RemotingClient这两个接

    口,但它们有很多共有的内容,比如invokeSync、invokeOneway等,所以这些共有函数被提取到NettyRe-

    motingAbstract共同继承的父类中。首先来分析一下在NettyRemotingAbstract中是如何处理接收到的内容

    的,如下。

     1    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
     2         final RemotingCommand cmd = msg;
     3         if (cmd != null) {
     4             switch (cmd.getType()) {
     5                 case REQUEST_COMMAND:
     6                     processRequestCommand(ctx, cmd);
     7                     break;
     8                 case RESPONSE_COMMAND:
     9                     processResponseCommand(ctx, cmd);
    10                     break;
    11                 default:
    12                     break;
    13             }
    14         }
    15     }
     1     public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
     2         final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
     3         final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
     4         final int opaque = cmd.getOpaque();
     5 
     6         if (pair != null) {
     7             Runnable run = new Runnable() {
     8                 @Override
     9                 public void run() {
    10                     try {
    11                         RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
    12                         if (rpcHook != null) {
    13                             rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
    14                         }
    15 
    16                         final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
    17                         if (rpcHook != null) {
    18                             rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
    19                         }
    20 
    21                         if (!cmd.isOnewayRPC()) {
    22                             if (response != null) {
    23                                 response.setOpaque(opaque);
    24                                 response.markResponseType();
    25                                 try {
    26                                     ctx.writeAndFlush(response);
    27                                 } catch (Throwable e) {
    28                                     log.error("process request over, but response failed", e);
    29                                     log.error(cmd.toString());
    30                                     log.error(response.toString());
    31                                 }
    32                             } else {
    33 
    34                             }
    35                         }
    36                     } catch (Throwable e) {
    37                         log.error("process request exception", e);
    38                         log.error(cmd.toString());
    39 
    40                         if (!cmd.isOnewayRPC()) {
    41                             final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
    42                                 RemotingHelper.exceptionSimpleDesc(e));
    43                             response.setOpaque(opaque);
    44                             ctx.writeAndFlush(response);
    45                         }
    46                     }
    47                 }
    48             };
    49 
    50             if (pair.getObject1().rejectRequest()) {
    51                 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
    52                     "[REJECTREQUEST]system busy, start flow control for a while");
    53                 response.setOpaque(opaque);
    54                 ctx.writeAndFlush(response);
    55                 return;
    56             }
    57 
    58             try {
    59                 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
    60                 pair.getObject2().submit(requestTask);
    61             } catch (RejectedExecutionException e) {
    62                 if ((System.currentTimeMillis() % 10000) == 0) {
    63                     log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
    64                         + ", too many requests and system thread pool busy, RejectedExecutionException "
    65                         + pair.getObject2().toString()
    66                         + " request code: " + cmd.getCode());
    67                 }
    68 
    69                 if (!cmd.isOnewayRPC()) {
    70                     final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
    71                         "[OVERLOAD]system busy, start flow control for a while");
    72                     response.setOpaque(opaque);
    73                     ctx.writeAndFlush(response);
    74                 }
    75             }
    76         } else {
    77             String error = " request type " + cmd.getCode() + " not supported";
    78             final RemotingCommand response =
    79                 RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
    80             response.setOpaque(opaque);
    81             ctx.writeAndFlush(response);
    82             log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    83         }
    84     }
    processRequestCommand
     1    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
     2         final int opaque = cmd.getOpaque();
     3         final ResponseFuture responseFuture = responseTable.get(opaque);
     4         if (responseFuture != null) {
     5             responseFuture.setResponseCommand(cmd);
     6 
     7             responseTable.remove(opaque);
     8 
     9             if (responseFuture.getInvokeCallback() != null) {
    10                 executeInvokeCallback(responseFuture);
    11             } else {
    12                 responseFuture.putResponse(cmd);
    13                 responseFuture.release();
    14             }
    15         } else {
    16             log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    17             log.warn(cmd.toString());
    18         }
    19     }
    processResponseCommand

      无论是服务端还是客户端都需要处理接收到的请求,处理方法由processMessageReceived定义,

    注意这里接收到的消息已经被转换成RemotingCommand了,而不是原始的字节流。

      RemotingCommand是RocketMQ自定义的协议,具体格式如下

      这个协议只有四部分,但是覆盖了RocketMQ各个角色间几乎所有的通信过程,RemotingCommand有

    实际的数据类型和各部分对应,如下所示。

     1     private int code;
     2     private LanguageCode language = LanguageCode.JAVA;
     3     private int version = 0;
     4     private int opaque = requestId.getAndIncrement();
     5     private int flag = 0;
     6     private String remark;
     7     private HashMap<String, String> extFields;
     8     private transient CommandCustomHeader customHeader;
     9 
    10     private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
    11 
    12     private transient byte[] body;

      RocketMQ各个组件间的通信需要频繁地在字节码和RemotingCommand间相互转换,也就是编码、

    解码过程,好在Netty提供了codec支持,这个频繁地操作只需要一行设置即可:pipeline().addLoast(new

    NettyEncoder(), now NettyDecoder() )

      RocketMQ对通信过程的另一个抽象是Processor和Executor,当接收到一个消息后,直接根据消息的类

    型调用对应的Processor和Executor,把通信过程和业务逻辑分离开来。通过一个Broker中的代码段来看看

    注册Processor的过程

     1    public void registerProcessor() {
     2         /**
     3          * SendMessageProcessor
     4          */
     5         SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
     6         sendProcessor.registerSendMessageHook(sendMessageHookList);
     7         sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
     8 
     9         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    10         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    11         this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    12         this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    13         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    14         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    15         this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    16         this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    17         /**
    18          * PullMessageProcessor
    19          */
    20         this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    21         this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    22 
    23         /**
    24          * QueryMessageProcessor
    25          */
    26         NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    27         this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    28         this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    29 
    30         this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    31         this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    32 
    33         /**
    34          * ClientManageProcessor
    35          */
    36         ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    37         this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    38         this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    39         this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    40 
    41         this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    42         this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    43         this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    44 
    45         /**
    46          * ConsumerManageProcessor
    47          */
    48         ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    49         this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    50         this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    51         this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    52 
    53         this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    54         this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    55         this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    56 
    57         /**
    58          * EndTransactionProcessor
    59          */
    60         this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
    61         this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
    62 
    63         /**
    64          * Default
    65          */
    66         AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    67         this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    68         this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    69     }
    BrokerController
  • 相关阅读:
    Unstar A File:
    star 一个文件
    列出大星号文件
    删除用户
    curl -d
    创建用户
    Check Account Info
    List Accounts
    认证ping
    python 解接口返回的json字符串
  • 原文地址:https://www.cnblogs.com/toUpdating/p/10055801.html
Copyright © 2011-2022 走看看