java 中的对象,请求和响应都是一个 RemotingCommand 对象。
RemotingCommand
code 请求码,请求码/响应码
CommandCustomHeader 对应的请求头
byte[] 请求体,非必需
如下是一个 RegisterBrokerRequestHeader 对应的 RemotingCommand,RegisterBrokerRequestHeader 对象中的属性会被放入到 extFields 中
// org.apache.rocketmq.remoting.protocol.RemotingCommand#makeCustomHeaderToNet { "code": 103, "extFields": { "brokerId": "0", "bodyCrc32": "546992350", "clusterName": "DefaultCluster", "brokerAddr": "172.18.232.137:10911", "haServerAddr": "172.18.232.137:10912", "compressed": "false", "brokerName": "broker-a" }, "flag": 0, "language": "JAVA", "opaque": 42, "serializeTypeCurrentRPC": "JSON", "version": 275 }
请求响应对
Register Broker request
Register Broker response
对象序列化
header:使用 fastjson 转化为 json,然后 getByte
body:业务自行序列化
RemotingCommand
head length, head data, body data
网络发送
tcp 包
total length, head length, head data, body data
// 编码成 tcp public class NettyEncoder extends MessageToByteEncoder<RemotingCommand>
// 对 tcp 包进行解码 public class NettyDecoder extends LengthFieldBasedFrameDecoder
请求和响应对
/** * This map caches all on-going requests. * opaque 是请求 id,AtomicInteger 持续增加 */ protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256); // 同时有定时任务,扫描过期的请求 NettyRemotingAbstract#scanResponseTable
请求的发送,分为同步/异步/oneway
同步和异步,需要服务端的响应,使用请求 id 来把请求和响应对应起来
NettyRemotingAbstract#invokeAsyncImpl
NettyRemotingAbstract#invokeSyncImpl
异步和 oneway,在极端情况下,会导致 on-going 的请求太多,导致 OOM,因此需要设限,使用的是 Semaphore