zoukankan      html  css  js  c++  java
  • WebRTC进阶流媒体服务器开发(三)Mediasoup源码分析之应用层(代码组成、Server.js、Room.js)

    一:Mediasoup Demo分析

    了解Mediasoup运行机制,以及如何调用Mediasoup核心库

    (一)Mediasoup Demo组成

    其中mediasoup-demo为整个代码框架:(包含所有)

    app应用:提供客户端所需要的应用代码

    broadcasters:用于广播使用,用于推流的模块。单向传输,只有去或者只有回

    server端:信令服务和媒体流服务,两者通过管道通信。细分为下面几部分:

    --->config.js:配置文件/js文件,通过js获取一些基本信息。将配置信息交给servere.js使用

    --->server.js:从config.js中去获取基本信息,获取信息之后去启动基本服务,比如wensocket服务、信令服务

    --->lib:server.js使用的库文件,细分为以下几部分

    ------->Room.js:所有的真正的信令处理逻辑都是在这里实现,还描述了房间相关信息

    ------->interactiveClient.js:运行时内部信息查询客户端,与客户端交互(debug使用)

    ------->interactiveServer.js:运行时内部信息查询服务端,与服务端交换(debug使用)

    mediasoup C++:C++部分,用于处理流媒体传输,包括lib与worker两部分

    --->lib:一些js文件组成,主要用于对mediasoup的管理工作

    --->worker:C++核心代码

    二:Server.js分析

    (一)配置环境,从config.js获取参数

    process.title = 'mediasoup-demo-server';                            //启动进程之后,进程的名字
    process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*';    //Debug环境变量
    
    /*引入config.js,内部定义了一些参数:
    https(证书位置,监听IP、端口)、
    mediasoup(CPU个数,worker进程参数定义如日志级别、端口范围,router:room的概念在C++表示定义了音视频编解码参数)
    webRtcTransport传输(IP地址、端口、输入输出码率)
    */
    const config = require('./config');                                    
    
    /* eslint-disable no-console */
    console.log('process.env.DEBUG:', process.env.DEBUG);
    console.log('config.js:
    %s', JSON.stringify(config, null, '  '));

    (二)引入模块,初始化变量

    const fs = require('fs');
    const https = require('https');
    const url = require('url');
    const protoo = require('protoo-server');
    const mediasoup = require('mediasoup');            //mediasoup库
    const express = require('express');
    const bodyParser = require('body-parser');
    const { AwaitQueue } = require('awaitqueue');    //同步队列
    const Logger = require('./lib/Logger');
    const Room = require('./lib/Room');                //房间管理
    const interactiveServer = require('./lib/interactiveServer');
    const interactiveClient = require('./lib/interactiveClient');
    
    const logger = new Logger();
    
    // Async queue to manage rooms.
    // @type {AwaitQueue}
    const queue = new AwaitQueue();
    
    // Map of Room instances indexed by roomId.
    // @type {Map<Number, Room>}
    const rooms = new Map();
    
    // HTTPS server.
    // @type {https.Server}
    let httpsServer;
    
    // Express application.
    // @type {Function}
    let expressApp;
    
    // Protoo WebSocket server.
    // @type {protoo.WebSocketServer}
    let protooWebSocketServer;
    
    // mediasoup Workers.
    // @type {Array<mediasoup.Worker>}
    const mediasoupWorkers = [];      //数组,存放所有创建的worker进程
    
    // Index of next mediasoup Worker to use.
    // @type {Number}
    let nextMediasoupWorkerIdx = 0;

    (三)进入主函数,分析主函数run方法

    run();                                            //进入run方法,开始执行
    
    async function run()
    {
        // Open the interactive server.
        await interactiveServer();                    //启动interactive server,用于交互---不是重点
    
        // Open the interactive client.
        if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1')
            await interactiveClient();                //启动interactive client,用于交互---不是重点
    
        // Run a mediasoup Worker.
        await runMediasoupWorkers();                //将所有的需要的进程启动,重点!!!
    
        // Create Express app.
        await createExpressApp();                    //https业务管理,主要用于broadcast---不是重点,但是内部创建了expressApp全局变量,在下面创建https服务中使用
    
        // Run HTTPS server.
        await runHttpsServer();                        //https server运行
    
        // Run a protoo WebSocketServer.
        await runProtooWebSocketServer();            //启动websocket,用于处理接受发送信令,重点!!!
    
        // Log rooms status every X seconds.
        setInterval(() =>
        {
            for (const room of rooms.values())
            {
                room.logStatus();
            }
        }, 120000);
    }

    (四)分析runMediasoupWorkers方法,启动相关进程服务

    /**
     * Launch as many mediasoup Workers as given in the configuration file.
     */
    async function runMediasoupWorkers()
    {
        const { numWorkers } = config.mediasoup;                    //从配置中获取要启动的进程个数,下面进行循环创建
    
        logger.info('running %d mediasoup Workers...', numWorkers);
    
        for (let i = 0; i < numWorkers; ++i)
        {
            const worker = await mediasoup.createWorker(            //底层调用fork创建子进程,传入相关参数
                {
                    logLevel   : config.mediasoup.workerSettings.logLevel,
                    logTags    : config.mediasoup.workerSettings.logTags,
                    rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
                    rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
                });
    
            worker.on('died', () =>                                    //每个worker进程,监听一个退出事件
            {
                logger.error(
                    'mediasoup Worker died, exiting  in 2 seconds... [pid:%d]', worker.pid);
    
                setTimeout(() => process.exit(1), 2000);
            });
    
            mediasoupWorkers.push(worker);                            //将创建完成的worker进程,存放到数组
    
            // Log worker resource usage every X seconds.
            setInterval(async () =>
            {
                const usage = await worker.getResourceUsage();
    
                logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
            }, 120000);
        }
    }

    查看createWorker方法:

    /**
     * Create a Worker.
     */
    async function createWorker({ logLevel = 'error', logTags, rtcMinPort = 10000, rtcMaxPort = 59999, dtlsCertificateFile, dtlsPrivateKeyFile, appData = {} } = {}) {
        logger.debug('createWorker()');
        if (appData && typeof appData !== 'object')
            throw new TypeError('if given, appData must be an object');
        const worker = new Worker_1.Worker({          //创建worker对象
            logLevel,
            logTags,
            rtcMinPort,
            rtcMaxPort,
            dtlsCertificateFile,
            dtlsPrivateKeyFile,
            appData
        });
        return new Promise((resolve, reject) => {       //监听worker创建是否成功
            worker.on('@success', () => {
                // Emit observer event.
                observer.safeEmit('newworker', worker);
                resolve(worker);                 //成功则直接返回
            });
            worker.on('@failure', reject);
        });
    }
    exports.createWorker = createWorker;            //模块导出,其他文件可以使用

    查看Worker类:

    class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {
        /**
         * @private
         * @emits died - (error: Error)
         * @emits @success
         * @emits @failure - (error: Error)
         */
        constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {  //构造函数
            super();
            // Closed flag.
            this._closed = false;
            // Routers set.
            this._routers = new Set();
            // Observer instance.
            this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
            logger.debug('constructor()');
            let spawnBin = workerBin;
            let spawnArgs = [];
            if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
                spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
                if (process.env.MEDIASOUP_VALGRIND_OPTIONS) {
                    spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/s+/));
                }
                spawnArgs.push(workerBin);
            }
            if (typeof logLevel === 'string' && logLevel)
                spawnArgs.push(`--logLevel=${logLevel}`);
            for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
                if (typeof logTag === 'string' && logTag)
                    spawnArgs.push(`--logTag=${logTag}`);
            }
            if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))
                spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
            if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))
                spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
            if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile)
                spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
            if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile)
                spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
            logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
            this._child = child_process_1.spawn(          //传入参数,启动进程
            // command
            spawnBin, 
            // args
            spawnArgs, 
            // options
            {
                env: {
                    MEDIASOUP_VERSION: '3.7.11',
                    // Let the worker process inherit all environment variables, useful
                    // if a custom and not in the path GCC is used so the user can set
                    // LD_LIBRARY_PATH environment variable for runtime.
                    ...process.env
                },
                detached: false,
                // fd 0 (stdin)   : Just ignore it.
                // fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
                // fd 2 (stderr)  : Same as stdout.
                // fd 3 (channel) : Producer Channel fd.
                // fd 4 (channel) : Consumer Channel fd.
                // fd 5 (channel) : Producer PayloadChannel fd.
                // fd 6 (channel) : Consumer PayloadChannel fd.
                stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
                windowsHide: true
            });
            this._pid = this._child.pid;
            this._channel = new Channel_1.Channel({
                producerSocket: this._child.stdio[3],
                consumerSocket: this._child.stdio[4],
                pid: this._pid
            });
            this._payloadChannel = new PayloadChannel_1.PayloadChannel({
                // NOTE: TypeScript does not like more than 5 fds.
                // @ts-ignore
                producerSocket: this._child.stdio[5],
                // @ts-ignore
                consumerSocket: this._child.stdio[6]
            });
            this._appData = appData;
            let spawnDone = false;
            // Listen for 'running' notification.
            this._channel.once(String(this._pid), (event) => {
                if (!spawnDone && event === 'running') {
                    spawnDone = true;
                    logger.debug('worker process running [pid:%s]', this._pid);
                    this.emit('@success');
                }
            });
            this._child.on('exit', (code, signal) => {
                this._child = undefined;
                this.close();
                if (!spawnDone) {
                    spawnDone = true;
                    if (code === 42) {
                        logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);
                        this.emit('@failure', new TypeError('wrong settings'));
                    }
                    else {
                        logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                        this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                    }
                }
                else {
                    logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                    this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                }
            });
            this._child.on('error', (error) => {
                this._child = undefined;
                this.close();
                if (!spawnDone) {
                    spawnDone = true;
                    logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);
                    this.emit('@failure', error);
                }
                else {
                    logger.error('worker process error [pid:%s]: %s', this._pid, error.message);
                    this.safeEmit('died', error);
                }
            });
            // Be ready for 3rd party worker libraries logging to stdout.
            this._child.stdout.on('data', (buffer) => {
                for (const line of buffer.toString('utf8').split('
    ')) {
                    if (line)
                        workerLogger.debug(`(stdout) ${line}`);
                }
            });
            // In case of a worker bug, mediasoup will log to stderr.
            this._child.stderr.on('data', (buffer) => {
                for (const line of buffer.toString('utf8').split('
    ')) {
                    if (line)
                        workerLogger.error(`(stderr) ${line}`);
                }
            });
        }
        /**
         * Worker process identifier (PID).
         */
        get pid() {
            return this._pid;
        }
        /**
         * Whether the Worker is closed.
         */
        get closed() {
            return this._closed;
        }
        /**
         * App custom data.
         */
        get appData() {
            return this._appData;
        }
        /**
         * Invalid setter.
         */
        set appData(appData) {
            throw new Error('cannot override appData object');
        }
        /**
         * Observer.
         *
         * @emits close
         * @emits newrouter - (router: Router)
         */
        get observer() {
            return this._observer;
        }
        /**
         * Close the Worker.
         */
        close() {
            if (this._closed)
                return;
            logger.debug('close()');
            this._closed = true;
            // Kill the worker process.
            if (this._child) {
                // Remove event listeners but leave a fake 'error' hander to avoid
                // propagation.
                this._child.removeAllListeners('exit');
                this._child.removeAllListeners('error');
                this._child.on('error', () => { });
                this._child.kill('SIGTERM');
                this._child = undefined;
            }
            // Close the Channel instance.
            this._channel.close();
            // Close the PayloadChannel instance.
            this._payloadChannel.close();
            // Close every Router.
            for (const router of this._routers) {
                router.workerClosed();
            }
            this._routers.clear();
            // Emit observer event.
            this._observer.safeEmit('close');
        }
        /**
         * Dump Worker.
         */
        async dump() {
            logger.debug('dump()');
            return this._channel.request('worker.dump');
        }
        /**
         * Get mediasoup-worker process resource usage.
         */
        async getResourceUsage() {
            logger.debug('getResourceUsage()');
            return this._channel.request('worker.getResourceUsage');
        }
        /**
         * Update settings.
         */
        async updateSettings({ logLevel, logTags } = {}) {
            logger.debug('updateSettings()');
            const reqData = { logLevel, logTags };
            await this._channel.request('worker.updateSettings', undefined, reqData);
        }
        /**
         * Create a Router.
         */
        async createRouter({ mediaCodecs, appData = {} } = {}) {
            logger.debug('createRouter()');
            if (appData && typeof appData !== 'object')
                throw new TypeError('if given, appData must be an object');
            // This may throw.
            const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
            const internal = { routerId: uuid_1.v4() };
            await this._channel.request('worker.createRouter', internal);
            const data = { rtpCapabilities };
            const router = new Router_1.Router({
                internal,
                data,
                channel: this._channel,
                payloadChannel: this._payloadChannel,
                appData
            });
            this._routers.add(router);
            router.on('@close', () => this._routers.delete(router));
            // Emit observer event.
            this._observer.safeEmit('newrouter', router);
            return router;
        }
    }
    exports.Worker = Worker;

    (五)分析runHttpsServer方法,启动https服务

    /**
     * Create a Node.js HTTPS server. It listens in the IP and port given in the
     * configuration file and reuses the Express application as request listener.
     */
    async function runHttpsServer()
    {
        logger.info('running an HTTPS server...');
    
        // HTTPS server for the protoo WebSocket server.
        const tls =
        {
            cert : fs.readFileSync(config.https.tls.cert),
            key  : fs.readFileSync(config.https.tls.key)
        };
    
        httpsServer = https.createServer(tls, expressApp);    //传入证书和expressApp,创建https服务,存放在全局变量中,在后面使用websocket时使用
    
        await new Promise((resolve) =>
        {
            httpsServer.listen(
                Number(config.https.listenPort), config.https.listenIp, resolve);  //进行监听端口
        });
    }

    (六)分析runProtooWebSocketServer方法,用于处理接受发送信令

    /**
     * Create a protoo WebSocketServer to allow WebSocket connections from browsers.
     */
    async function runProtooWebSocketServer()
    {
        logger.info('running protoo WebSocketServer...');
    
        // Create the protoo WebSocket server.
        protooWebSocketServer = new protoo.WebSocketServer(httpsServer,  //创建websocket对象,依赖于前面创建的httpsServer
            {
                maxReceivedFrameSize     : 960000, // 960 KBytes.
                maxReceivedMessageSize   : 960000,
                fragmentOutgoingMessages : true,
                fragmentationThreshold   : 960000
            });
    
        // Handle connections from clients.
        protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>  //侦听connectionrequest事件,处理请求
        {
            // The client indicates the roomId and peerId in the URL query.
            const u = url.parse(info.request.url, true);
            const roomId = u.query['roomId'];              //请求参数包含roomid
            const peerId = u.query['peerId'];              //用户id
    
            if (!roomId || !peerId)
            {
                reject(400, 'Connection request without roomId and/or peerId');
    
                return;
            }
    
            logger.info(
                'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
                roomId, peerId, info.socket.remoteAddress, info.origin);
    
            // Serialize this code into the queue to avoid that two peers connecting at
            // the same time with the same roomId create two separate rooms with same
            // roomId.
            queue.push(async () =>            //放入同步队列,防止冲突
            {
                const room = await getOrCreateRoom({ roomId });  //如果是第一个用户,则创建房间,不然就加入房间
    
                // Accept the protoo WebSocket connection.
                const protooWebSocketTransport = accept();
    
                room.handleProtooConnection({ peerId, protooWebSocketTransport });  //各种消息处理,后面分析Room.js会分析这个方法
            })
                .catch((error) =>
                {
                    logger.error('room creation or room joining failed:%o', error);
    
                    reject(error);
                });
        });
    }

    三:Room.js分析

    (一)Mediasoup基本概念

    Room/Router:在业务层称为Room,在C++层称为Router。

    Transport/WebRtcTransport:Transport是基类,WebRtcTransport是子类,所以还有其他Transport的子类。Transport是客户端与服务端建立连接的管理层

    Produce/Consume:生产者/消费者,每个用户本身是一个生产者,同时又是多个用户的消费者。数据传输通过Transport进行传输。通过Transport,可以将数据上传到流媒体服务器,同样流媒体服务器可以通过Transport下发数据到消费者。

    (二)Room主要逻辑

    (三)Mediasoup支持的信令

    createWebRtcTransport:建立WebRTC连接,在服务端创建一个与客户端对等的点(含有信息,比如IP、端口),有了这个点之后才能建立连接

    connectWebRtcTransport:真正的与客户端建立连接,数据可以开始传输

    setConsumerPreferedLayers:设置更喜欢的层,比如simulcast分层,选取其中最合适的分辨率传输

    requestConsumerKeyFrame:请求关键帧,避免花屏,比如一个新的用户加入视频,如果不及时请求IDR帧,那么可能获取的P、B帧无法解析,导致花屏

    如果有特殊需要,可以设置自己定义的信令!!!

    (四)代码分析

    class Room extends EventEmitter
    {
        static async create({ mediasoupWorker, roomId })
        {
            logger.info('create() [roomId:%s]', roomId);
    
            // Create a protoo Room instance.
            const protooRoom = new protoo.Room();                    //protoo是websocket库,实现两种功能,一个是实现房间管理(socket.io中也有这个概念),一个是websocket功能
    
            // Router media codecs.
            const { mediaCodecs } = config.mediasoup.routerOptions;    //获取媒体流编解码信息
    
            // Create a mediasoup Router.
            const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });    //创建Router,后面在new Room的时候将protoo.Room和C++中的Router绑定在一起了
    
            // Create a mediasoup AudioLevelObserver.
            const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(        //音频音量信息
                {
                    maxEntries : 1,
                    threshold  : -80,
                    interval   : 800
                });
    
            const bot = await Bot.create({ mediasoupRouter });
    
            return new Room(    //关联信息,创建Room,调用构造函数
                {
                    roomId,
                    protooRoom,
                    mediasoupRouter,
                    audioLevelObserver,
                    bot
                });
        }
    
    
        constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot })
        {
            super();
            this.setMaxListeners(Infinity);
            this._roomId = roomId;
            this._closed = false;
            this._protooRoom = protooRoom;
    
            this._broadcasters = new Map();
    
            this._mediasoupRouter = mediasoupRouter;
            this._audioLevelObserver = audioLevelObserver;
    
            this._bot = bot;
    
            this._networkThrottled = false;
            this._handleAudioLevelObserver();    //处理音频音量事件
            global.audioLevelObserver = this._audioLevelObserver;
            global.bot = this._bot;
        }
    
    
    
        handleProtooConnection({ peerId, consume, protooWebSocketTransport })    //由server.js中runProtooWebSocketServer调用,用于处理客户端的连接
        {
            const existingPeer = this._protooRoom.getPeer(peerId);
    
            if (existingPeer)    //如果用户已经存在,则关闭,重新进入
            {
                logger.warn(
                    'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
                    peerId);
    
                existingPeer.close();
            }
    
            let peer;
    
            try
            {
                peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);    //创建一个新的peer用户
            }
            catch (error)
            {
                logger.error('protooRoom.createPeer() failed:%o', error);
            }
    
            // Not joined after a custom protoo 'join' request is later received.
            //设置用户信息
            peer.data.consume = consume;
            peer.data.joined = false;
            peer.data.displayName = undefined;
            peer.data.device = undefined;
            peer.data.rtpCapabilities = undefined;
            peer.data.sctpCapabilities = undefined;
    
            // Have mediasoup related maps ready even before the Peer joins since we
            // allow creating Transports before joining.
            peer.data.transports = new Map();
            peer.data.producers = new Map();
            peer.data.consumers = new Map();
            peer.data.dataProducers = new Map();
            peer.data.dataConsumers = new Map();
    
            peer.on('request', (request, accept, reject) =>        //监听request信令
            {
                logger.debug(
                    'protoo Peer "request" event [method:%s, peerId:%s]',
                    request.method, peer.id);
    
                this._handleProtooRequest(peer, request, accept, reject)    //调用私有方法,处理请求。内部实现状态机switch...case...处理各种状态(信令,来自于request.method)
                    .catch((error) =>
                    {
                        logger.error('request failed:%o', error);
    
                        reject(error);
                    });
            });
    
            peer.on('close', () =>
            {
                if (this._closed)
                    return;
    
                logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
    
                // If the Peer was joined, notify all Peers.
                if (peer.data.joined)
                {
                    for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                    {
                        otherPeer.notify('peerClosed', { peerId: peer.id })
                            .catch(() => {});
                    }
                }
    
                for (const transport of peer.data.transports.values())
                {
                    transport.close();
                }
    
                // If this is the latest Peer in the room, close the room.
                if (this._protooRoom.peers.length === 0)
                {
                    logger.info(
                        'last Peer in the room left, closing the room [roomId:%s]',
                        this._roomId);
    
                    this.close();
                }
            });
        }
    }
  • 相关阅读:
    查看mysql版本的四种方法及常用命令
    newInstance和new的区别(good)
    Citrix 服务器虚拟化之六 Xenserver虚拟机创建与快照
    Java实现 蓝桥杯 算法训练 排序
    Java实现 蓝桥杯 算法训练 排序
    Java实现 蓝桥杯 算法训练 排序
    Java实现 蓝桥杯 算法训练 2的次幂表示
    Java实现 蓝桥杯 算法训练 2的次幂表示
    Java实现 蓝桥杯 算法训练 前缀表达式
    Java实现 蓝桥杯 算法训练 前缀表达式
  • 原文地址:https://www.cnblogs.com/ssyfj/p/14847097.html
Copyright © 2011-2022 走看看