zoukankan      html  css  js  c++  java
  • licode服务端总结

    1.系统架构

    image

    (来源:https://github.com/lynckia/licode/issues/335)

    2.nuve模块

    nuve-api-architecture

    (修改:https://blog.csdn.net/u012908515/article/details/53940787)

    app.post('/rooms', roomsResource.createRoom);
    app.get('/rooms', roomsResource.represent);
    
    app.get('/rooms/:room', roomResource.represent);
    app.put('/rooms/:room', roomResource.updateRoom);
    app.patch('/rooms/:room', roomResource.patchRoom);
    app.delete('/rooms/:room', roomResource.deleteRoom);
    
    app.post('/rooms/:room/tokens', tokensResource.create);
    
    app.post('/services', servicesResource.create);
    app.get('/services', servicesResource.represent);
    
    app.get('/services/:service', serviceResource.represent);
    app.delete('/services/:service', serviceResource.deleteService);
    
    app.get('/rooms/:room/users', usersResource.getList);
    
    app.get('/rooms/:room/users/:user', userResource.getUser);
    app.delete('/rooms/:room/users/:user', userResource.deleteUser);

    2.1签名验证

    app.get('*', nuveAuthenticator.authenticate);
    app.post('*', nuveAuthenticator.authenticate);
    app.put('*', nuveAuthenticator.authenticate);
    app.patch('*', nuveAuthenticator.authenticate);
    app.delete('*', nuveAuthenticator.authenticate);

    每一次客户端请求都会进行签名验证。

    2.2cloudHandle.js

    image

    cloudHandler提供了nuve对EC的调用:获取房间中的所有用户,删除房间,删除指定用户;以及EC对nuve的调用:删除token,EC的添加、状态更新,删除以及保活。

    调用方法通过RPCMQ实现,他们共同维护了两个队列,队列A用来传递调用消息,Nuve发布调用消息后,并且会维护一个调用方法和回调函数的字典;EC从队列A中获取调用消息后执行调用方法,并将结果消息push进队列B;Nuve从队列B中获取调用结果消息,并从字典中得到回调函数并执行callback,删除对应字典值。

    2.3服务、房间、用户管理

    image

    1.只有superService才有权限进行service的相关操作,即配置文件中的superService = config.nuve.superserviceID

    2.在room模块中currentService即当前请求的service,会维护一个rooms列表,且创建房间的时候可携带房间的媒体信息:room.mediaConfiguration

    3.Nuve中的user管理仅仅提供查询和删除功能,通过房间id,对EC发出获取当前房间用户信息,或者删除该用户
     
    room {name: '', [p2p: bool], [data: {}], _id: ObjectId}
    service {name: '', key: '', rooms: Array[room], testRoom: room, testToken: token, _id: ObjectId}
    token {host: '', userName: '', room: '', role: '', service: '', creationDate: Date(), [use: int], [p2p: bool], _id: ObjectId}

    2.4总结

         Nuve功能相当于一个负载均衡,它负责入库的这些信息能够更好的辅助这个功能,ch_policies决定了负载均衡算法,默认取EC队列的首位。EC服务的启动并不归Nuve管理,EC创建成功后在Nuve中入库,Nuve仅仅维护列表以及保活,负责EC的分配。

    3.ErizoController

     

     EC是一个service服务,会维护一个房间集合Rooms,创建成功后会在Nuve中进行注册,并且连接amqper

    server.listen(global.config.erizoController.listen_port);
      // eslint-disable-next-line global-require, import/no-extraneous-dependencies
    const io = require('socket.io').listen(server, { log: false });
    
    io.set('transports', ['websocket']);
      const addECToCloudHandler = (attempt) => {
        if (attempt <= 0) {
          log.error('message: addECtoCloudHandler cloudHandler does not respond - fatal');
          process.exit(-1);
          return;
        }
    
        const controller = {
          cloudProvider: global.config.cloudProvider.name,
          ip: publicIP,
          hostname: global.config.erizoController.hostname,
          port: global.config.erizoController.port,
          ssl: global.config.erizoController.ssl,
        };
        nuve.addNewErizoController(controller).then((msg) => {
          log.info('message: succesfully added to cloudHandler');
    
          publicIP = msg.publicIP;
          myId = msg.id;
          myState = 2;
    
          startKeepAlives(myId, publicIP);
          callback('callback');
        }).catch((reason) => {
          if (reason === 'timeout') {
            log.warn('message: addECToCloudHandler cloudHandler does not respond, ' +
                         `attemptsLeft: ${attempt}`);
    
                // We'll try it more!
            setTimeout(() => {
              attempt -= 1;
              addECToCloudHandler(attempt);
            }, 3000);
          } else {
            log.error('message: cannot contact cloudHandler');
          }
        });
      };
      addECToCloudHandler(5);

    连接amqper即RPCMQ,此时会将Nuve对EC方法的调用绑定

    amqper.connect(() => {
      try {
        rooms.on('updated', updateMyState);
        amqper.setPublicRPC(rpcPublic);
    
        addToCloudHandler(() => {
          const rpcID = `erizoController_${myId}`;
          amqper.bind(rpcID, listen);
        });
      } catch (error) {
        log.info(`message: Error in Erizo Controller, ${logger.objectToLog(error)}`);
      }
    });

    在Nuve注册成功后回调会新建一个RPCMQ消息消费队列,同时会对EC服务端口监听“connection”消息,此时会新建房间(若房间已经存在即加入反那个房间),并新建socket连接和client。

    const listen = () => {
      io.sockets.on('connection', (socket) => {
        log.info(`message: socket connected, socketId: ${socket.id}`);
    
        const channel = new Channel(socket, nuve);
    
        channel.on('connected', (token, options, callback) => {
          options = options || {};
          try {
            const room = rooms.getOrCreateRoom(myId, token.room, token.p2p);
            options.singlePC = getSinglePCConfig(options.singlePC);
            const client = room.createClient(channel, token, options);
            log.info(`message: client connected, clientId: ${client.id}, ` +
                `singlePC: ${options.singlePC}`);
            if (!room.p2p && global.config.erizoController.report.session_events) {
              const timeStamp = new Date();
              amqper.broadcast('event', { room: room.id,
                user: client.id,
                type: 'user_connection',
                timestamp: timeStamp.getTime() });
            }
    
            const streamList = [];
            room.streamManager.forEachPublishedStream((stream) => {
              streamList.push(stream.getPublicStream());
            });
    
            callback('success', { streams: streamList,
              id: room.id,
              clientId: client.id,
              singlePC: options.singlePC,
              p2p: room.p2p,
              defaultVideoBW: global.config.erizoController.defaultVideoBW,
              maxVideoBW: global.config.erizoController.maxVideoBW,
              iceServers: global.config.erizoController.iceServers });
          } catch (e) {
            log.warn('message: error creating Room or Client, error:', e);
          }
        });
    
        channel.on('reconnected', (clientId) => {
          rooms.forEachRoom((room) => {
            const client = room.getClientById(clientId);
            if (client !== undefined) {
              client.setNewChannel(channel);
            }
          });
        });
    
        socket.channel = channel;
      });
    };

     client相当于客户端中的用户,它会根据EC新建的channel监听来自客户端的socket消息,客户端的信令消息就是在这里进行处理的。client监听到事件后会通过room.controller(即roomController)中的方法进行具体实现。

    listenToSocketEvents() {
        log.debug(`message: Adding listeners to socket events, client.id: ${this.id}`);
        this.socketEventListeners.set('sendDataStream', this.onSendDataStream.bind(this));
        this.socketEventListeners.set('connectionMessage', this.onConnectionMessage.bind(this));
        this.socketEventListeners.set('streamMessage', this.onStreamMessage.bind(this));
        this.socketEventListeners.set('streamMessageP2P', this.onStreamMessageP2P.bind(this));
        this.socketEventListeners.set('updateStreamAttributes', this.onUpdateStreamAttributes.bind(this));
        this.socketEventListeners.set('publish', this.onPublish.bind(this));
        this.socketEventListeners.set('subscribe', this.onSubscribe.bind(this));
        this.socketEventListeners.set('startRecorder', this.onStartRecorder.bind(this));
        this.socketEventListeners.set('stopRecorder', this.onStopRecorder.bind(this));
        this.socketEventListeners.set('unpublish', this.onUnpublish.bind(this));
        this.socketEventListeners.set('unsubscribe', this.onUnsubscribe.bind(this));
        this.socketEventListeners.set('autoSubscribe', this.onAutoSubscribe.bind(this));
        this.socketEventListeners.set('getStreamStats', this.onGetStreamStats.bind(this));
        this.socketEventListeners.forEach((value, key) => {
          this.channel.socketOn(key, value);
        });
        this.channel.on('disconnect', this.onDisconnect.bind(this));
      }
      stopListeningToSocketEvents() {
        log.debug(`message: Removing listeners to socket events, client.id: ${this.id}`);
        this.socketEventListeners.forEach((value, key) => {
          this.channel.socketRemoveListener(key, value);
        });
      }
    
      disconnect() {
        this.stopListeningToSocketEvents();
        this.channel.disconnect();
      }

     ecCloudHandler负责EC对ErizoAgent的分配(负载均衡),amqper会对绑定到MQ上的所有“ErizoAgent”消息队列定时广播获取新建的ErizoAgent并放入ErizoAgent列表中。同时负责向ErizoAgent申请创建ErizoJS和删除ErizoJS。

    that.getErizoJS = (agentId, internalId, callback) => {
        let agentQueue = 'ErizoAgent';
    
        if (getErizoAgent) {
          agentQueue = getErizoAgent(agents, agentId);
        }
    
        log.info(`message: createErizoJS, agentId: ${agentQueue}`);
    
        amqper.callRpc(agentQueue, 'createErizoJS', [internalId], { callback(resp) {

    roomController中会创建一个ErizoList,用于EC这边erizo的分配以及erizo状态的维护以及保活。同时roomController会通过MQ调用ErizoJS的方法。

    erizoPosition->erizo(undefine)->ecch.getErizoJS->分配Agent->Agent创建erizo
    const erizos = new ErizoList(maxErizosUsedByRoom);
    ..........
    ErizoJS保活 const sendKeepAlive
    = () => { erizos.forEachUniqueErizo((erizo) => { const erizoId = erizo.erizoId; amqper.callRpc(`ErizoJS_${erizoId}`, 'keepAlive', [], { callback: callbackFor(erizoId) }); }); }; setInterval(sendKeepAlive, KEEPALIVE_INTERVAL); ........
    通过ecch申请创建ErizoJS getErizoJS
    = (callback, previousPosition = undefined) => { let agentId; let erizoIdForAgent; const erizoPosition = previousPosition !== undefined ? previousPosition : currentErizo += 1; if (waitForErizoInfoIfPending(erizoPosition, callback)) { return; } const erizo = erizos.get(erizoPosition); if (!erizo.erizoId) { erizos.markAsPending(erizoPosition); } else { agentId = erizo.agentId; erizoIdForAgent = erizo.erizoIdForAgent; } log.debug(`message: Getting ErizoJS, agentId: ${agentId}, ` + `erizoIdForAgent: ${erizoIdForAgent}`); ecch.getErizoJS(agentId, erizoIdForAgent, (gotErizoId, gotAgentId, gotErizoIdForAgent) => { const theErizo = erizos.get(erizoPosition); if (!theErizo.erizoId && gotErizoId !== 'timeout') { erizos.set(erizoPosition, gotErizoId, gotAgentId, gotErizoIdForAgent); } else if (theErizo.erizoId) { theErizo.agentId = gotAgentId; theErizo.erizoIdForAgent = gotErizoIdForAgent; } callback(gotErizoId, gotAgentId, gotErizoIdForAgent); }); };

    总结:

    ErizoController是一个信令服务器,且负责erizo的负载均衡,分配erizoJS应该在哪个Agent上创建,client监听客户端socket连接,通过roomController调用erizo。

    4.ErizoAgent

     

     ErizoAgent是erizoJS的代理,代理启动后会被EC定时扫描到并存入EC的Agent列表中,然后它会被分配给不同的使用者,监听ErizoJS的创建和删除申请,erizoJS的创建就是Agent启动子进程运行erizoJS主程序(父子进程会分离异步)。
    const launchErizoJS = (erizo) => {
      const id = erizo.id;
      log.debug(`message: launching ErizoJS, erizoId: ${id}`);
      let erizoProcess; let out; let
        err;
      const erizoLaunchOptions = ['./../erizoJS/erizoJS.js', id, privateIP, publicIP];
      if (global.config.erizoAgent.launchDebugErizoJS) {
        erizoLaunchOptions.push('-d');
      }
    
      if (global.config.erizoAgent.useIndividualLogFiles) {
        out = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a');
        err = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a');
        erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
          { detached: true, stdio: ['ignore', out, err] });
      } else {
        erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions,
          { detached: true, stdio: ['ignore', 'pipe', 'pipe'] });
        erizoProcess.stdout.setEncoding('utf8');
        erizoProcess.stdout.on('data', (message) => {
          printErizoLogMessage(`[erizo-${id}]`, message.replace(/
    $/, ''));
        });
        erizoProcess.stderr.setEncoding('utf8');
        erizoProcess.stderr.on('data', (message) => {
          printErizoLogMessage(`[erizo-${id}]`, message.replace(/
    $/, ''));
        });
      }
      erizoProcess.unref();
      erizoProcess.on('close', () => {
        log.info(`message: closed, erizoId: ${id}`);
        erizos.delete(id);
    
        if (out !== undefined) {
          fs.close(out, (message) => {
            if (message) {
              log.error('message: error closing log file, ',
                                  `erizoId: ${id}`, 'error:', message);
            }
          });
        }
    
        if (err !== undefined) {
          fs.close(err, (message) => {
            if (message) {
              log.error('message: error closing log file, ',
                                  `erizoId: ${id}`, 'error:', message);
            }
          });
        }
        erizos.fill();
      });
      log.info(`message: launched new ErizoJS, erizoId: ${id}`);
      // eslint-disable-next-line no-param-reassign
      erizo.process = erizoProcess;
    };

    5.ErizoJS

     
     在erizoJS中client依然是与用户一一对应,与EC中的操作是异步的,只有在开始推流即addPublisher的时候才会将对应的client创建,并为对应的stream建立connection,在非singlePC模式下,每一个stream都会与客户端建立一个流连接,每一个输入流是一个Publisher,订阅者是Subscriber,一个client会有多个Publisher,而每个Publisher会有多个Subscriber,每个Publisher都会有一个媒体转发器OneToManyProcessor。底层的C++ erizo实现了 stream connection的建立,以及媒体转发和带宽控制。
     
      /*
       * Adds a publisher to the room. This creates a new OneToManyProcessor
       * and a new WebRtcConnection. This WebRtcConnection will be the publisher
       * of the OneToManyProcessor.
       */
      that.addPublisher = (erizoControllerId, clientId, streamId, options, callbackRpc) => {
        updateUptimeInfo();
        let publisher;
        log.info('addPublisher, clientId', clientId, 'streamId', streamId);
        const client = getOrCreateClient(erizoControllerId, clientId, options.singlePC);
    
        if (publishers[streamId] === undefined) {
          // eslint-disable-next-line no-param-reassign
          options.publicIP = that.publicIP;
          // eslint-disable-next-line no-param-reassign
          options.privateRegexp = that.privateRegexp;
          //新建connection的时候会在C++层建立WebRtcConnection,附带的有媒体设置消息,
          //同时会创建mediaStream,同时维护一个mediaStream字典
          const connection = client.getOrCreateConnection(options);
          log.info('message: Adding publisher, ' +
            `clientId: ${clientId}, ` +
            `streamId: ${streamId}`,
            logger.objectToLog(options),
            logger.objectToLog(options.metadata));
          publisher = new Publisher(clientId, streamId, connection, options);
          publishers[streamId] = publisher;
          publisher.initMediaStream();
          publisher.on('callback', onAdaptSchemeNotify.bind(this, callbackRpc));
          publisher.on('periodic_stats', onPeriodicStats.bind(this, streamId, undefined));
          publisher.promise.then(() => {
            //更新connection状态并初始化,options.createOffer=true就会创建offer,
            //并且在connection状态到达onInitialized/onGathered的时候发送offer
            connection.init(options.createOffer);
          });
          connection.onInitialized.then(() => {
            callbackRpc('callback', { type: 'initializing', connectionId: connection.id });
          });
          connection.onReady.then(() => {
            callbackRpc('callback', { type: 'ready' });
          });
          connection.onStarted.then(() => {
            callbackRpc('callback', { type: 'started' });
          });
          if (options.createOffer) {
            let onEvent;
            if (options.trickleIce) {
              onEvent = connection.onInitialized;
            } else {
              onEvent = connection.onGathered;
            }
            onEvent.then(() => {
              connection.sendOffer();
            });
          }
        } else {
          publisher = publishers[streamId];
          if (publisher.numSubscribers === 0) {
            log.warn('message: publisher already set but no subscribers will ignore, ' +
              `code: ${WARN_CONFLICT}, streamId: ${streamId}`,
              logger.objectToLog(options.metadata));
          } else {
            log.warn('message: publisher already set has subscribers will ignore, ' +
              `code: ${WARN_CONFLICT}, streamId: ${streamId}`);
          }
        }
      };

    6.时序图

    (未完待续)

  • 相关阅读:
    模拟Promise
    js 重写concat
    js 重写 slice
    es6继承
    es5 简单继承
    iterator 和yield的关系
    iterator接口 ...和for of依赖的关键
    e.offsetX,Y到底是相对于谁
    mysql alter修改数据库表结构用法
    mysql修改表结构
  • 原文地址:https://www.cnblogs.com/bloglearning/p/12012632.html
Copyright © 2011-2022 走看看