  • Learning Live555 (4): Establishing an RTSP Connection (RTSP Client)

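      Live555 implements not only an RTSP server but also an RTSP client. Using the testRTSPClient.cpp program, we will walk through how Live555's RTSP client establishes an RTSP connection with a server.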

      First, the main function:

    char eventLoopWatchVariable = 0;

    int main(int argc, char** argv) {
      // Begin by setting up our usage environment:
      TaskScheduler* scheduler = BasicTaskScheduler::createNew();
      UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

      // We need at least one "rtsp://" URL argument:
      if (argc < 2) {
        usage(*env, argv[0]);
        return 1;
      }

      // There are argc-1 URLs: argv[1] through argv[argc-1].  Open and start streaming each one:
      for (int i = 1; i <= argc-1; ++i) {
        openURL(*env, argv[0], argv[i]);
      }

      // All subsequent activity takes place within the event loop:
      env->taskScheduler().doEventLoop(&eventLoopWatchVariable);
        // This function call does not return, unless, at some point in time, "eventLoopWatchVariable" gets set to something non-zero.

      return 0;

      // If you choose to continue the application past this point (i.e., if you comment out the "return 0;" statement above),
      // and if you don't intend to do anything more with the "TaskScheduler" and "UsageEnvironment" objects,
      // then you can also reclaim the (small) memory used by these objects by uncommenting the following code:
      /*
        env->reclaim(); env = NULL;
        delete scheduler; scheduler = NULL;
      */
    }

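      As in testOnDemandRTSPServer.cpp, the program first creates a TaskScheduler object and a UsageEnvironment object. It then calls openURL for each media resource it wants to request, passing that resource's RTSP URL, and finally enters the main event loop.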

    void openURL(UsageEnvironment& env, char const* progName, char const* rtspURL) {
      // Begin by creating a "RTSPClient" object.  Note that there is a separate "RTSPClient" object for each stream that we wish
      // to receive (even if more than stream uses the same "rtsp://" URL).
      RTSPClient* rtspClient = ourRTSPClient::createNew(env, rtspURL, RTSP_CLIENT_VERBOSITY_LEVEL, progName);
      if (rtspClient == NULL) {
        env << "Failed to create a RTSP client for URL \"" << rtspURL << "\": " << env.getResultMsg() << "\n";
        return;
      }

      ++rtspClientCount;

      // Next, send a RTSP "DESCRIBE" command, to get a SDP description for the stream.
      // Note that this command - like all RTSP commands - is sent asynchronously; we do not block, waiting for a response.
      // Instead, the following function call returns immediately, and we handle the RTSP response later, from within the event loop:
      rtspClient->sendDescribeCommand(continueAfterDESCRIBE); // send the DESCRIBE command, passing in the callback function
    }

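      The openURL function is simple. It creates an RTSPClient object (one RTSPClient object represents one RTSP client) and then calls sendDescribeCommand to send the DESCRIBE command. The callback, continueAfterDESCRIBE, is invoked when the server's response to the DESCRIBE command arrives.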
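
      The "send now, handle the response later from the event loop" pattern can be sketched without Live555 at all. The following is a minimal, self-contained illustration only: MiniScheduler, sendCommand and runDemo are hypothetical names invented for this sketch, the response is faked rather than read from a socket, and the loop returns when idle, whereas the real doEventLoop blocks waiting for events.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a task scheduler; this is NOT the Live555 TaskScheduler API.
class MiniScheduler {
public:
  // Queue a task to be run later, from inside the event loop.
  void schedule(std::function<void()> task) { fTasks.push_back(std::move(task)); }

  // Run queued tasks until *watchVariable becomes non-zero or no tasks remain.
  void doEventLoop(char* watchVariable) {
    assert(watchVariable != nullptr);
    while (*watchVariable == 0 && !fTasks.empty()) {
      std::function<void()> task = std::move(fTasks.front());
      fTasks.pop_front();
      task();
    }
  }

private:
  std::deque<std::function<void()>> fTasks;
};

using ResponseHandler = std::function<void(int resultCode, std::string const& resultString)>;

// Hypothetical asynchronous "send": it returns immediately, and the handler
// runs later from the event loop. The response is faked for illustration.
void sendCommand(MiniScheduler& scheduler, std::string const& command, ResponseHandler handler) {
  scheduler.schedule([command, handler]() { handler(0, "response to " + command); });
}

// Records the order of events so that the asynchronous behavior is visible.
std::vector<std::string> runDemo() {
  std::vector<std::string> log;
  MiniScheduler scheduler;
  char watchVariable = 0;

  sendCommand(scheduler, "DESCRIBE", [&](int resultCode, std::string const& resultString) {
    log.push_back(resultCode == 0 ? "handler: " + resultString : std::string("handler: error"));
    watchVariable = 1; // ask the event loop to stop
  });
  log.push_back("sendCommand returned"); // logged before the handler has run

  scheduler.doEventLoop(&watchVariable);
  return log;
}
```

      Calling runDemo() yields a log in which "sendCommand returned" comes before the handler's entry, which is the same ordering openURL relies on: sendDescribeCommand returns at once and continueAfterDESCRIBE runs only after main has entered the event loop.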

    void continueAfterDESCRIBE(RTSPClient* rtspClient, int resultCode, char* resultString) {
      do {
        UsageEnvironment& env = rtspClient->envir(); // alias
        StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

        if (resultCode != 0) {  // a non-zero result code indicates an error
          env << *rtspClient << "Failed to get a SDP description: " << resultString << "\n";
          delete[] resultString;
          break;
        }
        // resultString is the SDP description string returned by the server
        char* const sdpDescription = resultString;
        env << *rtspClient << "Got a SDP description:\n" << sdpDescription << "\n";

        // Create a media session object from this SDP description:
        scs.session = MediaSession::createNew(env, sdpDescription);   // create a MediaSession object from the SDP description
        delete[] sdpDescription; // because we don't need it anymore
        if (scs.session == NULL) {
          env << *rtspClient << "Failed to create a MediaSession object from the SDP description: " << env.getResultMsg() << "\n";
          break;
        } else if (!scs.session->hasSubsessions()) {
          env << *rtspClient << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
          break;
        }

        // Then, create and set up our data source objects for the session.  We do this by iterating over the session's 'subsessions',
        // calling "MediaSubsession::initiate()", and then sending a RTSP "SETUP" command, on each one.
        // (Each 'subsession' will have its own data source.)
        scs.iter = new MediaSubsessionIterator(*scs.session);
        setupNextSubsession(rtspClient);        // start sending a SETUP command for each of the server's ServerMediaSubsessions
        return;
      } while (0);

      // An unrecoverable error occurred with this stream.
      shutdownStream(rtspClient);
    }

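      When the client receives the server's response to DESCRIBE and obtains the SDP description, it creates a MediaSession object. MediaSession is the client-side counterpart of ServerMediaSession: it represents the client's session for a media resource on the server. Likewise, the client-side MediaSubsession corresponds to ServerMediaSubsession and represents a subsession of the MediaSession. Creating the MediaSession also creates the MediaSubsession objects it contains. The client then sends a SETUP command for each of the server's ServerMediaSubsessions to request a connection.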
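
      To make the "one subsession per m= line" relationship concrete, here is a rough sketch that scans an SDP description and builds one record per media line. It assumes the standard SDP layout `m=<media> <port> <proto> <fmt> ...`; SimpleSubsession and parseSubsessions are hypothetical names for this illustration, and the real MediaSession::createNew parses far more (codec, connection address, attributes, and so on).

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical, simplified model of a client-side subsession;
// this is NOT the Live555 MediaSubsession class.
struct SimpleSubsession {
  std::string mediumName;     // e.g. "video" or "audio"
  unsigned clientPortNum = 0; // port from the "m=" line (0 means "not specified")
  std::string protocolName;   // e.g. "RTP/AVP"
};

// Scan an SDP description and create one subsession per "m=" line.
std::vector<SimpleSubsession> parseSubsessions(std::string const& sdp) {
  std::vector<SimpleSubsession> result;
  std::istringstream lines(sdp);
  std::string line;
  while (std::getline(lines, line)) {
    if (!line.empty() && line.back() == '\r') line.pop_back(); // SDP lines end in CRLF
    if (line.compare(0, 2, "m=") != 0) continue;               // not a media line

    // Expected layout: m=<media> <port> <proto> <fmt> ...
    std::istringstream fields(line.substr(2));
    SimpleSubsession sub;
    if (fields >> sub.mediumName >> sub.clientPortNum >> sub.protocolName) {
      result.push_back(sub);
    }
  }
  return result;
}
```

      An SDP description with no m= lines produces an empty vector here, which corresponds to the hasSubsessions() check that makes continueAfterDESCRIBE give up on the stream.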

    void setupNextSubsession(RTSPClient* rtspClient) {
      UsageEnvironment& env = rtspClient->envir(); // alias
      StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

      scs.subsession = scs.iter->next();
      if (scs.subsession != NULL) {
        if (!scs.subsession->initiate()) {  // call initiate() to initialize the MediaSubsession object
          env << *rtspClient << "Failed to initiate the \"" << *scs.subsession << "\" subsession: " << env.getResultMsg() << "\n";
          setupNextSubsession(rtspClient); // give up on this subsession; go to the next one
        } else {
          env << *rtspClient << "Initiated the \"" << *scs.subsession << "\" subsession (";
          if (scs.subsession->rtcpIsMuxed()) {
            env << "client port " << scs.subsession->clientPortNum();
          } else {
            env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1;
          }
          env << ")\n";

          // Continue setting up this subsession, by sending a RTSP "SETUP" command:
          rtspClient->sendSetupCommand(*scs.subsession, continueAfterSETUP, False, REQUEST_STREAMING_OVER_TCP);
        }
        return;
      }

      // Every ServerMediaSubsession has now been set up, so send the PLAY command.
      // We've finished setting up all of the subsessions.  Now, send a RTSP "PLAY" command to start the streaming:
      if (scs.session->absStartTime() != NULL) {
        // Special case: The stream is indexed by 'absolute' time, so send an appropriate "PLAY" command:
        rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY, scs.session->absStartTime(), scs.session->absEndTime());
      } else {
        scs.duration = scs.session->playEndTime() - scs.session->playStartTime();
        rtspClient->sendPlayCommand(*scs.session, continueAfterPLAY);
      }
    }

    void continueAfterSETUP(RTSPClient* rtspClient, int resultCode, char* resultString) {
      do {
        UsageEnvironment& env = rtspClient->envir(); // alias
        StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

        if (resultCode != 0) {
          env << *rtspClient << "Failed to set up the \"" << *scs.subsession << "\" subsession: " << resultString << "\n";
          break;
        }

        env << *rtspClient << "Set up the \"" << *scs.subsession << "\" subsession (";
        if (scs.subsession->rtcpIsMuxed()) {
          env << "client port " << scs.subsession->clientPortNum();
        } else {
          env << "client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum()+1;
        }
        env << ")\n";

        // Having successfully setup the subsession, create a data sink for it, and call "startPlaying()" on it.
        // (This will prepare the data sink to receive data; the actual flow of data from the client won't start happening until later,
        // after we've sent a RTSP "PLAY" command.)
        // A MediaSink object is created for each MediaSubsession to request and store the data.
        scs.subsession->sink = DummySink::createNew(env, *scs.subsession, rtspClient->url());
          // perhaps use your own custom "MediaSink" subclass instead
        if (scs.subsession->sink == NULL) {
          env << *rtspClient << "Failed to create a data sink for the \"" << *scs.subsession
              << "\" subsession: " << env.getResultMsg() << "\n";
          break;
        }

        env << *rtspClient << "Created a data sink for the \"" << *scs.subsession << "\" subsession\n";
        scs.subsession->miscPtr = rtspClient; // a hack to let subsession handle functions get the "RTSPClient" from the subsession
        scs.subsession->sink->startPlaying(*(scs.subsession->readSource()),
                                           subsessionAfterPlaying, scs.subsession); // call MediaSink::startPlaying to get ready to play
        // Also set a handler to be called if a RTCP "BYE" arrives for this subsession:
        if (scs.subsession->rtcpInstance() != NULL) {
          scs.subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, scs.subsession);
        }
      } while (0);
      delete[] resultString;

      // Set up the next subsession, if any (i.e., the next ServerMediaSubsession):
      setupNextSubsession(rtspClient);
    }

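      setupNextSubsession first calls MediaSubsession::initiate to initialize the MediaSubsession, then sends a SETUP command for the corresponding ServerMediaSubsession. When the response arrives, continueAfterSETUP is called back. continueAfterSETUP creates a MediaSink object for the MediaSubsession to request and store the data sent by the server, calls MediaSink::startPlaying to get ready to play the corresponding ServerMediaSubsession, and finally calls setupNextSubsession again to set up the next ServerMediaSubsession. setupNextSubsession checks whether every ServerMediaSubsession has been set up. If so, it sends the PLAY command to ask the server to start transmitting data, and continueAfterPLAY is called when the response arrives.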
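
      The control flow described above (set up each subsession in turn, skip any that fail to initiate, and send PLAY once the iterator is exhausted) can be modeled in a few lines. This is a simplified sketch, not the Live555 code: DemoState and the two free functions are hypothetical, and the "response" handler is invoked directly, whereas the real client receives it later through the event loop.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical per-stream state, loosely modeled on StreamClientState.
struct DemoState {
  std::vector<std::string> subsessions; // names of the subsessions found in the SDP
  std::vector<bool> initiateSucceeds;   // whether initiate() would succeed for each one
  std::size_t next = 0;                 // plays the role of the subsession iterator
  std::vector<std::string> sent;        // RTSP commands "sent" so far, in order
};

void continueAfterSetup(DemoState& state);

void setupNextSubsession(DemoState& state) {
  assert(state.subsessions.size() == state.initiateSucceeds.size());
  while (state.next < state.subsessions.size()) {
    std::size_t current = state.next++;
    if (!state.initiateSucceeds[current]) continue; // give up on this one; try the next
    state.sent.push_back("SETUP " + state.subsessions[current]);
    // In the real client the response arrives asynchronously; here the
    // handler is called directly to keep the sketch short.
    continueAfterSetup(state);
    return;
  }
  // Every subsession has been handled, so start streaming.
  state.sent.push_back("PLAY");
}

void continueAfterSetup(DemoState& state) {
  // (The real handler creates a sink and calls startPlaying() at this point.)
  setupNextSubsession(state);
}
```

      With three subsessions where the second fails to initiate, the recorded commands are a SETUP for the first, a SETUP for the third, and a single PLAY at the end. Note that PLAY is sent once for the whole session, not once per subsession.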

      Before moving on to the PLAY command, let's look at what MediaSubsession::initiate does:

    Boolean MediaSubsession::initiate(int useSpecialRTPoffset) {
      if (fReadSource != NULL) return True; // has already been initiated

      do {
        if (fCodecName == NULL) {
          env().setResultMsg("Codec is unspecified");
          break;
        }

        // Create the client sockets (an RTP socket and an RTCP socket) used to receive data from the server.
        // Create RTP and RTCP 'Groupsocks' on which to receive incoming data.
        // (Groupsocks will work even for unicast addresses)
        struct in_addr tempAddr;
        tempAddr.s_addr = connectionEndpointAddress();
            // This could get changed later, as a result of a RTSP "SETUP"

        // Use the specified RTP and RTCP ports: the RTP port must be even, and the RTCP port must be (RTP port + 1).
        if (fClientPortNum != 0 && (honorSDPPortChoice || IsMulticastAddress(tempAddr.s_addr))) {
          // The sockets' port numbers were specified for us.  Use these:
          Boolean const protocolIsRTP = strcmp(fProtocolName, "RTP") == 0;
          if (protocolIsRTP && !fMultiplexRTCPWithRTP) {
            fClientPortNum = fClientPortNum&~1;
                // use an even-numbered port for RTP, and the next (odd-numbered) port for RTCP
          }
          if (isSSM()) {
            fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, fClientPortNum);
          } else {
            fRTPSocket = new Groupsock(env(), tempAddr, fClientPortNum, 255);
          }
          if (fRTPSocket == NULL) {
            env().setResultMsg("Failed to create RTP socket");
            break;
          }

          if (protocolIsRTP) {
            if (fMultiplexRTCPWithRTP) {
              // Use the RTP 'groupsock' object for RTCP as well:
              fRTCPSocket = fRTPSocket;
            } else {
              // Set our RTCP port to be the RTP port + 1:
              portNumBits const rtcpPortNum = fClientPortNum|1;
              if (isSSM()) {
                fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
              } else {
                fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);
              }
            }
          }
        } else {
          // Pick ephemeral (system-assigned) RTP and RTCP ports.
          // Port numbers were not specified in advance, so we use ephemeral port numbers.
          // Create sockets until we get a port-number pair (even: RTP; even+1: RTCP).
          // (However, if we're multiplexing RTCP with RTP, then we create only one socket,
          // and the port number can be even or odd.)
          // We need to make sure that we don't keep trying to use the same bad port numbers over
          // and over again, so we store bad sockets in a table, and delete them all when we're done.
          HashTable* socketHashTable = HashTable::create(ONE_WORD_HASH_KEYS);
          if (socketHashTable == NULL) break;
          Boolean success = False;
          NoReuse dummy(env());
              // ensures that our new ephemeral port number won't be one that's already in use

          while (1) {
            // Create a new socket:
            if (isSSM()) {
              fRTPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, 0);
            } else {
              fRTPSocket = new Groupsock(env(), tempAddr, 0, 255);
            }
            if (fRTPSocket == NULL) {
              env().setResultMsg("MediaSession::initiate(): unable to create RTP and RTCP sockets");
              break;
            }

            // Get the client port number:
            Port clientPort(0);
            if (!getSourcePort(env(), fRTPSocket->socketNum(), clientPort)) {
              break;
            }
            fClientPortNum = ntohs(clientPort.num());

            if (fMultiplexRTCPWithRTP) {
              // Use this RTP 'groupsock' object for RTCP as well:
              fRTCPSocket = fRTPSocket;
              success = True;
              break;
            }

            // To be usable for RTP, the client port number must be even:
            if ((fClientPortNum&1) != 0) { // it's odd
              // Record this socket in our table, and keep trying:
              unsigned key = (unsigned)fClientPortNum;
              Groupsock* existing = (Groupsock*)socketHashTable->Add((char const*)key, fRTPSocket);
              delete existing; // in case it wasn't NULL
              continue;
            }

            // Make sure we can use the next (i.e., odd) port number, for RTCP:
            portNumBits rtcpPortNum = fClientPortNum|1;
            if (isSSM()) {
              fRTCPSocket = new Groupsock(env(), tempAddr, fSourceFilterAddr, rtcpPortNum);
            } else {
              fRTCPSocket = new Groupsock(env(), tempAddr, rtcpPortNum, 255);
            }
            if (fRTCPSocket != NULL && fRTCPSocket->socketNum() >= 0) {
              // Success! Use these two sockets.
              success = True;
              break;
            } else {
              // We couldn't create the RTCP socket (perhaps that port number's already in use elsewhere?).
              delete fRTCPSocket; fRTCPSocket = NULL;

              // Record the first socket in our table, and keep trying:
              unsigned key = (unsigned)fClientPortNum;
              Groupsock* existing = (Groupsock*)socketHashTable->Add((char const*)key, fRTPSocket);
              delete existing; // in case it wasn't NULL
              continue;
            }
          }

          // Clean up the socket hash table (and contents):
          Groupsock* oldGS;
          while ((oldGS = (Groupsock*)socketHashTable->RemoveNext()) != NULL) {
            delete oldGS;
          }
          delete socketHashTable;

          if (!success) break; // a fatal error occurred trying to create the RTP and RTCP sockets; we can't continue
        }

        // Try to use a big receive buffer for RTP - at least 0.1 second of
        // specified bandwidth and at least 50 KB
        unsigned rtpBufSize = fBandwidth * 25 / 2; // 1 kbps * 0.1 s = 12.5 bytes
        if (rtpBufSize < 50 * 1024)
          rtpBufSize = 50 * 1024;
        increaseReceiveBufferTo(env(), fRTPSocket->socketNum(), rtpBufSize);

        if (isSSM() && fRTCPSocket != NULL) {
          // Special case for RTCP SSM: Send RTCP packets back to the source via unicast:
          fRTCPSocket->changeDestinationParameters(fSourceFilterAddr,0,~0);
        }

        // Create the FramedSource object used to request data.
        // Create "fRTPSource" and "fReadSource":
        if (!createSourceObjects(useSpecialRTPoffset)) break;

        if (fReadSource == NULL) {
          env().setResultMsg("Failed to create read source");
          break;
        }

        // Create the RTCPInstance object.
        // Finally, create our RTCP instance. (It starts running automatically)
        if (fRTPSource != NULL && fRTCPSocket != NULL) {
          // If bandwidth is specified, use it and add 5% for RTCP overhead.
          // Otherwise make a guess at 500 kbps.
          unsigned totSessionBandwidth
            = fBandwidth ? fBandwidth + fBandwidth / 20 : 500;
          fRTCPInstance = RTCPInstance::createNew(env(), fRTCPSocket,
                                                  totSessionBandwidth,
                                                  (unsigned char const*)
                                                  fParent.CNAME(),
                                                  NULL /* we're a client */,
                                                  fRTPSource);
          if (fRTCPInstance == NULL) {
            env().setResultMsg("Failed to create RTCP instance");
            break;
          }
        }

        return True;
      } while (0);

      deInitiate();
      fClientPortNum = 0;
      return False;
    }

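      MediaSubsession::initiate first creates two client sockets, one for receiving RTP data and one for RTCP data (or a single shared socket when RTCP is multiplexed with RTP). It then creates a FramedSource object for requesting data from the server. The FramedSource is created in createSourceObjects, which creates a different kind of FramedSource depending on the format of the ServerMediaSubsession's resource. Taking H.264 video as our example again, it creates an H264VideoRTPSource object. Finally, it creates an RTCPInstance object.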
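
      The bit tricks and sizing rules inside initiate are easy to miss in a listing that long, so here they are isolated as plain arithmetic. The expressions mirror the ones in the code above (`&~1`, `|1`, `* 25 / 2`, the 50 KB floor, and the 5% RTCP allowance); the helper names choosePorts, rtpReceiveBufferSize and totalSessionBandwidth are hypothetical and do not exist in Live555.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper type: an RTP/RTCP port pair.
struct PortPair {
  std::uint16_t rtp;
  std::uint16_t rtcp;
};

// Round the requested port down to an even number for RTP,
// and use the next (odd) port for RTCP.
PortPair choosePorts(std::uint16_t requestedPort) {
  std::uint16_t rtp = static_cast<std::uint16_t>(requestedPort & ~1u);
  std::uint16_t rtcp = static_cast<std::uint16_t>(rtp | 1u);
  assert((rtp & 1u) == 0 && rtcp == rtp + 1);
  return PortPair{rtp, rtcp};
}

// Receive buffer: at least 0.1 s of data at the given bandwidth (in kbps),
// and never less than 50 KB.
// 1 kbps for 0.1 s = 100 bits = 12.5 bytes, hence the factor 25/2.
unsigned rtpReceiveBufferSize(unsigned bandwidthKbps) {
  unsigned size = bandwidthKbps * 25 / 2;
  unsigned const minimum = 50 * 1024;
  return size < minimum ? minimum : size;
}

// Bandwidth passed to the RTCP instance: the session bandwidth plus 5%,
// or a guess of 500 kbps when the SDP did not specify one.
unsigned totalSessionBandwidth(unsigned bandwidthKbps) {
  return bandwidthKbps != 0 ? bandwidthKbps + bandwidthKbps / 20 : 500;
}
```

      For example, a requested port of 5001 becomes the pair 5000/5001, and an 8000 kbps stream gets a 100000-byte receive buffer, while anything at or below 4096 kbps falls back to the 51200-byte minimum.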

      Next, let's look at continueAfterPLAY, which is called when the client receives the response to the PLAY command:

    void continueAfterPLAY(RTSPClient* rtspClient, int resultCode, char* resultString) {
      Boolean success = False;

      do {
        UsageEnvironment& env = rtspClient->envir(); // alias
        StreamClientState& scs = ((ourRTSPClient*)rtspClient)->scs; // alias

        if (resultCode != 0) {
          env << *rtspClient << "Failed to start playing session: " << resultString << "\n";
          break;
        }

        // Set a timer to be handled at the end of the stream's expected duration (if the stream does not already signal its end
        // using a RTCP "BYE").  This is optional.  If, instead, you want to keep the stream active - e.g., so you can later
        // 'seek' back within it and do another RTSP "PLAY" - then you can omit this code.
        // (Alternatively, if you don't want to receive the entire stream, you could set this timer for some shorter value.)
        if (scs.duration > 0) {
          unsigned const delaySlop = 2; // number of seconds extra to delay, after the stream's expected duration.  (This is optional.)
          scs.duration += delaySlop;
          unsigned uSecsToDelay = (unsigned)(scs.duration*1000000);
          scs.streamTimerTask = env.taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)streamTimerHandler, rtspClient);
        }

        env << *rtspClient << "Started playing session";
        if (scs.duration > 0) {
          env << " (for up to " << scs.duration << " seconds)";
        }
        env << "...\n";

        success = True;
      } while (0);
      delete[] resultString;

      if (!success) {
        // An unrecoverable error occurred with this stream.
        shutdownStream(rtspClient);
      }
    }

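      continueAfterPLAY is simple: on success it prints "Started playing session" and, if the stream has a known duration, schedules a timer that fires at the expected end of the stream. Once the server receives the PLAY command, it starts sending RTP and RTCP packets to the client. The client, for its part, has been waiting for video data from the server ever since it called MediaSink::startPlaying.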

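      The MediaSink created in continueAfterSETUP is a DummySink object. DummySink is a subclass of MediaSink; it is called DummySink because the client in this example does nothing with the video data it receives.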

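      The client calls MediaSink::startPlaying to start receiving data from the server. This is the same function we saw earlier when describing how the server side establishes an RTSP connection: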

    Boolean MediaSink::startPlaying(MediaSource& source,
                                    afterPlayingFunc* afterFunc,
                                    void* afterClientData) {
      // Make sure we're not already being played:
      if (fSource != NULL) {
        envir().setResultMsg("This sink is already being played");
        return False;
      }

      // Make sure our source is compatible:
      if (!sourceIsCompatibleWithUs(source)) {
        envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
        return False;
      }
      fSource = (FramedSource*)&source;       // here fSource is the H264VideoRTPSource object created earlier

      fAfterFunc = afterFunc;
      fAfterClientData = afterClientData;
      return continuePlaying();
    }

      MediaSink::startPlaying in turn calls DummySink::continuePlaying:

    Boolean DummySink::continuePlaying() {
      if (fSource == NULL) return False; // sanity check (should not happen)

      // Request the next frame of data from our input source.  "afterGettingFrame()" will get called later, when it arrives:
      fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
                            afterGettingFrame, this,
                            onSourceClosure, this);
      return True;
    }

      DummySink::continuePlaying requests data from the server through the H264VideoRTPSource object (H264VideoRTPSource is a subclass of MultiFramedRTPSource). When a frame has been obtained, DummySink::afterGettingFrame is called back. FramedSource::getNextFrame calls MultiFramedRTPSource::doGetNextFrame:

    void MultiFramedRTPSource::doGetNextFrame() {
      if (!fAreDoingNetworkReads) {
        // Turn on background read handling of incoming packets:
        fAreDoingNetworkReads = True;
        TaskScheduler::BackgroundHandlerProc* handler
          = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
        fRTPInterface.startNetworkReading(handler);
            // Network data is read through the RTPInterface object (on the server side, an RTPInterface object is used to send it).
            // When data has been read, networkReadHandler is called back to process it.
      }

      fSavedTo = fTo;            // the data that is read gets stored in fTo
      fSavedMaxSize = fMaxSize;
      fFrameSize = 0; // for now
      fNeedDelivery = True;
      doGetNextFrame1();
    }

    void MultiFramedRTPSource::doGetNextFrame1() {
      while (fNeedDelivery) {
        // If we already have packet data available, then deliver it now.
        Boolean packetLossPrecededThis;
        BufferedPacket* nextPacket
          = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
        if (nextPacket == NULL) break;

        fNeedDelivery = False;

        if (nextPacket->useCount() == 0) {
          // Before using the packet, check whether it has a special header
          // that needs to be processed:
          unsigned specialHeaderSize;
          if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
            // Something's wrong with the header; reject the packet:
            fReorderingBuffer->releaseUsedPacket(nextPacket);
            fNeedDelivery = True;
            break;
          }
          nextPacket->skip(specialHeaderSize);
        }

        // Check whether we're part of a multi-packet frame, and whether
        // there was packet loss that would render this packet unusable:
        if (fCurrentPacketBeginsFrame) {
          if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {
            // We didn't get all of the previous frame.
            // Forget any data that we used from it:
            fTo = fSavedTo; fMaxSize = fSavedMaxSize;
            fFrameSize = 0;
          }
          fPacketLossInFragmentedFrame = False;
        } else if (packetLossPrecededThis) {
          // We're in a multi-packet frame, with preceding packet loss
          fPacketLossInFragmentedFrame = True;
        }
        if (fPacketLossInFragmentedFrame) {
          // This packet is unusable; reject it:
          fReorderingBuffer->releaseUsedPacket(nextPacket);
          fNeedDelivery = True;
          break;
        }

        // The packet is usable. Deliver all or part of it to our caller:
        unsigned frameSize;
        nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
                        fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
                        fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
                        fCurPacketMarkerBit);
        fFrameSize += frameSize;

        if (!nextPacket->hasUsableData()) {
          // We're completely done with this packet now
          fReorderingBuffer->releaseUsedPacket(nextPacket);
        }

        if (fCurrentPacketCompletesFrame) { // a complete frame has been read
          // We have all the data that the client wants.
          if (fNumTruncatedBytes > 0) {
            envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
                    << fSavedMaxSize << ").  "
                    << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
          }
          // Call our own 'after getting' function, so that the downstream object can consume the data:
          if (fReorderingBuffer->isEmpty()) {
            // Common case optimization: There are no more queued incoming packets, so this code will not get
            // executed again without having first returned to the event loop.  Call our 'after getting' function
            // directly, because there's no risk of a long chain of recursion (and thus stack overflow):
            afterGetting(this);
          } else {
            // Special case: Call our 'after getting' function via the event loop.
            nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
                                                                     (TaskFunc*)FramedSource::afterGetting, this);
          }
        } else {
          // This packet contained fragmented data, and does not complete
          // the data that the client wants.  Keep getting data:
          fTo += frameSize; fMaxSize -= frameSize;
          fNeedDelivery = True;
        }
      }
    }
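
      The packet-loss handling in doGetNextFrame1 is the subtle part of that listing: a frame may span several packets, and losing any one of them makes the whole frame unusable. The sketch below models only that rule, under simplifying assumptions: packets arrive already ordered, each carries flags saying whether it begins or completes a frame and whether a loss was detected just before it, and payloads are plain strings. DemoPacket and assembleFrames are hypothetical names, not Live555 types.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified packet: the flags correspond to fCurrentPacketBeginsFrame,
// fCurrentPacketCompletesFrame and packetLossPrecededThis in the listing above.
struct DemoPacket {
  std::string payload;
  bool beginsFrame;
  bool completesFrame;
  bool lossPrecededThis;
};

// Reassemble frames from packets, discarding any frame that lost a fragment.
std::vector<std::string> assembleFrames(std::vector<DemoPacket> const& packets) {
  std::vector<std::string> frames;
  std::string current;                // data gathered for the frame in progress
  bool lossInFragmentedFrame = false; // mirrors fPacketLossInFragmentedFrame

  for (DemoPacket const& p : packets) {
    if (p.beginsFrame) {
      if (p.lossPrecededThis || lossInFragmentedFrame) {
        current.clear(); // the previous frame was never completed; forget its data
      }
      lossInFragmentedFrame = false;
    } else if (p.lossPrecededThis) {
      lossInFragmentedFrame = true; // a middle fragment went missing
    }
    if (lossInFragmentedFrame) continue; // this packet is unusable; reject it

    current += p.payload;
    if (p.completesFrame) {
      frames.push_back(current);
      current.clear();
    }
  }
  return frames;
}
```

      Given a two-packet frame A, a three-packet frame B whose middle packet is lost, and a single-packet frame C, only A and C come out. The leftover first fragment of B is dropped when C's packet begins a new frame.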

      In doGetNextFrame1, once a complete frame has been read, FramedSource::afterGetting is called, which in turn calls back DummySink::afterGettingFrame:

    void DummySink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
                                      struct timeval presentationTime, unsigned durationInMicroseconds) {
      DummySink* sink = (DummySink*)clientData;
      sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
    }

    // If you don't want to see debugging output for each received frame, then comment out the following line:
    #define DEBUG_PRINT_EACH_RECEIVED_FRAME 1

    void DummySink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
                                      struct timeval presentationTime, unsigned /*durationInMicroseconds*/) {
      // We've just received a frame of data.  (Optionally) print out information about it:
    #ifdef DEBUG_PRINT_EACH_RECEIVED_FRAME
      if (fStreamId != NULL) envir() << "Stream \"" << fStreamId << "\"; ";
      envir() << fSubsession.mediumName() << "/" << fSubsession.codecName() << ":\tReceived " << frameSize << " bytes";
      if (numTruncatedBytes > 0) envir() << " (with " << numTruncatedBytes << " bytes truncated)";
      char uSecsStr[6+1]; // used to output the 'microseconds' part of the presentation time
      sprintf(uSecsStr, "%06u", (unsigned)presentationTime.tv_usec);
      envir() << ".\tPresentation time: " << (int)presentationTime.tv_sec << "." << uSecsStr;
      if (fSubsession.rtpSource() != NULL && !fSubsession.rtpSource()->hasBeenSynchronizedUsingRTCP()) {
        envir() << "!"; // mark the debugging output to indicate that this presentation time is not RTCP-synchronized
      }
    #ifdef DEBUG_PRINT_NPT
      envir() << "\tNPT: " << fSubsession.getNormalPlayTime(presentationTime);
    #endif
      envir() << "\n";
    #endif

      // Then continue, to request the next frame of data:
      continuePlaying();
    }

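      DummySink::afterGettingFrame simply prints how many bytes a given MediaSubsession has received, then uses the FramedSource to read more data. As on the server side, Live555's RTSP client forms a loop between MediaSink and FramedSource that keeps reading data from the server.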
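
      The sink/source loop can be reduced to a small model: the sink asks for a frame, the source delivers it through a callback, and the callback immediately asks for the next one. This is an illustrative sketch only. DemoSource and DemoSink are hypothetical classes, and frames are delivered synchronously from an in-memory queue, whereas in Live555 control normally returns to the event loop until the next packet arrives from the network.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a FramedSource: hands out queued frames one at a time.
class DemoSource {
public:
  using AfterGetting = std::function<void(std::string const& frame)>;
  using OnClose = std::function<void()>;

  explicit DemoSource(std::vector<std::string> frames) {
    for (std::string& f : frames) fFrames.push(std::move(f));
  }

  // Deliver the next frame through the callback, or report closure when none remain.
  void getNextFrame(AfterGetting afterGetting, OnClose onClose) {
    if (fFrames.empty()) { onClose(); return; }
    std::string frame = std::move(fFrames.front());
    fFrames.pop();
    afterGetting(frame);
  }

private:
  std::queue<std::string> fFrames;
};

// Hypothetical stand-in for DummySink: counts what it receives and asks for more.
class DemoSink {
public:
  explicit DemoSink(DemoSource& source) : fSource(source) {}

  void continuePlaying() {
    fSource.getNextFrame(
        [this](std::string const& frame) { afterGettingFrame(frame); },
        [this]() { fClosed = true; });
  }

  std::size_t bytesReceived() const { return fBytesReceived; }
  std::size_t framesReceived() const { return fFramesReceived; }
  bool closed() const { return fClosed; }

private:
  void afterGettingFrame(std::string const& frame) {
    fBytesReceived += frame.size();
    ++fFramesReceived;
    continuePlaying(); // request the next frame, which closes the loop
  }

  DemoSource& fSource;
  std::size_t fBytesReceived = 0;
  std::size_t fFramesReceived = 0;
  bool fClosed = false;
};
```

      Starting the sink once with continuePlaying() is enough to drain the source: each delivered frame triggers the next request, and the loop ends only when the source reports closure. Because this sketch delivers frames synchronously, the calls nest. The Live555 listing above avoids deep recursion by scheduling afterGetting through the event loop when more packets are already queued.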

  • Original article: https://www.cnblogs.com/jqctop1/p/4396301.html