ContextFilter
/**
 * ContextInvokerFilter.
 *
 * <p>Provider-side filter that fills {@link RpcContext} with the current invoker, invocation and
 * request attachments before delegating, and resets the context once the call has finished.
 * It is activated for the provider group with order {@code -10000}; the original author notes it
 * is meant to be the first filter in the provider chain.
 */
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {

    /**
     * Populates the RPC context, delegates to {@code invoker} and copies the server-context
     * attachments onto the result.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the incoming invocation
     * @return the result produced by {@code invoker}; when it is an {@link RpcResult} the
     *         attachments of {@code RpcContext.getServerContext()} are added to it
     * @throws RpcException propagated from the delegated invocation
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            // Work on a copy so the invocation's own map is left untouched, and strip the
            // common implicit parameters that should not be exposed through the context.
            attachments = new HashMap<String, String>(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            // Remove async property to avoid being passed to the following invoke chain.
            attachments.remove(Constants.ASYNC_KEY);
        }

        // Record invoker, invocation and local address on the current RpcContext.
        // Attachments are deliberately not set here; they are merged below (merged from dubbox).
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

        // merged from dubbox
        // we may already added some attachments into RpcContext before this filter
        // (e.g. in rest protocol, or in a custom filter that runs earlier), so merge into the
        // existing map instead of replacing it.
        if (attachments != null) {
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }

        // Let the RpcInvocation know which invoker is handling it.
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        try {
            Result result = invoker.invoke(invocation);
            // Pass server-context attachments to the result. Only RpcResult offers
            // addAttachments; any other Result implementation is returned unchanged rather
            // than failing with a ClassCastException.
            if (result instanceof RpcResult) {
                ((RpcResult) result).addAttachments(RpcContext.getServerContext().getAttachments());
            }
            return result;
        } finally {
            // Reset the context after every call; per the original author's note the context is
            // bound to the (pooled) thread, so it must not survive into the next request.
            RpcContext.removeContext();
            RpcContext.getServerContext().clearAttachments();
        }
    }
}
ConsumerContextFilter
/**
 * ConsumerContextInvokerFilter.
 *
 * <p>Consumer-side filter that records the invoker, invocation and local/remote addresses on
 * {@link RpcContext} before delegating, and exposes the attachments returned with the result
 * through {@code RpcContext.getServerContext()}. It is activated for the consumer group with
 * order {@code -10000}.
 */
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    /**
     * Populates the RPC context, delegates to {@code invoker} and stores the result's attachments
     * on the server context.
     *
     * @param invoker    the next invoker in the chain
     * @param invocation the outgoing invocation
     * @return the result produced by {@code invoker}, unchanged
     * @throws RpcException propagated from the delegated invocation
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Record call information on the context; the original author notes it is only valid
        // within the current thread.
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

        // Let the RpcInvocation know which invoker is handling it.
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }

        try {
            // No downcast to RpcResult: only getAttachments() is needed, which the Result
            // interface already provides, so other Result implementations work as well.
            Result result = invoker.invoke(invocation);
            RpcContext.getServerContext().setAttachments(result.getAttachments());
            return result;
        } finally {
            // Clear the attachments set for this call so they are not carried into the next one.
            RpcContext.getContext().clearAttachments();
        }
    }
}