zoukankan      html  css  js  c++  java
  • Soul 学习笔记---数据同步 websocket 连接建立过程分析(五)

    上一篇讲到 soul 是如何使用 websocket 进行数据同步的,今天来分析下,websocket 连接是什么时候建立的。

    上一篇也讲到,启动 soul-admin 时,因为 yml 配置了使用 websocket 进行同步,会加载这三个类,WebsocketCollector 这个类就是开启一个 websocket 服务。

    soul-bootstrappom 文件引入了这个 starter

    <!--soul data sync start use websocket-->
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
        <version>${project.version}</version>
    </dependency>
    

    启动 soul-bootstrap 时,就会去寻找 soul-spring-boot-starter-sync-data-websocketresources/META-INF/spring.factories 文件,然后根据文件中配置去加载指定模块。

    //spring.factories 文件内容
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=
    org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
    

    这个文件配置的就是 WebsocketSyncDataConfiguration 类。

    看到这个类的代码,我先去查了下 ObjectProvider 相关知识。

    spring4.3 之前,我们需要在一个类 A 里注入另一个类 B 时,会使用 @Autowired 注解,不加程序会报异常。4.3 后,引入了一个新特性,我们只需要在类 A 加一个构造函数,B 作为构造函数的参数传进来,就可以不加 @Autowired,但 B 必须要在 spring 容器中,否则会出现异常,此时我们就需要 引入 ObjectProvider

    //4.3 之前
    @Service
    public class A {
        private final B b;
        @Autowired
        public A (B b) {
            this.b = b
        }
    }
    //4.3 之后,不需要加 @Autowired 注解,但如果 B 不在 spring 容器,会报异常
    @Service
    public class A {
        private final B b;
        public A (B b) {
            this.b = b
        }
    }
    //引入 ObjectProvider
    @Service
    public class A {
        private final B b;
        public A(ObjectProvider<B> bProvider) {
        	//如果不可用或不唯一(没有指定primary)则返回null。否则,返回对象。
            this.b = bProvider.getIfUnique();
        }
    }
    

    在下面这个类里,使用的是 ObjectProvidergetIfAvailable(Supplier<T> defaultSupplier) 方法,这个方法是说 如果对象存在直接返回,对象不存在,就进行回调,回调对象由 defaultSupplier 提供。

    @Configuration
    @ConditionalOnClass(WebsocketSyncDataService.class)
    @ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
    @Slf4j
    public class WebsocketSyncDataConfiguration {
    
        /**
         * Websocket sync data service.
         */
        @Bean
        public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                               final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use websocket sync soul data.......");
            return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
        }
    
        /**
         * Config websocket config.
         *
         * @return the websocket config
         */
        @Bean
        @ConfigurationProperties(prefix = "soul.sync.websocket")
        public WebsocketConfig websocketConfig() {
            return new WebsocketConfig();
        }
    }
    

    WebsocketSyncDataConfiguration 这个类的加载依赖于 soul.sync.websocket,而我们在 soul-bootstrap 配置如下,所以会加载这个类。

    可以看到这个类创建了一个 WebsocketSyncDataService 对象,就是在这个 service,创建了一个 webSocket 客户端,和我们在 soul-admin 创建的 websocket 服务建立了连接,这里还有一个线程池 ScheduledThreadPoolExecutor,如果客户端连接关闭,会定时尝试重新连接。

        public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                        final PluginDataSubscriber pluginDataSubscriber,
                                        final List<MetaDataSubscriber> metaDataSubscribers,
                                        final List<AuthDataSubscriber> authDataSubscribers) {
            String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
            executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
            for (String url : urls) {
                try {
                    clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
                } catch (URISyntaxException e) {
                    log.error("websocket url({}) is error", url, e);
                }
            }
            try {
                for (WebSocketClient client : clients) {
                    boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                    if (success) {
                        log.info("websocket connection is successful.....");
                    } else {
                        log.error("websocket connection is error.....");
                    }
                    executor.scheduleAtFixedRate(() -> {
                        try {
                            if (client.isClosed()) {
                                boolean reconnectSuccess = client.reconnectBlocking();
                                if (reconnectSuccess) {
                                    log.info("websocket reconnect is successful.....");
                                } else {
                                    log.error("websocket reconnection is error.....");
                                }
                            }
                        } catch (InterruptedException e) {
                            log.error("websocket connect is error :{}", e.getMessage());
                        }
                    }, 10, 30, TimeUnit.SECONDS);
                }
                /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
            } catch (InterruptedException e) {
                log.info("websocket connection...exception....", e);
            }
    
        }
    

    因此 soul-adminsoul-bootstrap 项目启动完成,websocket 连接也建立成功了,接下来有数据变动,soul-admin 服务端就会向客户端发送消息,数据就及时的同步到内存中。

    参考资料

    spring ObjectProvider 源码分析

  • 相关阅读:
    QtCreator无法编辑源文件
    【Newtonsoft.Json】自己实现JsonConverter ,精简返回的数据结果
    Ghostscript 将PDF文件转换成PNG图片 问题一二
    Nginx--面试基础必会
    Nginx日志配置
    Nginx缓存原理及机制
    Nginx限流
    Nginx 实现 Rewrite 跳转
    Nginx正确配置Location
    渐进深入理解Nginx
  • 原文地址:https://www.cnblogs.com/fightingting/p/14300409.html
Copyright © 2011-2022 走看看