zoukankan      html  css  js  c++  java
  • 微前端架构设计之 WebSocket API 断连后重连的设计方案

    问题

    主框架(基座应用) WebSocket 断连后重连成功,但是 SubApp(子应用) 重新订阅 WebSocket 失败。

    原因

    框架重连成功后会重新订阅 topic,是正常的。但是 SubApp 或者组件生命周期内的 $socket 实例并未更新,其实例的 _connected 属性 false 导致重新订阅 topic失败。

    此处存在的问题:

    1. 不应该直接暴露 this.$socket 实例,因为该实例本身存在内部 dirty state
    2. WebSocket 的连接与重连不应该由调用方判断当前状态,增加了调用方的复杂度
    3. 连接异常、连接断开、连接失败,调用方并不知情
    4. 重连成功后应该重置 this.$socket 实例的引用

    重新设计

    目标

    • 调用方不需要处理 socket 内部的状态,直接连接
    • 连接失败或者断开连接后,调用方将收到错误消息
    • 首次或重新连接成功后,调用方的订阅将被重新发起

    思路

    使用发布订阅(pub-sub)观察者模式重新设计接口。WebSocket 的连接分成几个事件下发给调用方,这样调用发只需要注册各种 listener 函数,达到跟 socket 本身状态完全解耦的目的:

    这里存在的问题是:

    register('connected', listenerFn, failedFn) 可能发生在 $socket 连接成功前 ,也可能发生在 连接成功后

    所以在统一把 listenerFn 放进 queue 里面,如果 $socket.isConnectedtrue 则直接执行 listenerFn,否则给回 pending 状态,待 socket 连接成功后再执行 listenerFn.

    API

    • pub-sub & callback style [推荐]

    无论是首次连接成功,还是重连成功,onMessage callback function 都会被执行。
    无论是首次连接失败,还是重连失败,onError callback function 都会被执行 。

    PS:

    • onMessage 函数在收到服务端推送至客户端的数据时会被执行,若没有数据返回,则不会执行。
    • onMessage 在重连后,会立即被执行。这样做的目的是 立即清除客户端 socket 的异常状态
    const SUBSCRIBE_API = '/broadcast/message/user/${userId}'
    
    this.$socket.subscribe(SUBSCRIBE_API, function onMessage(data) {
    	console.log(data)
    })
    }, function onError(error) {
    	console.log(error)
    })
    
    // 关闭当前订阅
    this.$socket.unsubscribe(SUBSCRIBE_API)
    
    • promise style [不推荐]
      只有首次连接成功,resolve function 才会被执行。
      只有首次连接失败,rejection function 才会被执行 。

    PS:
    Promise 只会在 subscribe 执行的时候触发一次 resolve 或者 reject。因此该方式 __只能用于在订阅成功后立刻收到消息便关闭 __当前 topic 的这种连接。

    export default {
    	created() {
    		const SUBSCRIBE_API = '/broadcast/message/user/${userId}'
    
    		this.$socket.subscribe(SUBSCRIBE_API).then((data) => {
    			// connect success
    			console.log(data)
    		}).catch(error => {
    			// connect failed
    			console.error(error)
    		})
    	},
    
    	beforeDestroy(){
    		// 关闭当前订阅
    		this.$socket.unsubscribe(SUBSCRIBE_API)
    	}
    }
    

    DEMO

    订阅 topic

    this.$socket.subscribe(
          this.SUBSCRIBE_API,
          function handleSignatureMessage(data) {
            if (this.socketErrorMsg) {
              this.socketErrorMsg = null
            }
    
            console.log(data)
          },
          function handleSignatureError(error) {
            this.socketErrorMsg =
              error instanceof Error ? error.message : '' + error
          }
        )
    

    关闭订阅

    // 清空错误
    this.socketErrorMsg = null
    // 关闭订阅
    this.$socket.unsubscribe(this.$SUBSCRIBE_API)
    

    最终代码

    ChannelSubscribe 只实现了 Socket 订阅,使用静态属性 _map存放所有的 ChannelSubscribe 实例,实现前文提到的 listeners queue。通过静态方法 ChannelSubscribe.Run 重启当前 ChannelSubscribe._map 中的所有 _subscribe 方法。而 _subscribe 和 _unsubscribe 方法需要在 SocketChannel 中被具体实现,所以 ChannelSubscribe 的逻辑可以用到其他的 Channel 实现。比如说 XHRLoopChannel 长轮询通道订阅中,当轮询中断后重启所有的 ChannelSubscribe 实例,实现 ChannelSubscribe 跟 Channel 内部的完全解藕。

    • ChannelSubscribe.js:
    /**
     * 将订阅抽离成单独的对象,内聚其本身的属性和行为
     */
    export default class ChannelSubscribe {
      /* 用于保存当前所有已存在的 ChannelSubscribe 实例 */
      /** @type {Map<string, ChannelSubscribe>} ChannelSubscribeMap */
      static _map = new Map()
    
      /**
       * 运行所有的 ChannelSubscribe 实例
       * @param {import('stompjs').Client} client 最新的 Client 对象
       */
      static Run(client) {
        if (ChannelSubscribe._map.size) {
          /** @type {Iterable<ChannelSubscribe>} */
          const subscribes = ChannelSubscribe._map.values()
    
          for (const subscribe of subscribes) {
            subscribe.run(client)
          }
        }
      }
    
      /**
       * 实例化一个通道订阅对象 ChannelSubscribe
       * @param {string} api
       * @param {(data: any) => void} listenerFn
       * @param {(error: Error) => void} [failedFn]
       */
      constructor(api) {
        /* 如果此前已存在该订阅,先直接关闭然后重新订阅 */
        if (ChannelSubscribe._map.has(api)) {
          this._unsubscribe(api)
        }
    
        /* 具体实现和赋值在 this._subscribe 方法内部 */
        /** @type {import('stompjs').Subscription} */
        this._subscription = null
    
        /**
         * run() => 执行实例 _subscribe 方法
         *
         * @param {import('stompjs').Client} client
         * @returns {Promise<any>}
         */
        this.run = (client) => {
          ChannelSubscribe._map.set(api, this)
    
          return this._subscribe(client)
        }
      }
    
      /**
       * interface API 仅定义接口,不做具体实现
       * 具体实现代码在 SocketChannel.subscribe 方法内部
       */
      _subscribe(api) {
        throw new TypeError(
          `Must implements 'ChannelSubscribe._subscribe(${api})' interface.`
        )
      }
    
      /**
       * 取消订阅在内部实现,因为其不依赖任何外部状态
       *
       * @param {string} api
       */
      _unsubscribe(api) {
        if (ChannelSubscribe._map.has(api)) {
          const instance = ChannelSubscribe._map.get(api)
    
          /** @type {import('stompjs').Subscription} */
          const subscription = instance._subscription
          subscription && subscription.unsubscribe()
    
          return ChannelSubscribe._map.delete(api)
        }
      }
    }
    
    • SocketChannel.js:
    import Stomp from 'stompjs'
    import SockJS from 'sockjs-client'
    import ChannelSubscribe from './ChannelSubscribe'
    import './helpers/rewrite-receive-info'
    import { debug } from '@@/utils/index'
    import { handleUserMessage } from './handlers/user-message'
    import { handlePublishVersion } from './handlers/publish-version'
    import {
      WEB_SOCKET_API,
      SUBSCRIBE_PUB_API,
      SUBSCRIBE_USER_API
    } from './config/subscribe-url'
    import { MAX_RETRY_COUNT, RECONNECT_DURATION_TIME } from './config/constants'
    import {
      CONNECT_ERROR,
      LOST_CONNECTION,
      NOT_CONNECTED
    } from './config/error-message'
    
    /* 已重试次数 */
    let retryTimes = 0
    
    export default class SocketChannel {
      /* 私有属性 _client 实例引用对象 */
      /** @type {Stomp.Client|null} */
      static _client = null
    
      /* 私有属性 _error 连接错误实例 */
      /** @type {Error|null} */
      static _error = null
    
      /**
       * 建立 socket 连接
       * @param {number} userId
       */
      static connect(userId = '') {
        if (!userId) {
          throw new TypeError(
            `The userId is required for 'Socket.connect(userId: number)' method not ${typeof userId}!`
          )
        }
    
        const socket = new SockJS(WEB_SOCKET_API)
        // http://jmesnil.net/stomp-websocket/doc/
        const client = Stomp.over(socket)
    
        if (client && client.ws && client.ws instanceof WebSocket) {
          // https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API
          const ws = client.ws
    
          // https://github.com/sockjs/sockjs-client/issues/176#issuecomment-135124313
          ws.onerror((error) => {
            SocketChannel._client = null
            SocketChannel._error = new Error(error)
          })
    
          ws.onclose(() => {
            SocketChannel._client = null
            SocketChannel._error = new Error('WebSocket 已断开连接')
          })
        }
    
        client.connect(
          {},
    
          () => {
            debug(`连接成功`)
    
            // 连接成功
            retryTimes = 0
            SocketChannel._client = client
            SocketChannel._error = null
    
            // 订阅系统内置 topic
            client.subscribe(SUBSCRIBE_PUB_API, handlePublishVersion)
            client.subscribe(
              '/p2p/' + userId + SUBSCRIBE_USER_API,
              handleUserMessage(userId)
            )
    
            // 订阅 Invokers topic
            ChannelSubscribe.Run(client)
          },
    
          (error) => {
            debug(`连接失败`)
    
            // 连接失败
            SocketChannel._client = null
            SocketChannel._error = new Error(error || 'WebSocket 连接失败')
    
            ChannelSubscribe.Run(client)
    
            // 尝试重连 ${MAX_RETRY_COUNT} 次
            if (retryTimes <= MAX_RETRY_COUNT) {
              setTimeout(() => {
                debug(`Socket 接口异常,正在重连第${++retryTimes}次`)
    
                SocketChannel.connect && SocketChannel.connect(userId)
              }, /* 使用 2 的 n 次方代替原先的间隔常量 */ 2 ** retryTimes * RECONNECT_DURATION_TIME)
            }
          }
        )
      }
    
      /**
       * subscribe => SubApp Invoker subscribe API
       * @param {string} api
       * @param {(data: any) => void} listenerFn
       * @param {(error: Error) => void} [failedFn]
       *
       * 此时需要处理两种情况:
       *  1. 此时的 client 还未连接
       *   1.1 放进 _SubscribeMap 等待连接时按顺序执行连接
       *   1.2 连接成功后执行
       *  2. 此时的 client 已经连接,则直接使用 client 开始订阅
       *   2.1 订阅成功 - resolver
       *   2.2 订阅失败 - rejecter
       */
      static subscribe(api, listenerFn, failedFn) {
        const instance = new ChannelSubscribe(api, listenerFn, failedFn)
    
        /* 实现 ChannelSubscribe._subscribe 接口 */
        instance._subscribe = (/** @type {Stomp.Client|null} */ client) => {
          const { _client, _error } = SocketChannel
    
          /** @type {Stomp.Client|null} */
          client = client || _client
    
          return new Promise((resolve, reject) => {
            const onResolve = listenerFn || ((value) => resolve(value))
            const onReject = failedFn || ((error) => reject(error))
    
            // 还未连接
            if (!client) {
              return onReject(new Error(NOT_CONNECTED))
            }
    
            // 连接出现错误
            if (_error) {
              return onReject(new Error(CONNECT_ERROR))
            }
    
            // 已断开连接
            if (client && !client.connected) {
              return onReject(new Error(LOST_CONNECTION))
            }
    
            // 已连接
            if (client && client.connected) {
              /* 赋值 instance._subscription 内部属性 */
              instance._subscription = client.subscribe(api, ({ body } = {}) => {
                onResolve(typeof body === 'string' ? JSON.parse(body) : body)
              })
    
              // 如果之前 ChannelSubscribeMap 中已存在该 api
              // FIXME:即当前接口是重连后发起,需要被立即执行
              // 这样做的目的是清除客户端 socket 连接的异常状态
              if (ChannelSubscribe._map.has(api)) {
                onResolve({})
              }
            }
          })
        }
    
        return instance.run()
      }
    
      /**
       * 取消订阅
       * @param {string} api
       */
      static unsubscribe(api) {
        if (ChannelSubscribe._map.has(api)) {
          const instance = ChannelSubscribe._map.get(api)
    
          return instance._unsubscribe()
        }
      }
    }
    

    END

  • 相关阅读:
    Hibernate之lazy延迟加载(转)
    Hibernate中的一级缓存、二级缓存和懒加载(转)
    Hibernate框架之关联映射入门
    Hibernate框架之入门
    S​Q​L​获​取​当​前​时​间​(​日​期​)
    网页选项卡的应用
    动态获取设置提示框和小箭头的位置
    Jquery实现下拉联动表单
    jquery自己手写表单验证
    鼠标移动到图片上时,显示大图片
  • 原文地址:https://www.cnblogs.com/givingwu/p/14392358.html
Copyright © 2011-2022 走看看