zoukankan      html  css  js  c++  java
  • libjingle的thread机制

         本地调试发现当指明服务端ip地址而非域名时,Win32SocketServer::Pump被执行了多次,但是message_queue_->Dispatch(&msg);函数则被执行了一次,即响应VideoCapturer::OnMessage函数中响应MSG_STATE_CHANGE消息。VideoCapturer类的构造函数总共被执行了两次,一次代表的是本地端,在构造函数中将类成员变量talk_base::Thread* thread_;赋值为主线程;另一次代表的是远端,成员变量thread_;被赋值为PeerConnectionFactory类中创建的signaling_thread_,这是因为是在该线程的消息处理中完成的(因为调用Conductor::OnMessageFromPeer函数中调用PeerConnectionProxy::SetRemoteDescription函数,而PeerConnectionProxy会通过其封装的talk_base::Thread* signaling_thread_线程来执行PeerConnection::SetRemoteDescription函数,即在该线程中执行)。具体就是conductor.cc文件的Conductor::OnMessageFromPeer函数(主线程运行)中,调用peer_connection_->SetRemoteDescription,而peer_connection_则是Conductor::InitializePeerConnection函数中调用PeerConnectionFactory::CreatePeerConnection函数创建的PeerConnectionProxy对象,该对象封装的线程就是PeerConnectionFactory::CreatePeerConnection_s函数中设定的signaling_thread。所以在主线程调用peer_connection_->SetRemoteDescription,由于代理的作用(具体看proxy.h文件),调用这个函数就是用signaling_thread发送了一个消息,并对该消息绑定了其实现,即代理类所关联的PeerConnection类的SetRemoteDescription方法,这样线程的消息响应最终会调用PeerConnection::SetRemoteDescription。这个函数最终会第二次调用VideoCapturer的构造函数,所以第二次赋值给VideoCapturer的成员变量thread_就是PeerConnectionFactory类中的signaling_thread。

          类PhysicalSocketServer和类Win32SocketServer都继承自SocketServer类,在该类中定义了Wait和WakeUp方法,一个是用来等待消息的,另一个则是唤醒等待对消息进行响应。以Win32SocketServer为例,在函数Win32SocketServer::Wait中调用GetMessage等待消息,由于"GetMessage不接收属于其他线程或应用程序的消息。获取消息成功后,线程将从消息队列中删除该消息。函数会一直等待直到有消息到来才有返回值。",所以会一直等待直到有消息时该函数返回,外层函数Wait返回。

          主线程的PeerConnectionFactory::Initialize函数(trunk alkappwebrtcpeerconnectionfactory.cc文件)中会调用signaling_thread_->Send发送MSG_INIT_FACTORY消息。libjingle中的talk_base::Thread类,运行peerconnection,会调用六次构造函数Thread::Thread,其中的两次为解析dns地址,一个是解析localhost,另一个是解析stun的域名。另外四个则是main函数中的继承自Thread类的talk_base::Win32Thread对象,以及获取ThreadManager的单例对象时调用其构造函数ThreadManager::ThreadManager,然后在该构造函数中创建了一个Thread对象。另外两个则是PeerConnectionFactory类中定义的两个变量:talk_base::Thread* signaling_thread_;和talk_base::Thread* worker_thread_;。其中Win32Thread对象在main函数中通过调用ThreadManager::SetCurrentThread被设置成主线程。注意ThreadManager类中的TlsGetValue和TlsSetValue这一组函数用到了线程的局部存储。

          事实上PeerConnectionFactory类中的两个线程变量会传递到底层的某些类中,如果这些类需要发送消息,直接调用Send或者Post即可。以PeerConnectionFactory::Initialize函数中的signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);为例子,当主线程执行了上面这个语句后,主线程会一直阻塞直到MSG_INIT_FACTORY消息的响应,即在signaling_thread_线程的消息循环中响应MSG_INIT_FACTORY。在这个消息的响应中,又会调用ChannelManager::Init进而调用ChannelManager::SetAudioOptions函数,该函数中通过talk_base::Thread::Invoke来调用worker_thread_->Send。

          以Thread::Send和Win32SocketServer::WakeUp两个函数为入口点来理解talk_base::Thread的工作机制。先看Thread::Send函数,以signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);发送消息为例,通过IsCurrent来判断如果是当前这个线程则直接处理,比如在signaling_thread_线程的消息响应函数中又调用signaling_thread_->Send,则会直接就进行消息处理的。否则在发送MSG_INIT_FACTORY消息时,会调用Thread::Current()获取到主线程赋值给current_thread,可以把current_thread理解成父线程。然后将MSG_INIT_FACTORY消息放到signaling_thread_的成员变量std::list<_SendMessage> sendlist_中,由于在PeerConnectionFactory类的构造函数中在初始化其成员变量signaling_thread_和worker_thread_就马上调用其Start方法,这个方法最终调用Thread::ProcessMessages函数,在这个函数的while循环中进行线程的消息响应。

          接下来以主线程的消息循环为例,由于signaling_thread_和worker_thread_两个线程的父类MessageQueue的构造函数中给其变量SocketServer* ss_;赋值为PhysicalSocketServer类型,而主线程Win32Thread则在其构造函数中通过调用MessageQueue::set_socketserver函数将祖先类MessageQueue的变量SocketServer* ss_;赋值为Win32SocketServer类型。这样在talk_base::Thread的消息循环中,即Thread::ProcessMessages的while循环中,调用MessageQueue::Get来获取消息时,若获取到消息则该函数返回,若获取不到消息则调用SocketServer::Wait函数等待,基于以上分析,所以主函数中就是执行Win32SocketServer::Wait函数。

          以上关于调用Win32SocketServer::Wait的分析有点问题。事实上,由于Win32Thread继承自talk_base::Thread类,但是并没有像signaling_thread_和worker_thread_两个线程调用了Start方法,即并没有启动消息循环。Win32SocketServer::Wait被调用是在signaling_thread_->Send时,即在主线程调用这个函数发送消息时,由于主线程作为signaling_thread_的父线程,在Thread::Send函数中被调用(即current_thread->socketserver()->Wait(kForever, false)语句),而且传递的Wait的第二个参数process_io为false,而signaling_thread_和worker_thread_两个线程的消息循环中,调用MessageQueue::Get时候第三个参数为默认参数,所以调用PhysicalSocketServer::Wait时,第二个参数process_io为true。

          signaling_thread_和worker_thread_两个线程调用了Start方法,该方法调用执行线程消息处理函数Thread::ProcessMessages,在该函数中以默认参数来调用MessageQueue::Get,因为默认参数为true,所以这个函数调用的PhysicalSocketServer::Wait(int cmsWait, bool process_io) 其参数process_io为true。

          再回到Thread::Send函数中,还以signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);为例子,将MSG_INIT_FACTORY消息放到signaling_thread_的成员变量std::list<_SendMessage> sendlist_中后,由前面分析,signaling_thread_可能因为在等待消息的到来而阻塞,所以需要调用ss_->WakeUp();在这里就是PhysicalSocketServer::WakeUp()函数。接下来就是用一个变量bool waited = false;来标记当前的消息是否已经被处理,如果没有处理完毕,则当前线程(即发送信息的线程的父线程,在这里就是主线程,即signaling_thread_->Send,main函数中定义的talk_base::Win32Thread w32_thread;)会一直阻塞在while循环,在signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);这个例子中,就是主线程通过调用Win32SocketServer::Wait来一直阻塞自己,直到消息响应函数执行完毕返回才跳出while循环,然后主线程才可以继续执行完该函数,并继续执行signaling_thread_->Send后面的语句,所以这个Send类似于windows函数SendMessage函数。都是等待当前消息响应完毕才返回。

          在Thread::ReceiveSends函数中,通过遍历Thread类成员变量std::list<_SendMessage> sendlist_;,来对该线程的所发送的消息进行处理。而每次Send所发送的消息都是及时响应的,所以sendlist_中最多有一个代表已发送消息的元素。

          回到Thread::Send函数中,在while循环中会调用当前线程的Thread::ReceiveSends来响应已发送消息,由于主线程(即main函数里的talk_base::Win32Thread w32_thread)并未调用Send发送消息,所以主线程的signaling_thread_->Send函数体中,上面while循环调用的主线程的ReceiveSends会直接退出。而在signaling_thread_线程去响应当前的消息时,会去调用worker_thread_->Send,而在这个函数体中while循环调用父线程的ReceiveSends,由于父线程signaling_thread_的成员变量std::list<_SendMessage> sendlist_;中的消息正在被响应,所以其也为空,所以worker_thread_->Send调用到while循环中的也是直接退出。所以可以把这条语句(即Thread::Send函数中的current_thread->ReceiveSends();语句)屏蔽掉也是没有问题的。

          再看下Thread::ReceiveSends函数,在该函数中对当前线程所发送消息进行响应,并在响应完后调用父线程的WakeUp函数。由前面描述,talk_base::Thread类的std::list<_SendMessage> sendlist_;变量中要么保存了一个代表已发送消息的元素,要么为空。

          Thread::ReceiveSends函数中调用smsg.thread->socketserver()->WakeUp();smsg局部变量是从Thread类的私有变量std::list<_SendMessage> sendlist_;中取出,根据Thread::Send函数中对sendlist_保存可知Thread::ReceiveSends函数中调用smsg.thread->socketserver()->WakeUp();即是调用父线程的WakeUp函数。以signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);为例子,主线程调用current_thread->socketserver()->Wait(kForever, false);等待,当signaling_thread_线程处理完MSG_INIT_FACTORY消息调用Win32SocketServer::WakeUp来唤醒主线程。

          以signaling_thread_->Send(this, MSG_INIT_FACTORY, &result);为例子,在响应MSG_INIT_FACTORY消息的signaling_thread_中,又会调用worker_thread_->Send发送消息,所以在调用worker_thread_->Send发送消息的函数体中while循环去通过current_thread->ReceiveSends();语句来调用父线程的Thread::ReceiveSends()方法,由于前面分析,signaling_thread_线程的std::list<_SendMessage> sendlist_;为空,仅有的代表已发送消息的元素的响应正在signaling_thread_线程中被执行,即调用worker_thread_->Send的signaling_thread_线程。

          总结一下,signaling_thread_和worker_thread_两个线程中的消息响应是通过Thread::ProcessMessages函数实现的,在该函数中首先调用MessageQueue::Get,这个函数中会调用虚函数Thread::ReceiveSends,在这个函数中会对当前线程Send的消息进行响应,然后MessageQueue::Get函数继续执行,获取一个Post消息或者PostDelayed消息然后返回,然后调用MessageQueue::Dispatch函数对消息进行响应,完成了talk_base::Thread的消息响应。

          所以基于以上分析,Thread::ReceiveSends函数被两处调用,其一是响应当前线程中Send的消息是被MessageQueue::Get函数中所调用(signaling_thread_和worker_thread_两个线程),其二是Thread::Send函数中通过父线程来调用Thread::ReceiveSends函数响应父线程中的Send消息,由于总是由于父线程的std::list<_SendMessage> sendlist_;为空而直接退出。

          上面是signaling_thread_和worker_thread_两个线程中的消息响应流程。而主线程talk_base::Win32Thread w32_thread;(main函数中定义的)的消息响应则是Win32SocketServer::MessageWindow::OnMessage函数中来实现的。由于Win32SocketServer::MessageWindow继承自talk_base::Win32Window类,在Win32SocketServer的构造函数中初始化其成员变量MessageWindow wnd_;,即创建窗口,由于这一过程是在主线程中进行的,相当于创建了两个窗口。所以在main函数中通过::GetMessage(&msg, NULL, 0, 0)来获取到main函数中创建的MainWnd(代表客户端的窗口)窗口消息并进行相应处理时,也能获取到Win32SocketServer类中的MessageWindow wnd_的窗口消息并执行该窗口的过程函数来响应。

          在win32socketserver.cc文件中,首先在Win32SocketServer构造函数中调用RegisterWindowMessage函数注册了一个消息,然后在Win32SocketServer::WakeUp函数中向成员变量MessageWindow wnd_;发送消息,而Win32SocketServer::Wait函数会调用GetMessage来阻塞等待该消息,这个函数的作用就是在主线程中调用signaling_thread_->Send后,在Thread::Send函数中通过current_thread->socketserver()->Wait来调用让自身(即主线程)阻塞。当本地端呼叫对端时,主线程中会多次调用signaling_thread_->Send函数,这样每一次调用Win32SocketServer::Wait和Win32SocketServer::WakeUp就一一对应起来,其中Win32SocketServer::WakeUp可能会有两次,比如在主线程中调用signaling_thread_->Send,然后在Thread::Send函数中,首先while (!ready) 循环中会调用Win32SocketServer::Wait,然后对于该消息会有两处调用Win32SocketServer::WakeUp,一处是signaling_thread_中通过调用MessageQueue::Get进而调用Thread::ReceiveSends函数,即在signaling_thread_线程中响应Send消息的函数Thread::ReceiveSends中,会调用Win32SocketServer::WakeUp;还有一处是主线程调用signaling_thread_->Send的最后,当判断消息执行完后(即waited变量为true),再次调用Win32SocketServer::WakeUp,这样当主线程下一次继续调用signaling_thread_->Send而进入while循环时,第一次调用current_thread->socketserver()->Wait会直接返回并不会阻塞,然而当前消息并未被处理,所以就跳不出while循环,主线程阻塞。而且这样前一个signaling_thread_->Send所发送的消息,其响应的最后在Thread::Send中调用的第二个Win32SocketServer::WakeUp也被消耗掉了。

          Win32SocketServer::MessageWindow::OnMessage函数中会对Win32SocketServer构造函数中注册的消息响应,该消息也会被Win32SocketServer::Wait函数中GetMessage阻塞等待,一开始对这两处并不是很理解,后来明白这两个函数执行的阶段是不同的。Win32SocketServer::Wait是响应某一个事件过程中(比如前面的客户端呼叫对端双击事件)被多次调用的,这样只要在某一个主线程的事件中调用了signaling_thread_->Send,最终都会调用Win32SocketServer::WakeUp发送一个唤醒的消息(Win32SocketServer构造函数中调用RegisterWindowMessage注册的消息)到Win32SocketServer类的MessageWindow wnd_;中,由于Win32SocketServer::MessageWindow继承自Win32Window类,而父类的窗口过程函数Win32Window::WndProc中调用OnMessage,又因为多态所以调用Win32SocketServer::MessageWindow::OnMessage,在该函数中响应消息注册的窗口消息WM_WAKEUP,即调用Win32SocketServer::Pump函数,在该函数中会对发送到主线程(主线程调用Post)的消息调用MessageQueue::Dispatch进行响应。

          所以每一次只要主线程中的某个事件调用了signaling_thread_->Send,都会由于向Win32SocketServer类的MessageWindow wnd_;发送了唤醒消息而响应Win32SocketServer::Pump()函数(这里有误,只是有机会去响应。如果主线程中继续调用signaling_thread_->Send,则会消耗掉(通过Win32SocketServer::Wait中的GetMessage(&msg, NULL, s_wm_wakeup_id, s_wm_wakeup_id);)Win32SocketServer::WakeUp函数中调用PostMessage(wnd_.handle(), s_wm_wakeup_id, 0, 0);所发送的s_wm_wakeup_id消息,main函数中的GetMessage(&msg, NULL, 0, 0)就获取不到消息,Win32SocketServer::MessageWindow::OnMessage也不会再执行ss_->Pump();),如果该函数中调用MessageQueue::Get没有获取到消息,则直接退出,否则调用MessageQueue::Dispatch进行响应。

          所以Win32SocketServer::Wait和Win32SocketServer::MessageWindow::OnMessage是在不同的阶段中,前一个是在主线程的某个事件的响应中执行,只有在事件响应完毕才会在主线程中消息循环调用GetMessage(&msg, NULL, 0, 0),当获取到消息s_wm_wakeup_id时,然后再去响应即调用Win32SocketServer::MessageWindow::OnMessage。

          看下Thread::Send这个函数,在函数的最后判断当前发送的消息响应后,向父线程发送唤醒消息,即调用current_thread->socketserver()->WakeUp();函数。考虑如下情形,当主线程中调用signaling_thread_->Send发送消息时,在signaling_thread_中会处理该消息,而此时主线程会等待,如果此时调用主线程的Post发送消息,Post会调用WakeUp(),但此时由于主线程等待signaling_thread_执行完毕,会消耗掉此WakeUp()而继续在Thread::Send的while循环中等待,而Post的消息就有可能不会以正确的时间顺序来执行了。

          如果将Thread::Send函数的最后这一条语句(即判断当前消息执行完毕后,调用当前线程的唤醒函数WakeUp())屏蔽掉,与正常流程的客户端通信,调试发现当作为呼叫端时是可以正常通信的,当作为被呼叫端则不能正常通信(这里因为测试不充分,后来发现被呼叫端有时也是可以正常通信的)。分析如下:首先是Win32SocketServer::Pump函数中,当两个正常的客户端通信时,message_queue_->Dispatch(&msg);仅仅被执行了一次,即调用MessageQueue::Dispatch函数后最终调用VideoCapturer::OnMessage函数响应MSG_STATE_CHANGE消息,这是因为客户端主线程只有这一处Post了该消息。关于MSG_STATE_CHANGE消息,总共被发送了两次。CaptureManager::StartVideoCapture函数调用VideoCapturer::SetCaptureState,进而调用MessageQueue::Post函数发送MSG_STATE_CHANGE消息。在ChannelManager::StartVideoCapture函数中,通过Bind和worker_thread_->Invoke(事实上是worker_thread_->Send)来调用CaptureManager::StartVideoCapture函数。

          调用ChannelManager::StartVideoCapture函数总共有两个地方,一个是signaling_thread_线程中响应MSG_CREATE_VIDEOSOURCE消息,另一个是conductor.cc文件中Conductor::OnMessageFromPeer函数中调用peer_connection_->SetRemoteDescription,进而调用ChannelManager::StartVideoCapture函数。其中peer_connection_就是一个封装了signaling_thread_的PeerConnectionProxy对象。

          根据以上分析,将Thread::Send函数的最后一条语句即判断当前线程所发送的消息的响应函数执行完毕后唤醒当前线程屏蔽掉,然后跟一个正常的客户端通信,发现VideoCapturer::SetCaptureState函数还是会Post两次MSG_STATE_CHANGE消息,即响应MSG_CREATE_VIDEOSOURCE消息(主线程Post发送MSG_STATE_CHANGE消息)和Conductor::OnMessageFromPeer函数中调用peer_connection_->SetRemoteDescription(signaling_thread_线程Post发送MSG_STATE_CHANGE消息)。但是VideoCapturer::OnMessage函数中响应MSG_STATE_CHANGE消息的只有一处,即signaling_thread_线程中,主线程中并没有执行该消息的响应,因为Win32SocketServer::Pump从来没有执行,而这又是因为主线程在调用signaling_thread_->Send的最后唤醒当前线程(Win32SocketServer::WakeUp函数)给屏蔽掉了,所以在主线程中Win32SocketServer类中的MessageWindow wnd_;获取不到Win32SocketServer构造函数中的s_wm_wakeup_id消息。所以不会调用Win32SocketServer::Pump,所以主线程Post的任何消息都不会被执行了。

          注意一点,主线程(继承自talk_base::Thread类的talk_base::Win32Thread)和talk_base::Thread的消息响应机制还不太一样,主线程的消息响应是通过Win32SocketServer类中的MessageWindow wnd_;变量的窗口响应函数实现的,即Win32SocketServer::MessageWindow::OnMessage函数,在main函数中获取发送到该窗口的消息,然后根据windows消息响应机制来调用该函数。而talk_base::Thread的消息响应机制则是在Thread::ProcessMessages的while循环中。

          分析一下为什么只有signaling_thread_线程Post的MSG_STATE_CHANGE消息被响应了,这是因为在signaling_thread_线程中,调用worker_thread_->Send,会暂停当前的线程signaling_thread_并转到worker_thread_线程中去执行消息响应,如果worker_thread_线程消息响应中调用了signaling_thread_->Post发送的消息,按正常流程(即不屏蔽Thread::Send的最后一条语句判断执行完后当前线程WakeUp),worker_thread_线程的消息响应完毕(Thread::ReceiveSends函数中的smsg.thread->socketserver()->WakeUp();)会唤醒父线程,因为Thread::Send函数中的while循环阻塞了当前线程(在这里就是调用worker_thread_->Send的signaling_thread_线程),然后signaling_thread_线程才可以继续执行,即while循环中的current_thread->socketserver()->Wait(kForever, false);返回并跳出了while循环,由于worker_thread_线程的消息响应中可能调用了signaling_thread_->Post发送的消息,所以在最后(即signaling_thread_线程中调用worker_thread_->Send的最后一条语句处)会判断worker_thread_->Send所发送的消息响应完后,去调用current_thread->socketserver()->WakeUp();即让signaling_thread_线程唤醒,去响应worker_thread_线程的消息响应中调用的signaling_thread_->Post所发送的消息。这样该消息就可以及时的响应。

          但是如果将这Thread::Send函数的最后一条语句(唤醒当前线程)屏蔽掉,signaling_thread_线程中的MSG_STATE_CHANGE消息也能被响应是因为,虽然没有及时的(即worker_thread_->Send所发送的消息响应完毕)去响应worker_thread_线程的消息响应中调用的signaling_thread_->Post所发送的消息。但是该消息还是保存在signaling_thread_线程的消息队列中(即基类MessageQueue的成员变量MessageList msgq_;),这样当下次在主线程中调用signaling_thread_->Send时,在Thread::Send函数中调用ss_->WakeUp();,由于是signaling_thread_来调用Thread::Send函数,所以唤醒的就是signaling_thread_线程的消息响应机制,再看下talk_base::Thread线程的消息响应机制,即Thread::ProcessMessages函数的while循环。首先是调用MessageQueue::Get函数,该函数中先调用Thread::ReceiveSends函数将当前线程Send的消息处理掉,然后while循环获取到消息,然后在Thread::ProcessMessages调用Dispatch(&msg);处理,直到signaling_thread_线程的消息队列处理完毕并阻塞在MessageQueue::Get函数中的ss_->Wait处。所以说唤醒的是线程消息响应机制,而非只是处理一个消息而已。

          所以基于以上分析,就算在signaling_thread_线程调用worker_thread_->Send所发送的消息响应中调用了signaling_thread_->Post发送了消息,而且Thread::Send函数的最后一条语句(唤醒当前线程)屏蔽掉,导致该消息保存在signaling_thread_的消息队列中并未及时响应(signaling_thread_->Post函数中调用的ss_->WakeUp();由于signaling_thread_等待worker_thread_->Send执行完毕而被Thread::Send函数中的while循环消耗掉),但是当下一次主线程调用signaling_thread_->Send时,上面这个消息会被响应,只是该消息的响应时间要看主线程什么时候调用signaling_thread_->Send,所以并不及时。这一切都是因为屏蔽了Thread::Send函数的最后一条语句(唤醒当前线程)。而主线程的MSG_STATE_CHANGE没有被执行,是因为没有线程去调用主线程来Send消息(也不可能有),否则主线程中的MSG_STATE_CHANGE消息也会被执行。

          将这Thread::Send函数的最后一条语句(唤醒当前线程)屏蔽掉,然后与另外一台主机上的正常的客户端通信,测试发现如果本地端作为呼叫端,则可以正常进行通信;反正如果对端作为呼叫端,则不能正常建立通信。然后抓包分析当本地端作为呼叫端时候是有sdp信息发送到对端的,作为被呼叫端则没有。

          发送sdp消息是WebRtcSessionDescriptionFactory::OnMessage函数中响应MSG_CREATE_SESSIONDESCRIPTION_SUCCESS消息最终会调用Conductor::OnSuccess发送sdp消息,而不管是呼叫端还是接受端,发送这个消息都执行一次,当为呼叫端时发送该消息流程则是:在Conductor::ConnectToPeer函数中会调用peer_connection_->CreateOffer(this, NULL);,而peer_connection_是一个PeerConnectionProxy对象,该对象封装了signaling_thread_,根据代理机制,最终会调用到PeerConnection::CreateOffer,该函数最终调用函数WebRtcSessionDescriptionFactory::PostCreateSessionDescriptionSucceeded发送消息signaling_thread_->Post(this, MSG_CREATE_SESSIONDESCRIPTION_SUCCESS, msg);。当接受呼叫时发送这个消息的流程在是Conductor::OnMessageFromPeer函数以PeerConnectionProxy对象来调用PeerConnection::CreateAnswer,进而调用WebRtcSession::CreateAnswer,最终会发送这个MSG_CREATE_SESSIONDESCRIPTION_SUCCESS消息。

          后来接着调试,发现将Thread::Send函数的最后一条语句(唤醒当前线程)屏蔽掉,然后与另外一台主机上的正常的客户端通信,不论是作为呼叫端还是被呼叫端都是可以进行正常通信的,只不过作为被呼叫端的时候有时候并不能正常通信。这是因为WebRtcSessionDescriptionFactory::PostCreateSessionDescriptionSucceeded函数中signaling_thread_->Post(this, MSG_CREATE_SESSIONDESCRIPTION_SUCCESS, msg);这个MSG_CREATE_SESSIONDESCRIPTION_SUCCESS消息并未被响应,也就没有发送sdp到呼叫端。

          上面在分析signaling_thread_线程Post的MSG_STATE_CHANGE消息被响应的原因是:signaling_thread_线程中worker_thread_->Send所发送的消息响应中调用的signaling_thread_->Post所发送的消息,本来应该是在signaling_thread_线程中worker_thread_->Send的最后通过唤醒当前线程来执行的,由于Thread::Send的最后这条唤醒当前线程消息响应的语句被屏蔽,所以导致该消息存储在signaling_thread_线程中,等待下一次主线程继续调用signaling_thread_->Send,会唤醒signaling_thread_线程的消息响应机制然后被执行。

          当把Thread::Send函数的current_thread->socketserver()->WakeUp();屏蔽掉后,ChannelManager::StartVideoCapture函数中通过worker_thread_调用CaptureManager::StartVideoCapture,在worker_thread_线程消息处理循环中通过主线程发送的MSG_STATE_CHANGE消息没有被执行,而signaling_thread_线程的消息处理循环中,又通过worker_thread_调用CaptureManager::StartVideoCapture,在CaptureManager::StartVideoCapture函数中通过signaling_thread_线程发送的MSG_STATE_CHANGE消息被响应,步骤正如前面对该消息的分析。

          一开始因为测试次数不够,就想当然地以为Thread::Send的最后这条唤醒当前线程消息响应的语句被屏蔽后,作为被呼叫端不可能跟正常的客户端通信。然后作为呼叫端之所以能正常通信是因为signaling_thread_->Post(this, MSG_CREATE_SESSIONDESCRIPTION_SUCCESS, msg);这个发送sdp的消息被正常执行,而之所以被执行是因为在执行这个消息的响应之前,主线程调用了signaling_thread_->Send。后来调试代码才发现在这个消息的响应之前,主线程并没有signaling_thread_->Send。因为MSG_CREATE_SESSIONDESCRIPTION_SUCCESS虽然是通过signaling_thread_来发Send发送的,但是是在signaling_thread_线程消息处理循环中发送的,而不是主线程调用signaling_thread_->Send发送。而且也发现作为被呼叫端也是可以跟正常的客户端通信的。

          MSG_CREATE_SESSIONDESCRIPTION_SUCCESS消息的响应与前面MSG_STATE_CHANGE消息被响应的步骤并不一样。MSG_STATE_CHANGE消息是signaling_thread_线程消息处理循环中通过worker_thread_调用CaptureManager::StartVideoCapture,最终是在worker_thread_线程消息循环中来调用signaling_thread_->Send发送MSG_STATE_CHANGE消息,所以如果没有Thread::Send函数的最后一条语句(唤醒当前线程),则有可能因为"Our Wait loop above may have consumed some WakeUp events for this MessageQueue"(Thread::Send函数中的注释)所以MSG_STATE_CHANGE消息得不到及时响应,但是这个消息是在Conductor::OnMessageFromPeer函数收到对端消息然后调用PeerConnectionProxy::SetRemoteDescription发送的,而且这个流程也会PostThreadMessage NEW_STREAM_ADDED,后面要响应NEW_STREAM_ADDED也会在主线程中通过signaling_thread_->Send来响应消息,所以MSG_STATE_CHANGE消息最终会执行。而MSG_CREATE_SESSIONDESCRIPTION_SUCCESS消息则是signaling_thread_线程消息处理循环中通过signaling_thread_发送,所以不会出现消耗WakeUp的情况。

          如上段所述,其实调用signaling_thread_->Post(this, MSG_CREATE_SESSIONDESCRIPTION_SUCCESS, msg);并不是上面分析的worker_thread_->Send所发送的消息响应中执行的,即并不是在worker_thread_线程中执行的。而是在signaling_thread_线程中执行的,所以Thread::Send的while循环中消耗掉当前线程的WakeUp并不存在。又由于在signaling_thread_线程中Post消息,而MessageQueue::Post会调用PhysicalSocketServer::WakeUp函数,即signaling_thread_线程的消息响应循环并不会阻塞在MessageQueue::Get函数中的PhysicalSocketServer::Wait,因为有消息待处理,所以MSG_CREATE_SESSIONDESCRIPTION_SUCCESS会被signaling_thread_线程的消息响应机制循环到并响应。如果A线程调用B线程的Send发送消息MSG_XXX,然后在消息MSG_XXX的响应中(即线程B中)又调用了A线程的Post,则由于A线程在Thread::Send的while循环中等待B线程来唤醒,但是B并未执行完毕,所以任何的对A的唤醒,包含这里的B线程中调用A的Post都会被消耗掉。而signaling_thread_线程中调用signaling_thread_->Post发送MSG_CREATE_SESSIONDESCRIPTION_SUCCESS消息则不属于这样的情形,所以会在signaling_thread_消息响应机制中对该消息响应。

    分析下为什么Thread::Send的最后这条唤醒当前线程消息响应的语句被屏蔽后,MSG_CREATE_SESSIONDESCRIPTION_SUCCESS有时候会被响应而有时候又不响应的原因?

    PhysicalSocketServer::Wait这个函数中,由于线程要处理Send或者Post给自己的消息,所以根据线程消息处理机制,在PhysicalSocketServer类中定义了一个Signaler* signal_wakeup_;变量,用于唤醒线程等待。所以在PhysicalSocketServer::Wait函数中通过WSAWaitForMultipleEvents函数对该事件进行关注,并且把类的成员变量WSAEVENT socket_ev_;放到事件数组的第一个,以后所有的io的socket都通过WSAEventSelect来关联到这个WSAEVENT socket_ev_变量上,以后当socket上有事件发生时,可以通过WSAEnumNetworkEvents并传入参数socket_ev_来获取socket上的事件并对这些事件分别响应。所以函数PhysicalSocketServer::Wait的参数process_io要设置为true,保证处理io的socket。正常情况下,WSAWaitForMultipleEvents函数所关注的事件数组中只有两个,第一个就是所有socket要绑定的WSAEVENT socket_ev_成员变量,第二个就是Signaler* signal_wakeup_;成员变量,用于唤醒当前线程的。

  • 相关阅读:
    挑战编程 uva100 3n+1
    《算法问题实战策略》 BOGGLE
    图论 最短路专辑
    acwing 76. 和为S的连续正数序列
    leetcode 19 删除链表的倒数第N个节点
    水文一篇 汇报下最*的学**况
    acwing 81. 扑克牌的顺子
    Solr基础理论与维护管理快速上手(含查询参数说明)
    Solr基础理论与维护管理快速上手(含查询参数说明)
    利用SolrJ操作solr API完成index操作
  • 原文地址:https://www.cnblogs.com/wongdu2014/p/5829301.html
Copyright © 2011-2022 走看看