1.系统架构
(来源:https://github.com/lynckia/licode/issues/335)
2.nuve模块
(修改:https://blog.csdn.net/u012908515/article/details/53940787)
app.post('/rooms', roomsResource.createRoom); app.get('/rooms', roomsResource.represent); app.get('/rooms/:room', roomResource.represent); app.put('/rooms/:room', roomResource.updateRoom); app.patch('/rooms/:room', roomResource.patchRoom); app.delete('/rooms/:room', roomResource.deleteRoom); app.post('/rooms/:room/tokens', tokensResource.create); app.post('/services', servicesResource.create); app.get('/services', servicesResource.represent); app.get('/services/:service', serviceResource.represent); app.delete('/services/:service', serviceResource.deleteService); app.get('/rooms/:room/users', usersResource.getList); app.get('/rooms/:room/users/:user', userResource.getUser); app.delete('/rooms/:room/users/:user', userResource.deleteUser);
2.1签名验证
app.get('*', nuveAuthenticator.authenticate); app.post('*', nuveAuthenticator.authenticate); app.put('*', nuveAuthenticator.authenticate); app.patch('*', nuveAuthenticator.authenticate); app.delete('*', nuveAuthenticator.authenticate);
每一次客户端请求都会进行签名验证。
2.2cloudHandle.js
cloudHandler提供了nuve对EC的调用:获取房间中的所有用户,删除房间,删除指定用户;以及EC对nuve的调用:删除token,EC的添加、状态更新,删除以及保活。
调用方法通过RPCMQ实现,他们共同维护了两个队列,队列A用来传递调用消息,Nuve发布调用消息后,并且会维护一个调用方法和回调函数的字典;EC从队列A中获取调用消息后执行调用方法,并将结果消息push进队列B;Nuve从队列B中获取调用结果消息,并从字典中得到回调函数并执行callback,删除对应字典值。
2.3服务、房间、用户管理
1.只有superService才有权限进行service的相关操作,即配置文件中的superService = config.nuve.superserviceID
2.在room模块中currentService即当前请求的service,会维护一个rooms列表,且创建房间的时候可携带房间的媒体信息:room.mediaConfiguration
room {name: '', [p2p: bool], [data: {}], _id: ObjectId} service {name: '', key: '', rooms: Array[room], testRoom: room, testToken: token, _id: ObjectId} token {host: '', userName: '', room: '', role: '', service: '', creationDate: Date(), [use: int], [p2p: bool], _id: ObjectId}
2.4总结
3.ErizoController
EC是一个service服务,会维护一个房间集合Rooms,创建成功后会在Nuve中进行注册,并且连接amqper
server.listen(global.config.erizoController.listen_port); // eslint-disable-next-line global-require, import/no-extraneous-dependencies const io = require('socket.io').listen(server, { log: false }); io.set('transports', ['websocket']);
const addECToCloudHandler = (attempt) => { if (attempt <= 0) { log.error('message: addECtoCloudHandler cloudHandler does not respond - fatal'); process.exit(-1); return; } const controller = { cloudProvider: global.config.cloudProvider.name, ip: publicIP, hostname: global.config.erizoController.hostname, port: global.config.erizoController.port, ssl: global.config.erizoController.ssl, }; nuve.addNewErizoController(controller).then((msg) => { log.info('message: succesfully added to cloudHandler'); publicIP = msg.publicIP; myId = msg.id; myState = 2; startKeepAlives(myId, publicIP); callback('callback'); }).catch((reason) => { if (reason === 'timeout') { log.warn('message: addECToCloudHandler cloudHandler does not respond, ' + `attemptsLeft: ${attempt}`); // We'll try it more! setTimeout(() => { attempt -= 1; addECToCloudHandler(attempt); }, 3000); } else { log.error('message: cannot contact cloudHandler'); } }); }; addECToCloudHandler(5);
连接amqper即RPCMQ,此时会将Nuve对EC方法的调用绑定
amqper.connect(() => { try { rooms.on('updated', updateMyState); amqper.setPublicRPC(rpcPublic); addToCloudHandler(() => { const rpcID = `erizoController_${myId}`; amqper.bind(rpcID, listen); }); } catch (error) { log.info(`message: Error in Erizo Controller, ${logger.objectToLog(error)}`); } });
在Nuve注册成功后回调会新建一个RPCMQ消息消费队列,同时会对EC服务端口监听“connection”消息,此时会新建房间(若房间已经存在即加入反那个房间),并新建socket连接和client。
const listen = () => { io.sockets.on('connection', (socket) => { log.info(`message: socket connected, socketId: ${socket.id}`); const channel = new Channel(socket, nuve); channel.on('connected', (token, options, callback) => { options = options || {}; try { const room = rooms.getOrCreateRoom(myId, token.room, token.p2p); options.singlePC = getSinglePCConfig(options.singlePC); const client = room.createClient(channel, token, options); log.info(`message: client connected, clientId: ${client.id}, ` + `singlePC: ${options.singlePC}`); if (!room.p2p && global.config.erizoController.report.session_events) { const timeStamp = new Date(); amqper.broadcast('event', { room: room.id, user: client.id, type: 'user_connection', timestamp: timeStamp.getTime() }); } const streamList = []; room.streamManager.forEachPublishedStream((stream) => { streamList.push(stream.getPublicStream()); }); callback('success', { streams: streamList, id: room.id, clientId: client.id, singlePC: options.singlePC, p2p: room.p2p, defaultVideoBW: global.config.erizoController.defaultVideoBW, maxVideoBW: global.config.erizoController.maxVideoBW, iceServers: global.config.erizoController.iceServers }); } catch (e) { log.warn('message: error creating Room or Client, error:', e); } }); channel.on('reconnected', (clientId) => { rooms.forEachRoom((room) => { const client = room.getClientById(clientId); if (client !== undefined) { client.setNewChannel(channel); } }); }); socket.channel = channel; }); };
client相当于客户端中的用户,它会根据EC新建的channel监听来自客户端的socket消息,客户端的信令消息就是在这里进行处理的。client监听到事件后会通过room.controller(即roomController)中的方法进行具体实现。
listenToSocketEvents() { log.debug(`message: Adding listeners to socket events, client.id: ${this.id}`); this.socketEventListeners.set('sendDataStream', this.onSendDataStream.bind(this)); this.socketEventListeners.set('connectionMessage', this.onConnectionMessage.bind(this)); this.socketEventListeners.set('streamMessage', this.onStreamMessage.bind(this)); this.socketEventListeners.set('streamMessageP2P', this.onStreamMessageP2P.bind(this)); this.socketEventListeners.set('updateStreamAttributes', this.onUpdateStreamAttributes.bind(this)); this.socketEventListeners.set('publish', this.onPublish.bind(this)); this.socketEventListeners.set('subscribe', this.onSubscribe.bind(this)); this.socketEventListeners.set('startRecorder', this.onStartRecorder.bind(this)); this.socketEventListeners.set('stopRecorder', this.onStopRecorder.bind(this)); this.socketEventListeners.set('unpublish', this.onUnpublish.bind(this)); this.socketEventListeners.set('unsubscribe', this.onUnsubscribe.bind(this)); this.socketEventListeners.set('autoSubscribe', this.onAutoSubscribe.bind(this)); this.socketEventListeners.set('getStreamStats', this.onGetStreamStats.bind(this)); this.socketEventListeners.forEach((value, key) => { this.channel.socketOn(key, value); }); this.channel.on('disconnect', this.onDisconnect.bind(this)); } stopListeningToSocketEvents() { log.debug(`message: Removing listeners to socket events, client.id: ${this.id}`); this.socketEventListeners.forEach((value, key) => { this.channel.socketRemoveListener(key, value); }); } disconnect() { this.stopListeningToSocketEvents(); this.channel.disconnect(); }
ecCloudHandler负责EC对ErizoAgent的分配(负载均衡),amqper会对绑定到MQ上的所有“ErizoAgent”消息队列定时广播获取新建的ErizoAgent并放入ErizoAgent列表中。同时负责向ErizoAgent申请创建ErizoJS和删除ErizoJS。
that.getErizoJS = (agentId, internalId, callback) => { let agentQueue = 'ErizoAgent'; if (getErizoAgent) { agentQueue = getErizoAgent(agents, agentId); } log.info(`message: createErizoJS, agentId: ${agentQueue}`); amqper.callRpc(agentQueue, 'createErizoJS', [internalId], { callback(resp) {
roomController中会创建一个ErizoList,用于EC这边erizo的分配以及erizo状态的维护以及保活。同时roomController会通过MQ调用ErizoJS的方法。
const erizos = new ErizoList(maxErizosUsedByRoom); ..........
ErizoJS保活 const sendKeepAlive = () => { erizos.forEachUniqueErizo((erizo) => { const erizoId = erizo.erizoId; amqper.callRpc(`ErizoJS_${erizoId}`, 'keepAlive', [], { callback: callbackFor(erizoId) }); }); }; setInterval(sendKeepAlive, KEEPALIVE_INTERVAL); ........
通过ecch申请创建ErizoJS getErizoJS = (callback, previousPosition = undefined) => { let agentId; let erizoIdForAgent; const erizoPosition = previousPosition !== undefined ? previousPosition : currentErizo += 1; if (waitForErizoInfoIfPending(erizoPosition, callback)) { return; } const erizo = erizos.get(erizoPosition); if (!erizo.erizoId) { erizos.markAsPending(erizoPosition); } else { agentId = erizo.agentId; erizoIdForAgent = erizo.erizoIdForAgent; } log.debug(`message: Getting ErizoJS, agentId: ${agentId}, ` + `erizoIdForAgent: ${erizoIdForAgent}`); ecch.getErizoJS(agentId, erizoIdForAgent, (gotErizoId, gotAgentId, gotErizoIdForAgent) => { const theErizo = erizos.get(erizoPosition); if (!theErizo.erizoId && gotErizoId !== 'timeout') { erizos.set(erizoPosition, gotErizoId, gotAgentId, gotErizoIdForAgent); } else if (theErizo.erizoId) { theErizo.agentId = gotAgentId; theErizo.erizoIdForAgent = gotErizoIdForAgent; } callback(gotErizoId, gotAgentId, gotErizoIdForAgent); }); };
总结:
ErizoController是一个信令服务器,且负责erizo的负载均衡,分配erizoJS应该在哪个Agent上创建,client监听客户端socket连接,通过roomController调用erizo。
4.ErizoAgent
const launchErizoJS = (erizo) => { const id = erizo.id; log.debug(`message: launching ErizoJS, erizoId: ${id}`); let erizoProcess; let out; let err; const erizoLaunchOptions = ['./../erizoJS/erizoJS.js', id, privateIP, publicIP]; if (global.config.erizoAgent.launchDebugErizoJS) { erizoLaunchOptions.push('-d'); } if (global.config.erizoAgent.useIndividualLogFiles) { out = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a'); err = fs.openSync(`${global.config.erizoAgent.instanceLogDir}/erizo-${id}.log`, 'a'); erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions, { detached: true, stdio: ['ignore', out, err] }); } else { erizoProcess = spawn(LAUNCH_SCRIPT, erizoLaunchOptions, { detached: true, stdio: ['ignore', 'pipe', 'pipe'] }); erizoProcess.stdout.setEncoding('utf8'); erizoProcess.stdout.on('data', (message) => { printErizoLogMessage(`[erizo-${id}]`, message.replace(/ $/, '')); }); erizoProcess.stderr.setEncoding('utf8'); erizoProcess.stderr.on('data', (message) => { printErizoLogMessage(`[erizo-${id}]`, message.replace(/ $/, '')); }); } erizoProcess.unref(); erizoProcess.on('close', () => { log.info(`message: closed, erizoId: ${id}`); erizos.delete(id); if (out !== undefined) { fs.close(out, (message) => { if (message) { log.error('message: error closing log file, ', `erizoId: ${id}`, 'error:', message); } }); } if (err !== undefined) { fs.close(err, (message) => { if (message) { log.error('message: error closing log file, ', `erizoId: ${id}`, 'error:', message); } }); } erizos.fill(); }); log.info(`message: launched new ErizoJS, erizoId: ${id}`); // eslint-disable-next-line no-param-reassign erizo.process = erizoProcess; };
5.ErizoJS
/* * Adds a publisher to the room. This creates a new OneToManyProcessor * and a new WebRtcConnection. This WebRtcConnection will be the publisher * of the OneToManyProcessor. */ that.addPublisher = (erizoControllerId, clientId, streamId, options, callbackRpc) => { updateUptimeInfo(); let publisher; log.info('addPublisher, clientId', clientId, 'streamId', streamId); const client = getOrCreateClient(erizoControllerId, clientId, options.singlePC); if (publishers[streamId] === undefined) { // eslint-disable-next-line no-param-reassign options.publicIP = that.publicIP; // eslint-disable-next-line no-param-reassign options.privateRegexp = that.privateRegexp; //新建connection的时候会在C++层建立WebRtcConnection,附带的有媒体设置消息, //同时会创建mediaStream,同时维护一个mediaStream字典 const connection = client.getOrCreateConnection(options); log.info('message: Adding publisher, ' + `clientId: ${clientId}, ` + `streamId: ${streamId}`, logger.objectToLog(options), logger.objectToLog(options.metadata)); publisher = new Publisher(clientId, streamId, connection, options); publishers[streamId] = publisher; publisher.initMediaStream(); publisher.on('callback', onAdaptSchemeNotify.bind(this, callbackRpc)); publisher.on('periodic_stats', onPeriodicStats.bind(this, streamId, undefined)); publisher.promise.then(() => { //更新connection状态并初始化,options.createOffer=true就会创建offer, //并且在connection状态到达onInitialized/onGathered的时候发送offer connection.init(options.createOffer); }); connection.onInitialized.then(() => { callbackRpc('callback', { type: 'initializing', connectionId: connection.id }); }); connection.onReady.then(() => { callbackRpc('callback', { type: 'ready' }); }); connection.onStarted.then(() => { callbackRpc('callback', { type: 'started' }); }); if (options.createOffer) { let onEvent; if (options.trickleIce) { onEvent = connection.onInitialized; } else { onEvent = connection.onGathered; } onEvent.then(() => { connection.sendOffer(); }); } } else { publisher = publishers[streamId]; if (publisher.numSubscribers === 0) { log.warn('message: publisher already set but no subscribers will ignore, ' + `code: ${WARN_CONFLICT}, streamId: ${streamId}`, logger.objectToLog(options.metadata)); } else { log.warn('message: publisher already set has subscribers will ignore, ' + `code: ${WARN_CONFLICT}, streamId: ${streamId}`); } } };
6.时序图
(未完待续)