zoukankan      html  css  js  c++  java
  • boost的asio接收单路大数据量udp包的方法

    开发windows客户端接收RTP视频流,当h264视频达到1080P 60fps的时候,按包来调用recvfrom的函数压力比较大,存在丢包的问题,windows的完成端口的性能效果当然可以解决这个问题,而boost的asio在windows上是基于完成端口来开发的,所以采用boost的asio和环形缓冲区的方法,可以解决接收单路大数据量udp包中丢包的问题。

        需要引入的头文件为:

    [cpp] view plain copy
     
    1. #include "CircledBuffer.h"  
    2. #include <iostream>  
    3. #include <boost/asio.hpp>  
    4. #include <boost/bind.hpp>  

    其中CircledBuffer.h是自定义的缓冲区的类,之后会有介绍,boost的两个文件是asio必需的两个文件。

        需要定义的全局变量为:

    [cpp] view plain copy
     
    1. using boost::asio::ip::udp;  
    2. boost::asio::io_service service;  
    3. boost::asio::ip::udp::socket sock(service);  
    4. boost::asio::ip::udp::endpoint sender_ep;  
    5. CircledBuffer readBuffer;  
    6. PacketBuffer* packet;  

    其中io_service是用来标示启动的,后面会调用run。sock和endpoint类似于描述符和sockaddr_in的关系。CircledBuffer和PacketBuffer*,是自定义缓冲区。

    主函数为:

    [cpp] view plain copy
     
    1. int main(int argc, char* argv[]) {  
    2.     boost::asio::ip::udp::endpoint ep( boost::asio::ip::address::from_string("192.168.1.206"),  
    3.         9002);  
    4.     sock.open(ep.protocol());  
    5.     sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));  
    6.     boost::asio::socket_base::receive_buffer_size recv_option(8*65534);  
    7.     sock.set_option(recv_option);  
    8.     sock.bind(ep);  
    9.     packet = readBuffer.GetLast();  
    10.     sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);  
    11.     service.run();  
    12. }  


    初始化ep和sock,其中udp接收的数量比较大的话,需要设定receive_buffer_size,然后bind,设置接受buffer为packet。

    介绍一下async_receive_from函数,它有三个参数,分别为接收的buffer,远端的ep,注意与本端的ep不同,远端的ep不用初始化设置,再就是buffer收满后的回调函数。

    回调函数的内容是:

    [cpp] view plain copy
     
    1. void on_read(const boost::system::error_code & err, std::size_t  
    2.              read_bytes) {  
    3.         std::cout << "read: " << read_bytes << std::endl;  
    4.         readBuffer.MoveNext();  
    5.         packet= readBuffer.GetLast();  
    6.         sock.async_receive_from(boost::asio::buffer(packet->data, packet->bufferSize), sender_ep, &on_read);  
    7. }  

    与main函数的接收部分一致,这里用了不断的自身回调,来实现while recvfrom的功能。

    补充说一句,用申请好的CircledBuffer,便于后期的多线程或者异步strand的处理,而不阻塞接收。

    缓冲区类的代码:

    头文件:

    [cpp] view plain copy
     
    1. #ifndef CIRCLED_BUFFER_H  
    2. #define CIRCLED_BUFFER_H  
    3.   
    4. #include <memory.h>  
    5. #include <boost/atomic.hpp>  
    6. #define CIRCLED_BUFFER_SIZE 300  
    7. #define BUFFER_SIZE 2000  
    8.   
    9. struct PacketBuffer  
    10. {  
    11.     PacketBuffer(){bufferSize=BUFFER_SIZE;dataSize=0;}   
    12.     unsigned int bufferSize;  
    13.     unsigned int dataSize;  
    14.     char data[BUFFER_SIZE];  
    15.   
    16.     PacketBuffer& operator=(PacketBuffer& other)  
    17.     {  
    18.         memcpy(data,other.data,other.dataSize);  
    19.         dataSize = other.dataSize;  
    20.         bufferSize = other.bufferSize;  
    21.         return *this;  
    22.     }  
    23. };  
    24.   
    25. class CircledBuffer  
    26. {  
    27. public:  
    28.     CircledBuffer(unsigned int bufSize=CIRCLED_BUFFER_SIZE);  
    29. public:  
    30.     ~CircledBuffer(void);  
    31.     PacketBuffer* GetAt(unsigned int idx){return &packets[idx];}  
    32.     PacketBuffer* GetLast()  
    33.     {         
    34.         return GetAt(writeIndex.load(boost::memory_order_consume));  
    35.     };  
    36.     void MoveNext()  
    37.     {  
    38.         unsigned int idx = writeIndex.load(boost::memory_order_relaxed);  
    39.         writeIndex.store((idx+1)%bufferSize,boost::memory_order_release);  
    40.     };  
    41.     unsigned int GetLastIndex(){return writeIndex.load(boost::memory_order_consume);};  
    42.     unsigned int GetSize(){return bufferSize;};  
    43. protected:  
    44.     boost::atomic<unsigned int> writeIndex;  
    45.     unsigned int bufferSize;  
    46.     PacketBuffer* packets;  
    47. };  
    48. #endif  


    缓冲区类的构造函数与析构函数

    [cpp] view plain copy
     
    1. #include "CircledBuffer.h"  
    2.   
    3. CircledBuffer::CircledBuffer(unsigned int bufSize)  
    4. :bufferSize(bufSize),  
    5. writeIndex(0)  
    6. {  
    7.     packets = new PacketBuffer[bufSize];  
    8. }  
    9.   
    10. CircledBuffer::~CircledBuffer(void)  
    11. {  
    12.     delete []packets;  
    13. }  


    源代码下载链接

  • 相关阅读:
    Process对象的其他属性:
    python网络编程之开启进程的方式
    python网络编程之进程论
    python网络编程之C/S架构介绍
    面向对象之元类介绍(未完待续)
    面向对象之继承
    面向对象之元类介绍
    python异常处理
    面向对象之内置方法
    面向对象之反射
  • 原文地址:https://www.cnblogs.com/lidabo/p/8317296.html
Copyright © 2011-2022 走看看