zoukankan      html  css  js  c++  java
  • Dubbo源码解析(五)之registry注册中心

    正文
    注册中心是Dubbo的重要组成部分,主要用于服务的注册与发现,我们可以选择Redis、数据库、Zookeeper作为Dubbo的注册中心,Dubbo推荐用户使用Zookeeper作为注册中心,在provider和consumer的初始化过程中,我们看到了dubbo通过调用RegistryFactory的getRegistry方法来获取注册中心实例,我们就以这个方法作为入口来分析注册中心的相关流程:
    AbstractRegistryFactory:

    public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
    .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
    .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceString();
    // 锁定注册中心的访问过程以确保注册中心的的单个实例
    LOCK.lock();
    try {
    Registry registry = REGISTRIES.get(key); // 缓存
    if (registry != null) {
    return registry;
    }
    registry = createRegistry(url); /* 创建注册中心实例 */
    if (registry == null) {
    throw new IllegalStateException("Can not create registry " + url);
    }
    REGISTRIES.put(key, registry);
    return registry;
    } finally {
    LOCK.unlock();
    }
    }
    ZookeeperRegistryFactory:
    public Registry createRegistry(URL url) {
    /* 创建ZookeeperRegistry实例 */
    return new ZookeeperRegistry(url, zookeeperTransporter);
    }
    ZookeeperRegistry:
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url); /* 调用父类构造方法 */
    if (url.isAnyHost()) {
    throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
    group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    /* 连接zookeeper */
    zkClient = zookeeperTransporter.connect(url);
    // 添加监听器
    zkClient.addStateListener(new StateListener() {
    public void stateChanged(int state) {
    if (state == RECONNECTED) {
    try {
    /* 如果监听到的状态是重连,做恢复操作 */
    recover();
    } catch (Exception e) {
    logger.error(e.getMessage(), e);
    }
    }
    }
    });
    }
    FailbackRegistry:
    public FailbackRegistry(URL url) {
    /* 调用父类构造方法 */
    super(url);
    // 重试间隔,默认5000ms
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
    public void run() {
    try {
    retry(); /* ScheduledExecutorService固定间隔重试 */
    } catch (Throwable t) {
    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
    }
    }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
    AbstractRegistry:
    public AbstractRegistry(URL url) {
    setUrl(url);
    // 是否同步保存文件,默认false
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    // 配置缓存文件,默认在用户根目录/.duboo文件夹下
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
    file = new File(filename);
    if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
    if (!file.getParentFile().mkdirs()) {
    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
    }
    }
    }
    this.file = file;
    loadProperties(); // 将缓存文件加载成为Properties
    notify(url.getBackupUrls()); // 通知更新配置
    }

    这里提到了缓存文件,简单来说它就是用来做容灾的,consumer从注册中心订阅了provider等信息后会缓存到本地文件中,这样当注册中心不可用时,consumer启动时就可以加载这个缓存文件里面的内容与provider进行交互,这样就可以不依赖注册中心,但是无法获取到新的provider变更通知,所以如果provider信息在注册中心不可用这段时间发生了很大变化,那就很可能会出现服务无法调用的情况,在2.5.7(记得是这个版本)版本之前,dubbo有一个bug,如果启动时注册中心连接不上,启动程序会hang住,无法启动,所以在2.5.7版本之前这个文件是没用的,在2.5.7版本进行了修复。下面我们来看FailbackRegistry用ScheduledExecutorService重试什么东西:
    FailbackRegistry:

    protected void retry() {
    // 失败注册的重试
    if (!failedRegistered.isEmpty()) {
    Set<URL> failed = new HashSet<URL>(failedRegistered);
    if (failed.size() > 0) {
    if (logger.isInfoEnabled()) {
    logger.info("Retry register " + failed);
    }
    try {
    for (URL url : failed) {
    try {
    doRegister(url);
    failedRegistered.remove(url);
    } catch (Throwable t) {
    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    } catch (Throwable t) {
    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    // 失败注销的重试
    if (!failedUnregistered.isEmpty()) {
    Set<URL> failed = new HashSet<URL>(failedUnregistered);
    if (failed.size() > 0) {
    if (logger.isInfoEnabled()) {
    logger.info("Retry unregister " + failed);
    }
    try {
    for (URL url : failed) {
    try {
    doUnregister(url);
    failedUnregistered.remove(url);
    } catch (Throwable t) {
    logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    } catch (Throwable t) {
    logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    // 失败订阅的重试
    if (!failedSubscribed.isEmpty()) {
    Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
    if (entry.getValue() == null || entry.getValue().size() == 0) {
    failed.remove(entry.getKey());
    }
    }
    if (failed.size() > 0) {
    if (logger.isInfoEnabled()) {
    logger.info("Retry subscribe " + failed);
    }
    try {
    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
    URL url = entry.getKey();
    Set<NotifyListener> listeners = entry.getValue();
    for (NotifyListener listener : listeners) {
    try {
    doSubscribe(url, listener);
    listeners.remove(listener);
    } catch (Throwable t) {
    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    } catch (Throwable t) {
    logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    // 失败退订的重试
    if (!failedUnsubscribed.isEmpty()) {
    Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
    if (entry.getValue() == null || entry.getValue().size() == 0) {
    failed.remove(entry.getKey());
    }
    }
    if (failed.size() > 0) {
    if (logger.isInfoEnabled()) {
    logger.info("Retry unsubscribe " + failed);
    }
    try {
    for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
    URL url = entry.getKey();
    Set<NotifyListener> listeners = entry.getValue();
    for (NotifyListener listener : listeners) {
    try {
    doUnsubscribe(url, listener);
    listeners.remove(listener);
    } catch (Throwable t) {
    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    } catch (Throwable t) {
    logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    // 失败通知的重试
    if (!failedNotified.isEmpty()) {
    Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
    for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
    if (entry.getValue() == null || entry.getValue().size() == 0) {
    failed.remove(entry.getKey());
    }
    }
    if (failed.size() > 0) {
    if (logger.isInfoEnabled()) {
    logger.info("Retry notify " + failed);
    }
    try {
    for (Map<NotifyListener, List<URL>> values : failed.values()) {
    for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
    try {
    NotifyListener listener = entry.getKey();
    List<URL> urls = entry.getValue();
    listener.notify(urls);
    values.remove(listener);
    } catch (Throwable t) {
    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    } catch (Throwable t) {
    logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
    }
    }
    }
    }

    我们看到,重试就是对注册、订阅等各个失败的操作进行重试,dubbo会在这些动作失败时将失败的记录存入集合或map中,这里会取出这些记录进行重试。下面我们来看zookeeper的链接,可以选择使用ZkClient或curator来连接zookeeper,默认为ZkClient:
    ZkclientZookeeperTransporter:

    public ZookeeperClient connect(URL url) {
    /* 构建ZkclientZookeeperClient */
    return new ZkclientZookeeperClient(url);
    }
    ZkclientZookeeperClient:
    public ZkclientZookeeperClient(URL url) {
    super(url);
    /* 构建ZkClientWrapper,可以在连接超时后自动监控连接的状态 */
    client = new ZkClientWrapper(url.getBackupAddress(), 30000);
    // 添加监听器,用于连接状态变更通知监听器
    client.addListener(new IZkStateListener() {
    public void handleStateChanged(KeeperState state) throws Exception {
    ZkclientZookeeperClient.this.state = state;
    if (state == KeeperState.Disconnected) {
    stateChanged(StateListener.DISCONNECTED);
    } else if (state == KeeperState.SyncConnected) {
    stateChanged(StateListener.CONNECTED);
    }
    }
    public void handleNewSession() throws Exception {
    stateChanged(StateListener.RECONNECTED);
    }
    });
    client.start(); /* 开启线程,连接zookeeper */
    }
    ZkClientWrapper:
    public ZkClientWrapper(final String serverAddr, long timeout) {
    this.timeout = timeout;
    // 创建任务创建ZkClient
    listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
    @Override
    public ZkClient call() throws Exception {
    return new ZkClient(serverAddr, Integer.MAX_VALUE);
    }
    });
    }
    ZkClientWrapper:
    public void start() {
    if (!started) {
    Thread connectThread = new Thread(listenableFutureTask);
    connectThread.setName("DubboZkclientConnector");
    connectThread.setDaemon(true);
    connectThread.start(); // 创建线程执行创建ZkClient任务
    try {
    client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
    } catch (Throwable t) {
    logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
    }
    started = true;
    } else {
    logger.warn("Zkclient has already been started!");
    }
    }
    创建ZkClient之后接下来添加添加了一个连接状态变更监听器,目的是在重连时做恢复操作:
    FailbackRegistry:
    protected void recover() throws Exception {
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
    if (logger.isInfoEnabled()) {
    logger.info("Recover register url " + recoverRegistered);
    }
    for (URL url : recoverRegistered) {
    failedRegistered.add(url); // 将注册相关信息添加到失败注册集合中
    }
    }
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
    if (logger.isInfoEnabled()) {
    logger.info("Recover subscribe url " + recoverSubscribed.keySet());
    }
    for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
    URL url = entry.getKey();
    for (NotifyListener listener : entry.getValue()) {
    // 将订阅相关信息添加到失败订阅map中
    addFailedSubscribed(url, listener);
    }
    }
    }
    }
    我们看到,恢复操作其实就是将注册和订阅的信息保存起来,我们之前看到的重试流程会拉取这些信息进行重试。接下来就是将服务信息注册到注册中心:
    FailbackRegistry:
    public void register(URL url) {
    if (destroyed.get()){
    return;
    }
    super.register(url); // 添加到已注册服务集合中
    failedRegistered.remove(url); // 从失败的注册集合中移除
    failedUnregistered.remove(url); // 从失败的注销集合中移除
    try {
    doRegister(url); /* 发起注册请求 */
    } catch (Exception e) {
    Throwable t = e;
    // 如果启动检测被打开,则直接抛出异常
    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
    && url.getParameter(Constants.CHECK_KEY, true)
    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
    boolean skipFailback = t instanceof SkipFailbackWrapperException;
    if (check || skipFailback) {
    if (skipFailback) {
    t = t.getCause();
    }
    throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
    } else {
    logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
    // 将失败的注册请求记录到失败的列表中,定期重试
    failedRegistered.add(url);
    }
    }
    ZookeeperRegistry:
    protected void doRegister(URL url) {
    try {
    /* 创建zookeeper节点 */
    zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
    throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
    }
    AbstractZookeeperClient:
    public void create(String path, boolean ephemeral) {
    int i = path.lastIndexOf('/');
    if (i > 0) {
    String parentPath = path.substring(0, i);
    if (!checkExists(parentPath)) {
    // 递归创建父节点
    create(parentPath, false);
    }
    }
    if (ephemeral) {
    createEphemeral(path); /* 创建临时节点 */
    } else {
    createPersistent(path); /* 创建持久节点 */
    }
    }
    ZkclientZookeeperClient:
    public void createEphemeral(String path) {
    try {
    /* 创建临时节点 */
    client.createEphemeral(path);
    } catch (ZkNodeExistsException e) {
    }
    }
    ZkClientWrapper:
    public void createEphemeral(String path) {
    Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
    // 调用ZkClient的createEphemeral方法创建临时节点
    client.createEphemeral(path);
    }
    ZkclientZookeeperClient:
    public void createPersistent(String path) {
    try {
    /* 创建持久节点 */
    client.createPersistent(path);
    } catch (ZkNodeExistsException e) {
    }
    }
    ZkClientWrapper:
    public void createPersistent(String path) {
    Assert.notNull(client, new IllegalStateException("Zookeeper is not connected yet!"));
    // 调用ZkClient的createPersistent方法创建临时节点
    client.createPersistent(path, true);
    }
    在注册中心创建完成服务信息节点之后是订阅操作:
    FailbackRegistry:
    public void subscribe(URL url, NotifyListener listener) {
    if (destroyed.get()){
    return;
    }
    super.subscribe(url, listener); // 添加到已订阅的列表
    removeFailedSubscribed(url, listener); // 移除相关失败订阅记录
    try {
    /* 发送订阅请求 */
    doSubscribe(url, listener);
    } catch (Exception e) {
    Throwable t = e;
    List<URL> urls = getCacheUrls(url);
    if (urls != null && urls.size() > 0) {
    notify(url, listener, urls);
    logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
    } else {
    // 如果启动检测被打开,则直接抛出异常
    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
    && url.getParameter(Constants.CHECK_KEY, true);
    boolean skipFailback = t instanceof SkipFailbackWrapperException;
    if (check || skipFailback) {
    if (skipFailback) {
    t = t.getCause();
    }
    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
    } else {
    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
    }
    // 将失败的订阅请求记录到失败的列表中,定期重试
    addFailedSubscribed(url, listener);
    }
    }

    ZookeeperRegistry:
    protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
    // 通配
    if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
    String root = toRootPath();
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners == null) {
    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
    listeners = zkListeners.get(url);
    }
    ChildListener zkListener = listeners.get(listener);
    if (zkListener == null) {
    // 添加子节点变更监听器
    listeners.putIfAbsent(listener, new ChildListener() {
    public void childChanged(String parentPath, List<String> currentChilds) {
    for (String child : currentChilds) {
    child = URL.decode(child);
    if (!anyServices.contains(child)) {
    anyServices.add(child);
    // 递归订阅子节点变更通知
    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
    Constants.CHECK_KEY, String.valueOf(false)), listener);
    }
    }
    }
    });
    zkListener = listeners.get(listener);
    }
    // 创建持久根节点,已存在则忽略
    zkClient.create(root, false);
    /* zookeeper添加子节点变更监听器,返回路径当前的子节点列表 */
    List<String> services = zkClient.addChildListener(root, zkListener);
    if (services != null && services.size() > 0) {
    for (String service : services) {
    service = URL.decode(service);
    anyServices.add(service);
    // 递归订阅
    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
    Constants.CHECK_KEY, String.valueOf(false)), listener);
    }
    }
    } else { // 具体服务
    List<URL> urls = new ArrayList<URL>();
    for (String path : toCategoriesPath(url)) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners == null) {
    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
    listeners = zkListeners.get(url);
    }
    ChildListener zkListener = listeners.get(listener);
    if (zkListener == null) {
    // 添加子节点变更监听器
    listeners.putIfAbsent(listener, new ChildListener() {
    public void childChanged(String parentPath, List<String> currentChilds) {
    /* 变更通知 */
    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
    }
    });
    zkListener = listeners.get(listener);
    }
    // 创建持久节点,已存在则忽略
    zkClient.create(path, false);
    List<String> children = zkClient.addChildListener(path, zkListener);
    if (children != null) {
    urls.addAll(toUrlsWithEmpty(url, path, children));
    }
    }
    /* 订阅之后首先通知一次 */
    notify(url, listener, urls);
    }
    } catch (Throwable e) {
    throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
    }
    FailbackRegistry:
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
    throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
    throw new IllegalArgumentException("notify listener == null");
    }
    try {
    doNotify(url, listener, urls); /* 通知 */
    } catch (Exception t) {
    // 将失败的通知请求记录到失败的列表中,定期重试
    Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
    if (listeners == null) {
    failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
    listeners = failedNotified.get(url);
    }
    listeners.put(listener, urls);
    logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
    }

    FailbackRegistry:
    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    super.notify(url, listener, urls); /* 调用父类通知方法 */
    }
    AbstractRegistry:
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
    throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
    throw new IllegalArgumentException("notify listener == null");
    }
    if ((urls == null || urls.size() == 0)
    && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
    logger.warn("Ignore empty notify urls for subscribe url " + url);
    return;
    }
    if (logger.isInfoEnabled()) {
    logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
    if (UrlUtils.isMatch(url, u)) {
    String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
    List<URL> categoryList = result.get(category);
    if (categoryList == null) {
    categoryList = new ArrayList<URL>();
    result.put(category, categoryList);
    }
    categoryList.add(u);
    }
    }
    if (result.size() == 0) {
    return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
    notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
    categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
    String category = entry.getKey();
    List<URL> categoryList = entry.getValue();
    categoryNotified.put(category, categoryList);
    saveProperties(url); /* 更新本地文件缓存 */
    listener.notify(categoryList); /* 通知监听器 */
    }
    }
    AbstractRegistry:
    private void saveProperties(URL url) {
    if (file == null) {
    return;
    }
    try {
    StringBuilder buf = new StringBuilder();
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified != null) {
    for (List<URL> us : categoryNotified.values()) {
    for (URL u : us) {
    if (buf.length() > 0) {
    buf.append(URL_SEPARATOR);
    }
    buf.append(u.toFullString());
    }
    }
    }
    properties.setProperty(url.getServiceKey(), buf.toString());
    // 增加缓存更新次数计数
    long version = lastCacheChanged.incrementAndGet();
    if (syncSaveFile) {
    doSaveProperties(version); /* 同步刷新文件缓存 */
    } else {
    /* 单线程池异步刷新文件缓存 */
    registryCacheExecutor.execute(new SaveProperties(version));
    }
    } catch (Throwable t) {
    logger.warn(t.getMessage(), t);
    }
    }
    AbstractRegistry:
    public void doSaveProperties(long version) {
    if (version < lastCacheChanged.get()) {
    return;
    }
    if (file == null) {
    return;
    }
    try {
    File lockfile = new File(file.getAbsolutePath() + ".lock"); // 文件锁
    if (!lockfile.exists()) {
    // 锁文件不存在则创建新的锁文件
    lockfile.createNewFile();
    }
    RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
    try {
    FileChannel channel = raf.getChannel();
    try {
    FileLock lock = channel.tryLock();
    if (lock == null) {
    throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
    }
    try {
    if (!file.exists()) {
    // 缓存文件不存在则创建新的缓存文件
    file.createNewFile();
    }
    FileOutputStream outputFile = new FileOutputStream(file);
    try {
    // 保存
    properties.store(outputFile, "Dubbo Registry Cache");
    } finally {
    outputFile.close();
    }
    } finally {
    lock.release();
    }
    } finally {
    channel.close();
    }
    } finally {
    raf.close();
    }
    } catch (Throwable e) {
    if (version < lastCacheChanged.get()) {
    return;
    } else {
    registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
    }
    logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
    }
    }
    更新完本地文件缓存之后,最后就是通知相关监听器:
    RegistryProtocol.OverrideListener:
    public synchronized void notify(List<URL> urls) {
    logger.debug("original override urls: " + urls);
    List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl); // 找到与订阅url相匹配的url集合
    logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
    if (matchedUrls.isEmpty()) {
    return;
    }
    // 转换覆盖url以便在重新引用时使用,每次发送所有规则,url将被重新组合和计算
    List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);
    final Invoker<?> invoker;
    if (originInvoker instanceof InvokerDelegete) {
    invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
    } else {
    invoker = originInvoker;
    }
    // 原invoker url
    URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<?> exporter = bounds.get(key);
    if (exporter == null) {
    logger.warn(new IllegalStateException("error state, exporter should not be null"));
    return;
    }
    URL currentUrl = exporter.getInvoker().getUrl();
    // 与此配置合并,产生新的url
    URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
    if (!currentUrl.equals(newUrl)) {
    // 重新暴露修改后的url的invoker
    RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
    logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
    }
    }
    还有一个Listener是在consumer初始化时注册的RegistryDirectory,我们来看它的notify方法实现:
    RegistryDirectory:
    public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    // 循环确认是哪一种url
    for (URL url : urls) {
    String protocol = url.getProtocol();
    String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
    if (Constants.ROUTERS_CATEGORY.equals(category)
    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
    routerUrls.add(url);
    } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
    configuratorUrls.add(url);
    } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
    invokerUrls.add(url);
    } else {
    logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
    }
    }
    // 配置
    if (configuratorUrls != null && configuratorUrls.size() > 0) {
    // 将重写URL转换为map以供重新引用时使用。每次发送所有规则,url将被重新组装和计算
    this.configurators = toConfigurators(configuratorUrls);
    }
    // 路由
    if (routerUrls != null && routerUrls.size() > 0) {
    List<Router> routers = toRouters(routerUrls);
    if (routers != null) {
    setRouters(routers);
    }
    }
    List<Configurator> localConfigurators = this.configurators;
    // 合并覆盖参数
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
    for (Configurator configurator : localConfigurators) {
    this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
    }
    }
    /* 刷新provider信息 */
    refreshInvoker(invokerUrls);
    }
    RegistryDirectory:
    private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
    && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
    this.forbidden = true; // 禁止访问
    this.methodInvokerMap = null; // method invoker map置为null
    destroyAllInvokers(); // 调用invoker的destory方法关闭所有invoker
    } else {
    this.forbidden = false; // 允许访问
    Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
    if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
    invokerUrls.addAll(this.cachedInvokerUrls);
    } else {
    this.cachedInvokerUrls = new HashSet<URL>();
    // 缓存invoker url,方便比较
    this.cachedInvokerUrls.addAll(invokerUrls);
    }
    if (invokerUrls.size() == 0) {
    return;
    }
    /* 将url翻译为invoker map */
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
    // 更改方法名称以映射invoker map
    Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
    // 如果计算错误,则不进行处理
    if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
    logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
    return;
    }
    // multiGroup为true需要将group相同的invoker合并到一起
    this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
    this.urlInvokerMap = newUrlInvokerMap;
    try {
    // 检查缓存中的invoker是否需要销毁,检查的方式就是判断旧的map中是否存在新的map中不存在的invoker
    // 如果有,则调用destory方法进行销毁
    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
    } catch (Exception e) {
    logger.warn("destroyUnusedInvokers error. ", e);
    }
    }
    }
    RegistryDirectory:
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.size() == 0) {
    return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
    // 如果在引用侧配置协议,则仅选择匹配的协议
    if (queryProtocols != null && queryProtocols.length() > 0) {
    boolean accept = false;
    String[] acceptProtocols = queryProtocols.split(",");
    for (String acceptProtocol : acceptProtocols) {
    if (providerUrl.getProtocol().equals(acceptProtocol)) {
    accept = true;
    break;
    }
    }
    if (!accept) {
    continue;
    }
    }
    if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
    continue;
    }
    if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
    logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
    + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
    continue;
    }
    URL url = mergeUrl(providerUrl); // 合并url参数
    String key = url.toFullString();
    // 判断url是否重复
    if (keys.contains(key)) {
    continue;
    }
    keys.add(key);
    // 缓存的key不与consumer参数合并的URL,无论consumer如何组合参数,如果服务URL更改,则再次引用
    Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
    Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
    // 没有在缓存中,再次引用
    if (invoker == null) {
    try {
    boolean enabled = true;
    if (url.hasParameter(Constants.DISABLED_KEY)) {
    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
    } else {
    enabled = url.getParameter(Constants.ENABLED_KEY, true);
    }
    if (enabled) {
    // 构建新的invoker
    invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
    }
    } catch (Throwable t) {
    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
    }
    if (invoker != null) {
    // 将新的invoker放入缓存中
    newUrlInvokerMap.put(key, invoker);
    }
    } else {
    newUrlInvokerMap.put(key, invoker);
    }
    }
    keys.clear();
    return newUrlInvokerMap;
    }
    到这里,Dubbo注册中心相关操作的源码分析就完成了。


  • 相关阅读:
    五步搞定Android开发环境部署
    centos7安装MongoDB3.4
    java数据结构之三叉链表示的二叉树
    java数据结构之二叉树遍历的非递归实现
    java数据结构之二叉树的定义和递归实现
    java数据结构之树
    java数据结构之递归算法
    java数据结构之(堆)栈
    redis主从复制配置
    Redis 发布订阅
  • 原文地址:https://www.cnblogs.com/lanblogs/p/15262409.html
Copyright © 2011-2022 走看看