zoukankan      html  css  js  c++  java
  • WebRTC进阶流媒体服务器开发(三)Mediasoup源码分析之应用层(代码组成、Server.js、Room.js)

    一:Mediasoup Demo分析

    了解Mediasoup运行机制,以及如何调用Mediasoup核心库

    (一)Mediasoup Demo组成

    其中mediasoup-demo为整个代码框架:(包含所有)

    app应用:提供客户端所需要的应用代码

    broadcasters:用于广播使用,用于推流的模块。单向传输,只有去或者只有回

    server端:信令服务和媒体流服务,两者通过管道通信。细分为下面几部分:

    --->config.js:配置文件/js文件,通过js获取一些基本信息。将配置信息交给servere.js使用

    --->server.js:从config.js中去获取基本信息,获取信息之后去启动基本服务,比如wensocket服务、信令服务

    --->lib:server.js使用的库文件,细分为以下几部分

    ------->Room.js:所有的真正的信令处理逻辑都是在这里实现,还描述了房间相关信息

    ------->interactiveClient.js:运行时内部信息查询客户端,与客户端交互(debug使用)

    ------->interactiveServer.js:运行时内部信息查询服务端,与服务端交换(debug使用)

    mediasoup C++:C++部分,用于处理流媒体传输,包括lib与worker两部分

    --->lib:一些js文件组成,主要用于对mediasoup的管理工作

    --->worker:C++核心代码

    二:Server.js分析

    (一)配置环境,从config.js获取参数

    process.title = 'mediasoup-demo-server';                            //启动进程之后,进程的名字
    process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*';    //Debug环境变量
    
    /*引入config.js,内部定义了一些参数:
    https(证书位置,监听IP、端口)、
    mediasoup(CPU个数,worker进程参数定义如日志级别、端口范围,router:room的概念在C++表示定义了音视频编解码参数)
    webRtcTransport传输(IP地址、端口、输入输出码率)
    */
    const config = require('./config');                                    
    
    /* eslint-disable no-console */
    console.log('process.env.DEBUG:', process.env.DEBUG);
    console.log('config.js:
    %s', JSON.stringify(config, null, '  '));

    (二)引入模块,初始化变量

    const fs = require('fs');
    const https = require('https');
    const url = require('url');
    const protoo = require('protoo-server');
    const mediasoup = require('mediasoup');            //mediasoup库
    const express = require('express');
    const bodyParser = require('body-parser');
    const { AwaitQueue } = require('awaitqueue');    //同步队列
    const Logger = require('./lib/Logger');
    const Room = require('./lib/Room');                //房间管理
    const interactiveServer = require('./lib/interactiveServer');
    const interactiveClient = require('./lib/interactiveClient');
    
    const logger = new Logger();
    
    // Async queue to manage rooms.
    // @type {AwaitQueue}
    const queue = new AwaitQueue();
    
    // Map of Room instances indexed by roomId.
    // @type {Map<Number, Room>}
    const rooms = new Map();
    
    // HTTPS server.
    // @type {https.Server}
    let httpsServer;
    
    // Express application.
    // @type {Function}
    let expressApp;
    
    // Protoo WebSocket server.
    // @type {protoo.WebSocketServer}
    let protooWebSocketServer;
    
    // mediasoup Workers.
    // @type {Array<mediasoup.Worker>}
    const mediasoupWorkers = [];      //数组,存放所有创建的worker进程
    
    // Index of next mediasoup Worker to use.
    // @type {Number}
    let nextMediasoupWorkerIdx = 0;

    (三)进入主函数,分析主函数run方法

    run();                                            //进入run方法,开始执行
    
    async function run()
    {
        // Open the interactive server.
        await interactiveServer();                    //启动interactive server,用于交互---不是重点
    
        // Open the interactive client.
        if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1')
            await interactiveClient();                //启动interactive client,用于交互---不是重点
    
        // Run a mediasoup Worker.
        await runMediasoupWorkers();                //将所有的需要的进程启动,重点!!!
    
        // Create Express app.
        await createExpressApp();                    //https业务管理,主要用于broadcast---不是重点,但是内部创建了expressApp全局变量,在下面创建https服务中使用
    
        // Run HTTPS server.
        await runHttpsServer();                        //https server运行
    
        // Run a protoo WebSocketServer.
        await runProtooWebSocketServer();            //启动websocket,用于处理接受发送信令,重点!!!
    
        // Log rooms status every X seconds.
        setInterval(() =>
        {
            for (const room of rooms.values())
            {
                room.logStatus();
            }
        }, 120000);
    }

    (四)分析runMediasoupWorkers方法,启动相关进程服务

    /**
     * Launch as many mediasoup Workers as given in the configuration file.
     */
    async function runMediasoupWorkers()
    {
        const { numWorkers } = config.mediasoup;                    //从配置中获取要启动的进程个数,下面进行循环创建
    
        logger.info('running %d mediasoup Workers...', numWorkers);
    
        for (let i = 0; i < numWorkers; ++i)
        {
            const worker = await mediasoup.createWorker(            //底层调用fork创建子进程,传入相关参数
                {
                    logLevel   : config.mediasoup.workerSettings.logLevel,
                    logTags    : config.mediasoup.workerSettings.logTags,
                    rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
                    rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
                });
    
            worker.on('died', () =>                                    //每个worker进程,监听一个退出事件
            {
                logger.error(
                    'mediasoup Worker died, exiting  in 2 seconds... [pid:%d]', worker.pid);
    
                setTimeout(() => process.exit(1), 2000);
            });
    
            mediasoupWorkers.push(worker);                            //将创建完成的worker进程,存放到数组
    
            // Log worker resource usage every X seconds.
            setInterval(async () =>
            {
                const usage = await worker.getResourceUsage();
    
                logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
            }, 120000);
        }
    }

    查看createWorker方法:

    /**
     * Create a Worker.
     */
    async function createWorker({ logLevel = 'error', logTags, rtcMinPort = 10000, rtcMaxPort = 59999, dtlsCertificateFile, dtlsPrivateKeyFile, appData = {} } = {}) {
        logger.debug('createWorker()');
        if (appData && typeof appData !== 'object')
            throw new TypeError('if given, appData must be an object');
        const worker = new Worker_1.Worker({          //创建worker对象
            logLevel,
            logTags,
            rtcMinPort,
            rtcMaxPort,
            dtlsCertificateFile,
            dtlsPrivateKeyFile,
            appData
        });
        return new Promise((resolve, reject) => {       //监听worker创建是否成功
            worker.on('@success', () => {
                // Emit observer event.
                observer.safeEmit('newworker', worker);
                resolve(worker);                 //成功则直接返回
            });
            worker.on('@failure', reject);
        });
    }
    exports.createWorker = createWorker;            //模块导出,其他文件可以使用

    查看Worker类:

    class Worker extends EnhancedEventEmitter_1.EnhancedEventEmitter {
        /**
         * @private
         * @emits died - (error: Error)
         * @emits @success
         * @emits @failure - (error: Error)
         */
        constructor({ logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {  //构造函数
            super();
            // Closed flag.
            this._closed = false;
            // Routers set.
            this._routers = new Set();
            // Observer instance.
            this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();
            logger.debug('constructor()');
            let spawnBin = workerBin;
            let spawnArgs = [];
            if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
                spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';
                if (process.env.MEDIASOUP_VALGRIND_OPTIONS) {
                    spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/s+/));
                }
                spawnArgs.push(workerBin);
            }
            if (typeof logLevel === 'string' && logLevel)
                spawnArgs.push(`--logLevel=${logLevel}`);
            for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
                if (typeof logTag === 'string' && logTag)
                    spawnArgs.push(`--logTag=${logTag}`);
            }
            if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))
                spawnArgs.push(`--rtcMinPort=${rtcMinPort}`);
            if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))
                spawnArgs.push(`--rtcMaxPort=${rtcMaxPort}`);
            if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile)
                spawnArgs.push(`--dtlsCertificateFile=${dtlsCertificateFile}`);
            if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile)
                spawnArgs.push(`--dtlsPrivateKeyFile=${dtlsPrivateKeyFile}`);
            logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));
            this._child = child_process_1.spawn(          //传入参数,启动进程
            // command
            spawnBin, 
            // args
            spawnArgs, 
            // options
            {
                env: {
                    MEDIASOUP_VERSION: '3.7.11',
                    // Let the worker process inherit all environment variables, useful
                    // if a custom and not in the path GCC is used so the user can set
                    // LD_LIBRARY_PATH environment variable for runtime.
                    ...process.env
                },
                detached: false,
                // fd 0 (stdin)   : Just ignore it.
                // fd 1 (stdout)  : Pipe it for 3rd libraries that log their own stuff.
                // fd 2 (stderr)  : Same as stdout.
                // fd 3 (channel) : Producer Channel fd.
                // fd 4 (channel) : Consumer Channel fd.
                // fd 5 (channel) : Producer PayloadChannel fd.
                // fd 6 (channel) : Consumer PayloadChannel fd.
                stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
                windowsHide: true
            });
            this._pid = this._child.pid;
            this._channel = new Channel_1.Channel({
                producerSocket: this._child.stdio[3],
                consumerSocket: this._child.stdio[4],
                pid: this._pid
            });
            this._payloadChannel = new PayloadChannel_1.PayloadChannel({
                // NOTE: TypeScript does not like more than 5 fds.
                // @ts-ignore
                producerSocket: this._child.stdio[5],
                // @ts-ignore
                consumerSocket: this._child.stdio[6]
            });
            this._appData = appData;
            let spawnDone = false;
            // Listen for 'running' notification.
            this._channel.once(String(this._pid), (event) => {
                if (!spawnDone && event === 'running') {
                    spawnDone = true;
                    logger.debug('worker process running [pid:%s]', this._pid);
                    this.emit('@success');
                }
            });
            this._child.on('exit', (code, signal) => {
                this._child = undefined;
                this.close();
                if (!spawnDone) {
                    spawnDone = true;
                    if (code === 42) {
                        logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);
                        this.emit('@failure', new TypeError('wrong settings'));
                    }
                    else {
                        logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                        this.emit('@failure', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                    }
                }
                else {
                    logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);
                    this.safeEmit('died', new Error(`[pid:${this._pid}, code:${code}, signal:${signal}]`));
                }
            });
            this._child.on('error', (error) => {
                this._child = undefined;
                this.close();
                if (!spawnDone) {
                    spawnDone = true;
                    logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);
                    this.emit('@failure', error);
                }
                else {
                    logger.error('worker process error [pid:%s]: %s', this._pid, error.message);
                    this.safeEmit('died', error);
                }
            });
            // Be ready for 3rd party worker libraries logging to stdout.
            this._child.stdout.on('data', (buffer) => {
                for (const line of buffer.toString('utf8').split('
    ')) {
                    if (line)
                        workerLogger.debug(`(stdout) ${line}`);
                }
            });
            // In case of a worker bug, mediasoup will log to stderr.
            this._child.stderr.on('data', (buffer) => {
                for (const line of buffer.toString('utf8').split('
    ')) {
                    if (line)
                        workerLogger.error(`(stderr) ${line}`);
                }
            });
        }
        /**
         * Worker process identifier (PID).
         */
        get pid() {
            return this._pid;
        }
        /**
         * Whether the Worker is closed.
         */
        get closed() {
            return this._closed;
        }
        /**
         * App custom data.
         */
        get appData() {
            return this._appData;
        }
        /**
         * Invalid setter.
         */
        set appData(appData) {
            throw new Error('cannot override appData object');
        }
        /**
         * Observer.
         *
         * @emits close
         * @emits newrouter - (router: Router)
         */
        get observer() {
            return this._observer;
        }
        /**
         * Close the Worker.
         */
        close() {
            if (this._closed)
                return;
            logger.debug('close()');
            this._closed = true;
            // Kill the worker process.
            if (this._child) {
                // Remove event listeners but leave a fake 'error' hander to avoid
                // propagation.
                this._child.removeAllListeners('exit');
                this._child.removeAllListeners('error');
                this._child.on('error', () => { });
                this._child.kill('SIGTERM');
                this._child = undefined;
            }
            // Close the Channel instance.
            this._channel.close();
            // Close the PayloadChannel instance.
            this._payloadChannel.close();
            // Close every Router.
            for (const router of this._routers) {
                router.workerClosed();
            }
            this._routers.clear();
            // Emit observer event.
            this._observer.safeEmit('close');
        }
        /**
         * Dump Worker.
         */
        async dump() {
            logger.debug('dump()');
            return this._channel.request('worker.dump');
        }
        /**
         * Get mediasoup-worker process resource usage.
         */
        async getResourceUsage() {
            logger.debug('getResourceUsage()');
            return this._channel.request('worker.getResourceUsage');
        }
        /**
         * Update settings.
         */
        async updateSettings({ logLevel, logTags } = {}) {
            logger.debug('updateSettings()');
            const reqData = { logLevel, logTags };
            await this._channel.request('worker.updateSettings', undefined, reqData);
        }
        /**
         * Create a Router.
         */
        async createRouter({ mediaCodecs, appData = {} } = {}) {
            logger.debug('createRouter()');
            if (appData && typeof appData !== 'object')
                throw new TypeError('if given, appData must be an object');
            // This may throw.
            const rtpCapabilities = ortc.generateRouterRtpCapabilities(mediaCodecs);
            const internal = { routerId: uuid_1.v4() };
            await this._channel.request('worker.createRouter', internal);
            const data = { rtpCapabilities };
            const router = new Router_1.Router({
                internal,
                data,
                channel: this._channel,
                payloadChannel: this._payloadChannel,
                appData
            });
            this._routers.add(router);
            router.on('@close', () => this._routers.delete(router));
            // Emit observer event.
            this._observer.safeEmit('newrouter', router);
            return router;
        }
    }
    exports.Worker = Worker;

    (五)分析runHttpsServer方法,启动https服务

    /**
     * Create a Node.js HTTPS server. It listens in the IP and port given in the
     * configuration file and reuses the Express application as request listener.
     */
    async function runHttpsServer()
    {
        logger.info('running an HTTPS server...');
    
        // HTTPS server for the protoo WebSocket server.
        const tls =
        {
            cert : fs.readFileSync(config.https.tls.cert),
            key  : fs.readFileSync(config.https.tls.key)
        };
    
        httpsServer = https.createServer(tls, expressApp);    //传入证书和expressApp,创建https服务,存放在全局变量中,在后面使用websocket时使用
    
        await new Promise((resolve) =>
        {
            httpsServer.listen(
                Number(config.https.listenPort), config.https.listenIp, resolve);  //进行监听端口
        });
    }

    (六)分析runProtooWebSocketServer方法,用于处理接受发送信令

    /**
     * Create a protoo WebSocketServer to allow WebSocket connections from browsers.
     */
    async function runProtooWebSocketServer()
    {
        logger.info('running protoo WebSocketServer...');
    
        // Create the protoo WebSocket server.
        protooWebSocketServer = new protoo.WebSocketServer(httpsServer,  //创建websocket对象,依赖于前面创建的httpsServer
            {
                maxReceivedFrameSize     : 960000, // 960 KBytes.
                maxReceivedMessageSize   : 960000,
                fragmentOutgoingMessages : true,
                fragmentationThreshold   : 960000
            });
    
        // Handle connections from clients.
        protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>  //侦听connectionrequest事件,处理请求
        {
            // The client indicates the roomId and peerId in the URL query.
            const u = url.parse(info.request.url, true);
            const roomId = u.query['roomId'];              //请求参数包含roomid
            const peerId = u.query['peerId'];              //用户id
    
            if (!roomId || !peerId)
            {
                reject(400, 'Connection request without roomId and/or peerId');
    
                return;
            }
    
            logger.info(
                'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
                roomId, peerId, info.socket.remoteAddress, info.origin);
    
            // Serialize this code into the queue to avoid that two peers connecting at
            // the same time with the same roomId create two separate rooms with same
            // roomId.
            queue.push(async () =>            //放入同步队列,防止冲突
            {
                const room = await getOrCreateRoom({ roomId });  //如果是第一个用户,则创建房间,不然就加入房间
    
                // Accept the protoo WebSocket connection.
                const protooWebSocketTransport = accept();
    
                room.handleProtooConnection({ peerId, protooWebSocketTransport });  //各种消息处理,后面分析Room.js会分析这个方法
            })
                .catch((error) =>
                {
                    logger.error('room creation or room joining failed:%o', error);
    
                    reject(error);
                });
        });
    }

    三:Room.js分析

    (一)Mediasoup基本概念

    Room/Router:在业务层称为Room,在C++层称为Router。

    Transport/WebRtcTransport:Transport是基类,WebRtcTransport是子类,所以还有其他Transport的子类。Transport是客户端与服务端建立连接的管理层

    Produce/Consume:生产者/消费者,每个用户本身是一个生产者,同时又是多个用户的消费者。数据传输通过Transport进行传输。通过Transport,可以将数据上传到流媒体服务器,同样流媒体服务器可以通过Transport下发数据到消费者。

    (二)Room主要逻辑

    (三)Mediasoup支持的信令

    createWebRtcTransport:建立WebRTC连接,在服务端创建一个与客户端对等的点(含有信息,比如IP、端口),有了这个点之后才能建立连接

    connectWebRtcTransport:真正的与客户端建立连接,数据可以开始传输

    setConsumerPreferedLayers:设置更喜欢的层,比如simulcast分层,选取其中最合适的分辨率传输

    requestConsumerKeyFrame:请求关键帧,避免花屏,比如一个新的用户加入视频,如果不及时请求IDR帧,那么可能获取的P、B帧无法解析,导致花屏

    如果有特殊需要,可以设置自己定义的信令!!!

    (四)代码分析

    class Room extends EventEmitter
    {
        static async create({ mediasoupWorker, roomId })
        {
            logger.info('create() [roomId:%s]', roomId);
    
            // Create a protoo Room instance.
            const protooRoom = new protoo.Room();                    //protoo是websocket库,实现两种功能,一个是实现房间管理(socket.io中也有这个概念),一个是websocket功能
    
            // Router media codecs.
            const { mediaCodecs } = config.mediasoup.routerOptions;    //获取媒体流编解码信息
    
            // Create a mediasoup Router.
            const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });    //创建Router,后面在new Room的时候将protoo.Room和C++中的Router绑定在一起了
    
            // Create a mediasoup AudioLevelObserver.
            const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(        //音频音量信息
                {
                    maxEntries : 1,
                    threshold  : -80,
                    interval   : 800
                });
    
            const bot = await Bot.create({ mediasoupRouter });
    
            return new Room(    //关联信息,创建Room,调用构造函数
                {
                    roomId,
                    protooRoom,
                    mediasoupRouter,
                    audioLevelObserver,
                    bot
                });
        }
    
    
        constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot })
        {
            super();
            this.setMaxListeners(Infinity);
            this._roomId = roomId;
            this._closed = false;
            this._protooRoom = protooRoom;
    
            this._broadcasters = new Map();
    
            this._mediasoupRouter = mediasoupRouter;
            this._audioLevelObserver = audioLevelObserver;
    
            this._bot = bot;
    
            this._networkThrottled = false;
            this._handleAudioLevelObserver();    //处理音频音量事件
            global.audioLevelObserver = this._audioLevelObserver;
            global.bot = this._bot;
        }
    
    
    
        handleProtooConnection({ peerId, consume, protooWebSocketTransport })    //由server.js中runProtooWebSocketServer调用,用于处理客户端的连接
        {
            const existingPeer = this._protooRoom.getPeer(peerId);
    
            if (existingPeer)    //如果用户已经存在,则关闭,重新进入
            {
                logger.warn(
                    'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
                    peerId);
    
                existingPeer.close();
            }
    
            let peer;
    
            try
            {
                peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);    //创建一个新的peer用户
            }
            catch (error)
            {
                logger.error('protooRoom.createPeer() failed:%o', error);
            }
    
            // Not joined after a custom protoo 'join' request is later received.
            //设置用户信息
            peer.data.consume = consume;
            peer.data.joined = false;
            peer.data.displayName = undefined;
            peer.data.device = undefined;
            peer.data.rtpCapabilities = undefined;
            peer.data.sctpCapabilities = undefined;
    
            // Have mediasoup related maps ready even before the Peer joins since we
            // allow creating Transports before joining.
            peer.data.transports = new Map();
            peer.data.producers = new Map();
            peer.data.consumers = new Map();
            peer.data.dataProducers = new Map();
            peer.data.dataConsumers = new Map();
    
            peer.on('request', (request, accept, reject) =>        //监听request信令
            {
                logger.debug(
                    'protoo Peer "request" event [method:%s, peerId:%s]',
                    request.method, peer.id);
    
                this._handleProtooRequest(peer, request, accept, reject)    //调用私有方法,处理请求。内部实现状态机switch...case...处理各种状态(信令,来自于request.method)
                    .catch((error) =>
                    {
                        logger.error('request failed:%o', error);
    
                        reject(error);
                    });
            });
    
            peer.on('close', () =>
            {
                if (this._closed)
                    return;
    
                logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
    
                // If the Peer was joined, notify all Peers.
                if (peer.data.joined)
                {
                    for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
                    {
                        otherPeer.notify('peerClosed', { peerId: peer.id })
                            .catch(() => {});
                    }
                }
    
                for (const transport of peer.data.transports.values())
                {
                    transport.close();
                }
    
                // If this is the latest Peer in the room, close the room.
                if (this._protooRoom.peers.length === 0)
                {
                    logger.info(
                        'last Peer in the room left, closing the room [roomId:%s]',
                        this._roomId);
    
                    this.close();
                }
            });
        }
    }
  • 相关阅读:
    c#的运算符
    modbus-poll和modbus-slave工具的学习使用——modbus协议功能码1的解析
    modbus-poll和modbus-slave工具的学习使用——环境搭建
    STM32L4R9使用HAL库调试IIC注意事项
    蓝牙透传——介绍蓝牙最简单、最常见的通讯方式
    Chapter 2 Open Book——11
    Chapter 2 Open Book——10
    Chapter 2 Open Book——9
    线程中sleep方法和wait方法有什么区别?
    你所不知道的mybatis居然也有拦截器
  • 原文地址:https://www.cnblogs.com/ssyfj/p/14847097.html
Copyright © 2011-2022 走看看