zoukankan      html  css  js  c++  java
  • BOOST ASIO 学习专贴

    本文已于20170903更新完毕,所有boost asio 代码均为本人手抄。编译器为vs2013,并且所有代码已经上传,本文下方可下载源码

     

    为了学习boost asio库,我是从boost的官方boost asio的教程学起的。

    每一个示例我都抄写了一遍以加深记忆,每一个例子我都用自己的话概括一遍,虽然概括的不是很好,代码觉得难懂的地方我都加注释。

    1.同步使用Timer


    本便使用了boost::asio::deadline_timer,这个timer有两种状态:过期和不过期。wait函数调用一个过期的timer直接返回。

    int _tmain(int argc, _TCHAR* argv[])
    {
        boost::asio::io_service io;
        boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));
        t.wait();
        std::cout<<"wait finished!"<<std::endl;
        return 0;
    }

    2.异步使用Timer

    下在演示了使用deadline_timer的asyn_wati函数实现异步等待。但要注意的一点是异步等待必须要调用io.run才可以。而且必须在io.run函数执行之前调用asyn_wait,否则io.run会立即返回,因为他没有可以做的事。这说明io.run必须至少有一个等待的,否则它会直接返回。asio函数保证回调函数执行和io.run所在的线程一样!

    //异步Timer
    void print(const boost::system::error_code & )
    {
        std::cout<<"Wait Finished"<<std::endl;
    }
    int _tmain(int argc, _TCHAR* argv[])
    {
        boost::asio::io_service io;
        boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));
        t.async_wait(&print);
        io.run();
    
        return 0;
    }

    3.为回调函数绑定参数

    这个例子一个是说明异步Timer的可持续性问题,也就是在回调中设置Time的超时时间。另一个说明回调函数参数的绑定 。但是实际发现我官的代码没有发生那个重复回调的效果。原因是我只是调用了expire_at而没有调用再次等待函数async_wait。这让我更加明白expires_at这个函数相当于下次触发的时间。而async_wait提交一个等待申请。

    async_wait提交一次,回调函数执行一次,而expire_at设定下次回调函数调用的时间。

    #include <boost/bind.hpp>
    void Print(const boost::system::error_code & ,
               boost::asio::deadline_timer * t,int * count)
    {
        if(*count < 5)
        {
            std::cout<<*count<<std::endl;
            ++(*count);
            t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
            t->async_wait(boost::bind(Print,boost::asio::placeholders::error,t,count));
        }
    }
    int _tmain(int argc, _TCHAR* argv[])
    {
        boost::asio::io_service io;
        int count = 0;
        boost::asio::deadline_timer t(io,boost::posix_time::seconds(1));
        t.async_wait(boost::bind(Print,boost::asio::placeholders::error,&t,&count));
        io.run();
        return 0;
    }

    4.类成员做为timer的回调函数

    这个例子主要演示了,如何绑定一个类成员函数作为一个回调

    class Print
    {
    public:
        Print(boost::asio::io_service & io)
            :timer_(io,boost::posix_time::seconds(1)),count_(0)
        {
            timer_.async_wait(boost::bind(&Print::print,this));
        }
        ~Print()
        {
            std::cout<<"finnal count is "<<count_<<std::endl;
        }
        void print()
        {
            if(count_ < 5)
            {
                std::cout<<count_<<std::endl;
                ++count_;
                timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
                timer_.async_wait(boost::bind(&Print::print,this));
            }
        }
    protected:
        boost::asio::deadline_timer timer_;
        int count_;
    };
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        boost::asio::io_service io;
        Print p(io);
        io.run();
        return 0;
    }

    4.在多线程程序中的同步回调

    先前的例子通过io_service.run和同步回调在同一个线程内,正如你所知的那样,asio保证回调函数只能被在io_service.run()所在的线程调用 。因此,只有在一个线程内调用io_service::run保证回调函数不会并发执行。这样在服务器程序中有两个局限性:

    1.当回调函数执行时间比较长时响应太慢
    2.没有起到多处理器的优势

    如果你注意到这个局限性,一个可供选择的方案是创建一个线程池去调用io_service.run()函数,这样实现的回调的并发,我们需要去同步一个共享变量。

    下面的例子使用到了An boost::asio::strand ,他保证这些回调函数通过strans派遣,它可以允许一个回调函数在另一个回调函数执行之前完成。简单点说,这里的strand就是让回调函数不会并发的执行。但是这里的strand到底的意图在哪里?不是要演示多线程执行回调吗?这里又做了strand使回调又依次执行好想没有达到多线程效果

    #include <boost/thread/thread.hpp>
    
    class printer
    {
    public:
        printer(boost::asio::io_service & io)
            :strand_(io),
            timer1_(io,boost::posix_time::seconds(1)),
            timer2_(io,boost::posix_time::seconds(1)),
            count_(0)
        {
            timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
            timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
        }
        void print1()
        {
            if(count_ < 10)
            {
                std::cout<<"Timer 1:"<<count_<<std::endl;
                ++count_;
    
                timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
                timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
            }
        }
    
        void print2()
        {
            if(count_ < 10)
            {
                std::cout<<"Timer 2:"<<count_<<std::endl;
                ++count_;
    
                timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
                timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
            }
        }
    private:
        boost::asio::io_service::strand strand_;
        boost::asio::deadline_timer timer1_;
        boost::asio::deadline_timer timer2_;
        int count_;
    };
    int _tmain(int argc, _TCHAR* argv[])
    {
        boost::asio::io_service io;
        printer p(io);
        boost::thread t(boost::bind(&boost::asio::io_service::run,&io));
        io.run();
        t.join();
        return 0;
    }

    下面有时间研究一下 boost::asio::strand的用法

    5.简单的一个TCP服务端

    下面程序演示一个boost做的最简单的一个服务端程序,客户端连接之后服务器给客户端发送一个当前时间的字符串

     下面值得一提的是tcp::acceptor,他被封装为socket的服务端接收器,构造他时需要一个io_service和一个tcp::endpoint。

    std::string make_daytime_string()
    {
        using namespace std;
        time_t now = time(0);
        return ctime(&now);
    }
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io_service;
            tcp::acceptor acceptor(io_service,tcp::endpoint(tcp::v4(),13));
            for(;;)
            {
                tcp::socket socket(io_service);
                acceptor.accept(socket);//接受一个客户端socket
                std::string message = make_daytime_string();
                boost::system::error_code ignored_error;
                boost::asio::write(socket,boost::asio::buffer(message),ignored_error);
            }
        }
        catch(std::exception & e)
        {
            std::cerr<<e.what()<<std::endl;
        }
    
        return 0;
    }

    6.简单的一个TCP客户端

    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            if(argc != 2)
            {
                std::cerr<<"Usage:client <host>"<<std::endl;
                return 1;
            }
    
            boost::asio::io_service io_service;
            tcp::resolver resolver(io_service);
            tcp::resolver::query query(argv[1],"daytime");
            tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            tcp::socket socket(io_service);
            boost::asio::connect(socket,endpoint_iterator);
            for(;;)
            {
                boost::array<char,128> buf;
                boost::system::error_code error;
                size_t len = socket.read_some(boost::asio::buffer((buf)),error);
                if(error == boost::asio::error::eof)
                    break;
                else if(error)
                    throw boost::system::system_error(error);
                std::cout.write(buf.data(),len);
    
            }
        }
        catch(std::exception & e)
        {
            std::cerr<<e.what()<<std::endl;
        }
        return 0;
    }

    7.TCP异步服务端

    std::string make_daytime_string()
    {
        using namespace std;
        time_t now = time(0);
        return ctime(&now);
    }
    class tcp_connection : public boost::enable_shared_from_this<tcp_connection>
    {
    public:
        typedef boost::shared_ptr<tcp_connection> tcp_connection_ptr;
        static tcp_connection_ptr Create(boost::asio::io_service & io)
        {
            return tcp_connection_ptr(new tcp_connection(io));
        }
        tcp::socket & Socket()
        {
            return socket_;
        }
        void Start()
        {
            message_ = make_daytime_string();
            boost::asio::async_write(socket_,boost::asio::buffer(message_),
                boost::bind(&tcp_connection::handle_write,shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    
        }
    private:
        tcp_connection(boost::asio::io_service & io) : socket_(io)    {}
        void handle_write(const boost::system::error_code & ,size_t) 
        {
    
        }
        tcp::socket socket_;
        std::string message_;
    };
    
    class TcpServer
    {
    public:
        TcpServer(boost::asio::io_service & io) : acceptor_(io,tcp::endpoint(tcp::v4(),13)) 
        {
            start_accept();
        }
    protected:
        void start_accept()
        {
            tcp_connection::tcp_connection_ptr new_connection = tcp_connection::Create(acceptor_.get_io_service());
            acceptor_.async_accept(new_connection->Socket(),
                boost::bind(&TcpServer::handle_accept,this,new_connection,boost::asio::placeholders::error));
        }
        tcp::acceptor acceptor_;
        void handle_accept(tcp_connection::tcp_connection_ptr new_connection,
            const boost::system::error_code & error)
        {
            if(!error)
                new_connection->Start();
            start_accept();
        }
    };
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io_service;
            TcpServer server(io_service);
            io_service.run();
        }
        catch (std::exception & e)
        {
            std::cerr << e.what() << std::endl;
        }
        return 0;
    }

    以上代码调用异步函数asyn_accept和asyn_write分别进行异步接受socket和异步socket发送。

    以上代码是官方tutorial的代码,有几点特别的地方值得学习:

    • 构造函数私有化
      一般自己写代码构造函数不可能给私有化,而类tcp_connection使用一个静态类成员函数Create生产一个对象,而使得类的构造函数可以私有。
    • 使用enable_shared_from_this
      boost类enable_shared_from_this的好处是避免在类成员函数中传递this而传递一个shared_ptr智能指针,这样不用担心释放的问题。而在这里,如果传指针则有可能所持有的指针指向的对象已经被释放,如果用shared_ptr则可以保证不被释放,引用官方的一句话:We will use shared_ptr and enable_shared_from_this because we want to keep the tcp_connection object alive as long as there is an operation that refers to it.
    • 不指定没有用的参数,有可能注意到handle_write()没有error和byte_transfered参数,因为body中没有用到这两个参数,如果参数不使用可能以移除参数

    8.Custom Allocation

    // Async_Allocation.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    
    
    #include <array>
    #include <cstdlib>
    #include <iostream>
    #include <memory>
    #include <type_traits>
    #include <utility>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    
    using boost::asio::ip::tcp;
    
    
    
    //这个类实现了一个分配函数,他的功能是不让频繁的分配和释放。
    class handler_allocator
    {
    public:
        handler_allocator() : in_use_(false){}
        handler_allocator(const handler_allocator &) = delete;
        handler_allocator& operator=(const handler_allocator &) = delete;
    
        //分配函数
        //如果当前没有被使用,那标记为已使用 并返回指针
        //如果当前正在使用,则分配新的内存 new
        void * allocate(std::size_t size)
        {
            if (!in_use_ && size < sizeof(storage_))
            {
                in_use_ = true;
                return &storage_;
            }
            else
                return ::operator new(size);
        }
    
        //释放函数
        //如果当前要释放的指针就是本身,则标记为未使用
        //如果当前要释放的指针不是本身,那进行默认释放 delete
        void dealocate(void * pointer)
        {
            if (pointer == &storage_)
                in_use_ = false;
            else
                operator delete(pointer);
        }
    private:
        std::aligned_storage<1024>  storage_;
        bool in_use_;
    };
    
    
    template<typename Handler>
    class custom_alloc_handler
    {
    public:
        custom_alloc_handler(handler_allocator & a, Handler h) : allocator_(a), handler_(h){}
    
        //这个函数重置()运算符,使用可变参模板,调用handler_()
        template<typename ...Args>
        void operator()(Args&&... args)
        {
            handler_(std::forward<Args>(args)...);
        }
    
        //??
        friend void * asio_handler_allocate(std::size_t size, custom_alloc_handler<Handler>*this_handler)
        {
            return this_handler->allocator_.allocate(size);
        }
        //??
        friend void asio_handler_deallocate(void * pointer, std::size_t, custom_alloc_handler<Handler> * this_handler)
        {
            this_handler->allocator_.dealocate(pointer);
        }
        
    private:
        handler_allocator & allocator_;
        Handler handler_;
    
    };
    
    
    //他返回一个Handle
    template<typename Handler>
    inline custom_alloc_handler<Handler> make_custom_alloc_handler(handler_allocator & a, Handler h)
    {
        return custom_alloc_handler<Handler>(a, h);
    }
    
    
    class session : public std::enable_shared_from_this<session>
    {
    public:
        session(tcp::socket socket) : socket_(std::move(socket)){}
        void start()
        {
            do_read();
        }
    private:
        void do_read()
        {
            auto self(shared_from_this());
            socket_.async_read_some(boost::asio::buffer(data_),
                make_custom_alloc_handler(allocator_, 
                [this, self](boost::system::error_code ec, std::size_t length)
            {
                if (!ec)
                    do_write(length);
            }
            ));
        }
        void do_write(std::size_t length)
        {
            auto self(shared_from_this());
            boost::asio::async_write(socket_,boost::asio::buffer(data_,length),make_custom_alloc_handler(allocator_,
                [this, self](boost::system::error_code ec, std::size_t)
            {
                if (!ec)
                    do_read();
            }));
        }
        tcp::socket socket_;
        std::array<char, 1024> data_;//存储从客户端接受来的数据
        handler_allocator allocator_;//自定义内存分配
    };
    
    class server
    {
    public:
        server(boost::asio::io_service & io,short port)
            : acceptor_(io, tcp::endpoint(tcp::v4(), port)),
            socket_(io)
        {
            do_accept();
        }
    private:
        void do_accept()
        {
            acceptor_.async_accept(socket_, [this](boost::system::error_code  ec)
            {
                if (!ec)
                    std::make_shared<session>(std::move(socket_))->start();
                else
                    std::cerr << ec.value() << ec.message() << std::endl;
                do_accept();
            });
        }
        tcp::acceptor acceptor_;
        tcp::socket socket_;
    };
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io;
            server s(io, 13);
            io.run();
        }
        catch (std::exception & e)
        {
            std::cerr << "Exception " << e.what() << std::endl;
        }
        return 0;
    }

    以上代码在调用socket.async_read_some的时候,第二个参数原来是一个Handler,原型如下:

    void handler(
      const boost::system::error_code& error, // Result of operation.
      std::size_t bytes_transferred           // Number of bytes read.
    );
    函数make_custom_alloc_handler产生一个custom_alloc_handler<Handler>对象,custom_alloc_handler<Handler>重载括号运算符实现对回调的调用,这种方法对于我来说感觉很厉害。总之这片代码我看得不是很懂。

    首先:回调用函数应该是一个执行体,也就是std::function,而这里来一个custom_alloc_handler<Handler>对象,对象也可以当作执行体?
    其次:这个函数没有用到asio_handler_allocate和asio_handler_deallocate,我也不知道如何使用。这个放到以后再研究 

    经过学习和查询信息得出的结果:

    1. 异步操作可以增加一个临时的分配对象asio_handler_allocate。因为异步操作有一个handler函数对象,这个临时对象可以堪称是与handler函数对象相关联的。本例中asio_handler_allocate为handler类对象的一个友元成员函数。这样在分配内存时,默认就调用此函数进行分配内存。任何与handler相关联的临时对象会在handler执行完之后被析构,而asio_handler_allocate这里除了size参数可以额外增加参数,例如本例中的this_handler参数一样,所以这里允许同一块内存可以被后来的异步操作重复利用,asio_handler_allocate原型如下:
      void * asio_handler_allocate(
          std::size_t size,
          ... );
    2. 
      Handler允许有多种形式存在
      1. 函数形式
        void read_handler(
            const boost::system::error_code& ec,
            std::size_t bytes_transferred)
        {
          ...
        }
        这种形式最为普通,就是一个回调用函数而已
      2. 类对象(重载括号运算符)
        struct read_handler
        {
          ...
          void operator()(
              const boost::system::error_code& ec,
              std::size_t bytes_transferred)
          {
            ...
          }
          ...
        };
        本例中应该就是这种
      3. 类成员函数
        void my_class::read_handler(
            const boost::system::error_code& ec,
            std::size_t bytes_transferred)
        {
          ...
        }
        ...
        socket.async_read(...,
            boost::bind(&my_class::read_handler,
              this, boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred));
    3. 通过以上知识点,可以清楚知道本例代码是如何执行的了。

     

    9.Buffers

    // BB_CountedReferenceBuffer.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    
    #include <string>
    
    #include <boost/asio.hpp>
    #include <memory>
    
    #include <utility>
    
    #include <iostream>
    
    using boost::asio::ip::tcp;
    
    class shared_const_buffer
    {
    public:
        //构造函数用std::string 分别初始化buffer_放data_对象
        shared_const_buffer(const std::string & data)
            : data_(new std::vector<char>(data.begin(), data.end())),
            buffer_(boost::asio::buffer(*data_)) {}
    
        typedef boost::asio::const_buffer value_type;
        typedef const boost::asio::const_buffer* const_iterator;
    
        const boost::asio::const_buffer* begin() const { return &buffer_; }
        const boost::asio::const_buffer* end() const { return &buffer_ + 1; }
    
    private:
        std::shared_ptr<std::vector<char>> data_;//vector char
        boost::asio::const_buffer buffer_;
    };
    
    class session
        : public std::enable_shared_from_this<session>
    {
    public:
        session(tcp::socket socket)
            :socket_(std::move(socket))
        {}
    
        void start()
        {
            do_write();
        }
    private:
        void do_write()
        {
            std::time_t now = std::time(0);
            shared_const_buffer buffer(std::ctime(&now));
    
            auto self(shared_from_this());
            boost::asio::async_write(socket_, buffer, 
                [this, self](boost::system::error_code,std::size_t)
            {
            });
        }
    
        tcp::socket socket_;
    };
    
    class server
    {
    public:
        server(boost::asio::io_service & io,short port)
            : acceptor_(io, tcp::endpoint(tcp::v4(),port)),
            socket_(io)
        {
            do_accept();
        }
    
    private:
        void do_accept()
        {
            acceptor_.async_accept(socket_,
                [this](boost::system::error_code ec)
            {
                if (!ec)
                    std::make_shared<session>(std::move(socket_))->start();
                else
                    std::cerr << ec.message() << std::endl;
                do_accept();
            });
        }
    
        tcp::acceptor acceptor_;
        tcp::socket socket_;
    
    };
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io_service;
            server s(io_service, 13);
            io_service.run();
        }
        catch (std::exception & e)
        {
            std::cerr << "Exception:  " << e.what() << std::endl;
        }
        return 0;
    }

    代码解析:

    本例主要演示了,异步操作中可以自定义的buffer。

    以上代码自定义一个类shared_const_buffer,在调用async_write用这个类对象。async_write有多个重载,这里主要说示例中用到的重载形式,即:

    template<
        typename AsyncWriteStream,
        typename ConstBufferSequence,
        typename WriteHandler>
    void-or-deduced async_write(
        AsyncWriteStream & s,
        const ConstBufferSequence & buffers,
        WriteHandler handler);

    第二具模板参数ConstBufferSequence为一个模板参数,自定义ConstBufferSequence模板类有一些要求如下 :

    在下面要求列表中,X表示为一个包含类型T对象的类。a表示表示一个类型为X的值,u表示一个标识符
    本例中T为boost::asio::const_buffer buffer_,X为本例中的shared_const_buffer。

    1. X::value_type 返回类型为T,用于表示X实际表示的value_type为T,本例中为boost::asio::const_buffer
    2. X::const_iterator 指向T的迭代器类型,表示iterator类型实际为哪种类型,本例中为 const boost::asio::const_buffer * 
    3. X(a)  构造函数
    4. X u(a) 暂时不知如何解释
    5. (&a)->~X() 暂时不知如何解释
    6. a.begin() 返回起始迭代器
    7. a.end() 返回终止迭代器

    10.Chat_message数据包类

    class chat_message
    {
    public:
        enum { header_length = 4 };
        enum { max_body_length = 512 };
    
        chat_message() : body_length_(0) {}
    
        const char * data() const
        {
            return data_;
        }
        char * data()
        {
            return data_;
        }
        std::size_t length() const
        {
            return header_length + body_length_;
        }
    
        const char * body() const
        {
            return data_ + header_length;
        }
        char * body()
        {
            return data_ + header_length;
        }
    
        std::size_t body_length() const
        {
            return body_length_;
        }
        void body_length(std::size_t new_length)
        {
            body_length_ = new_length;
            if (body_length_ > max_body_length)
                body_length_ = max_body_length;
        }
    
        bool decode_header()
        {
            char header[header_length + 1] = "";
            strncat_s(header, data_, header_length);
            body_length_ = std::atoi(header);
            if (body_length_ > max_body_length)
            {
                body_length_ = 0;
                return false;
            }
            return true;
        }
        void encode_header()
        {
            char header[header_length + 1] = "";
            sprintf_s(header, "%4d", static_cast<int>(body_length_));
            std::memcpy(data_, header, header_length);
        }
    private:
        char data_[header_length + max_body_length];
        std::size_t body_length_;
    };

    这个类比较简单,他把一个数据包定义为头和体。头部是一个整形,代表body的大小。 

    11.Chat_Server详解

    先上代码

    // CB_ChatServer.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    
    #include "..CA_ChatMessagechat_message.h"
    
    #include <deque>
    #include <set>
    #include <memory>
    #include <boost/asio.hpp>
    #include <iostream>
    
    using boost::asio::ip::tcp;
    
    //deque类似于vector,但他可以快速的在头部和尾部插入元素
    typedef std::deque<chat_message> chat_message_queue;
    
    
    //聊天参与者
    class chat_participant
    {
    public:
        //析构函数
        virtual ~chat_participant() {}
    
        //交付
        virtual void deliver(const chat_message & msg) = 0;
    };
    
    typedef std::shared_ptr<chat_participant> chat_participant_ptr;
    
    
    /*
    chat_room
    对所有client进行管理 
    */
    class chat_room
    {
    public:
        //有客户端加入
        void join(chat_participant_ptr participant)
        {
            participants_.insert(participant);
    
            //当客户端加入之后,先把最近消息给广播一下
            for (auto msg : recent_msg_)
                participant->deliver(msg);
        }
    
        //有客户端离开
        void leave(chat_participant_ptr participant)
        {
            participants_.erase(participant);
        }
    
        //广播一条消息
        void deliver(const chat_message & msg)
        {
            recent_msg_.push_back(msg);//增加到最近消息列表
            while (recent_msg_.size() > max_recent_msgs)
                recent_msg_.pop_front();
            
            //给所有客户端广播这条消息
            for (auto participant : participants_)
                participant->deliver(msg);
        }
    private:
        std::set<chat_participant_ptr> participants_;
        enum{max_recent_msgs = 100};
        chat_message_queue recent_msg_;
    };
    
    class chat_session
        : public chat_participant,
        public std::enable_shared_from_this<chat_session>
    {
    public:
        chat_session(tcp::socket socket,chat_room & room)
            :socket_(std::move(socket)),
            room_(room){}
        void start()
        {
            room_.join(shared_from_this());
            do_read_header();
        }
    
        //广播一个消息,这里最主要做的其实是 write_msgs_.push_back(msg);
        //而do_write,只显为了驱动,大多数的write_msgs是在驱动后的on write里面执行的。
        virtual void deliver(const chat_message & msg)
        {
            //为什么要这样写,因为到后面room 的recent_msg有可能有几十个,例如是50个。则
            //Post write会 Post这么多次,而这里直接Post进一个对队列write_msgs,然后post一次
            //而其它的post只在OnPost里面再次去调用post write
            bool write_in_process = !write_msgs_.empty();//先前write_msgs是否不为空
            write_msgs_.push_back(msg);//添加要广播的消息到write_msgs对列里面
            if (!write_in_process)//如果先前write_msgs为空的话,说明写的消息正在投递。
                do_write();
        }
    private:
        //读消息
        void do_read_header()
        {
            auto self(shared_from_this());
            boost::asio::async_read(socket_,
                boost::asio::buffer(read_msg_.data(),chat_message::header_length),
                [this, self](boost::system::error_code ec, std::size_t)
            {
                if (!ec && read_msg_.decode_header())
                    do_read_body();
                else
                    room_.leave(shared_from_this());
            });
        }
        void do_read_body()
        {
            auto self(shared_from_this());
            boost::asio::async_read(socket_,
                boost::asio::buffer(read_msg_.body(),read_msg_.body_length()),
                [this, self](boost::system::error_code ec, std::size_t)
            {
                if (!ec)
                {
                    room_.deliver(read_msg_);//聊天室将消息保存到recent消息之后再将此消息广播出去
                    do_read_header();
                }
                else
                    room_.leave(shared_from_this());
            });
        }
    
        //这里是给客户端发次所有的write_msgs_。直到write_msgs_为空停止post write
        void do_write()
        {
            auto self(shared_from_this());
            boost::asio::async_write(socket_,
                boost::asio::buffer(write_msgs_.front().data(),write_msgs_.front().length()),
                [this, self](boost::system::error_code ec, std::size_t)
            {
                if (!ec)
                {
                    write_msgs_.pop_front();
                    if (!write_msgs_.empty())
                    {
                        do_write();
                    }
                }
                else
                    room_.leave(shared_from_this());
            });
        }
        tcp::socket socket_;//通信socket
        chat_room & room_;//房间引用
        chat_message read_msg_;//通信用到的read_msg
        chat_message_queue write_msgs_;//这个是最近消息队列
    };
    
    
    class chat_server
    {
    public:
        chat_server(boost::asio::io_service & io, const tcp::endpoint & endpoint)
            : acceptor_(io, endpoint),
            socket_(io)
        {
            do_accept();
        }
    private:
        void do_accept()
        {
            acceptor_.async_accept(socket_,[this](boost::system::error_code ec)
            {
                if (!ec)
                    std::make_shared<chat_session>(std::move(socket_), room_)->start();
                do_accept();
            });
        }
        tcp::acceptor acceptor_;
        tcp::socket socket_;
        chat_room room_;
    };
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io;
            chat_server  s(io, tcp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 13));
            io.run();
        }
        catch (std::exception & e)
        {
            std::cerr << "Exception: " << e.what() << std::endl;
        }
    
        return 0;
    }

    服务端做异步监听,当有客户端到来,把这个客户端(session) 放到聊天室对象里,当这个客户端断开时,从聊天室客户端列表里删除。
    这个聊天室实现了一个广播功能,当客户端发送消息至服务器时,服务器给所有客户端广播这条消息,并且聊天室记录最近客户端发送到服务器的消息,当客户端连接到服务器时,服务器主动把最近消息记录发送给这个客户端。
    这里要注意到一点的是,他的发送是类似消息驱动的形式,就是用一个对象保存要发送的消息,当发送成功回调OnSend里发现有未发完的消息时,再骈PostSend。而不是主动发送。我暂时不知道这种做法的意图。但是可以注意到的一点是这种发送是依次的,也就是PostSend顺序是这样的 PostSend OnSend PostSend OnSend,而我们经常的做法则是PostSend PostSend OnSend OnSend。这个好处不言而喻。提供了一种缓存机制。

    12.Chat_Client详解

    先上代码

    // CC_ChatClient.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    
    #include <cstdlib>
    #include <deque>
    #include <iostream>
    #include <thread>
    #include <boost/asio.hpp>
    #include "..CA_ChatMessagechat_message.h"
    using boost::asio::ip::tcp;
    
    typedef std::deque<chat_message> chat_message_queue;
    
    class chat_client
    {
    public:
        chat_client(boost::asio::io_service & io,
            tcp::resolver::iterator endpoint_iterator) :
            io_service_(io),
            socket_(io)
        {
            do_connect(endpoint_iterator);
        }
    
        //这里的做法与平时做法不太一样,
        //平时我们一般是write一条就去do_write一次,
        //而这里是write一次把内容加到write_msgs里面
        //当目前没有正在post正在执行时去do_write一下,否则把do_write 的操作放到on_write里面进行
        void write(const chat_message& msg)
        {
            io_service_.post(
                [this, msg]()
            {
                bool write_in_progress = !write_msgs_.empty();
                write_msgs_.push_back(msg);
                if (!write_in_progress)
                {
                    do_write();
                }
            });
        }
    private:
        void do_connect(tcp::resolver::iterator endpoint_iterator)
        {
            boost::asio::async_connect(socket_, endpoint_iterator,
                [this](boost::system::error_code ec, tcp::resolver::iterator)
            {
                if (!ec)
                    do_read_header();
                else
                    std::cerr << "connect failed:" << ec.message() << std::endl;
            });
        }
    
        void do_read_header()
        {
            boost::asio::async_read(socket_,
                boost::asio::buffer(read_msg_.data(),chat_message::header_length),
                [this](boost::system::error_code ec, std::size_t)
            {
                if (!ec && read_msg_.decode_header())
                    do_read_body();
                else
                    socket_.close();
            });
        }
    
        void do_read_body()
        {
            boost::asio::async_read(socket_,
                boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
                [this](boost::system::error_code ec, std::size_t)
            {
                if (!ec)
                {
                    std::cout.write(read_msg_.body(), read_msg_.body_length());
                    std::cout << "
    ";
                    do_read_header();
                }
                else
                    socket_.close();
            });
        }
    
        void do_write()
        {
            boost::asio::async_write(socket_,
                boost::asio::buffer(write_msgs_.front().data(),
                write_msgs_.front().length()),
                [this](boost::system::error_code ec, std::size_t)
            {
                if (!ec)
                {
                    write_msgs_.pop_front();
                    if (!write_msgs_.empty())
                        do_write();
                }
                else
                    socket_.close();
            });
        }
    
        boost::asio::io_service & io_service_;
        tcp::socket socket_;
        chat_message read_msg_;
        chat_message_queue write_msgs_;
    };
    
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io;
            tcp::resolver resolver(io);
            auto end_point_iter = resolver.resolve({ "127.0.0.1", "13" });
            chat_client c(io, end_point_iter);
    
            std::thread t([&io](){io.run(); });
            char line[chat_message::max_body_length + 1] = { 0 };
            while (std::cin.getline(line, chat_message::max_body_length + 1))
            {
                chat_message msg;
                msg.body_length(std::strlen(line));
                std::memcpy(msg.body(), line, msg.body_length());
                msg.encode_header();
                c.write(msg);
            }
        }
        catch (std::exception & e)
        {
            std::cerr << "Exception: " << e.what() << "
    ";
        }
        return 0;
    }

    这个客户端没有什么特点,最大的特别就是我上节在服务端说到的,消息回调Post机制。

    13.echo

    echo都是非常简单的socket示例,暂时不做熬述

    14.Futures

    // EA_Futures.cpp : 定义控制台应用程序的入口点。
    //
    
    #include "stdafx.h"
    
    
    #include <array>
    #include <future>
    #include <iostream>
    #include <thread>
    #include <boost/asio/io_service.hpp>
    #include <boost/asio/ip/udp.hpp>
    #include <boost/asio/use_future.hpp>
    
    using boost::asio::ip::udp;
    
    void get_daytime(boost::asio::io_service & io, const char * host_name)
    {
        try
        {
            udp::resolver resv(io);
            std::future<udp::resolver::iterator> iter = resv.async_resolve({ udp::v4(), host_name, "daytime" }, boost::asio::use_future);
            udp::socket sock(io, udp::v4());
    
            std::array<char, 1> send_buf = { { 0 } };
            std::future < std::size_t> send_length = sock.async_send_to(boost::asio::buffer(send_buf), *iter.get(),
                boost::asio::use_future);
    
            send_length.get();//阻塞,直到发送完成
    
            std::array<char, 128> recv_buf;
            udp::endpoint sender_endpoint;
            std::future<std::size_t> recv_length = sock.async_receive_from(boost::asio::buffer(recv_buf),
                sender_endpoint,
                boost::asio::use_future);
    
            //当接收完成去做其它事
    
            std::cout.write(recv_buf.data(), recv_length.get());
    
        }
        catch (std::exception &e)
        {
            std::cerr << e.what() << std::endl;
        }
    }
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        try
        {
            boost::asio::io_service io;
            boost::asio::io_service::work work(io);
            std::thread t([&io](){io.run(); });
    
            get_daytime(io, "127.0.0.1");
            io.stop();
            t.join();
        }
        catch (std::exception & e)
        {
            std::cerr << e.what() << std::endl;
        }
        return 0;
    }

    知识点:

    1.  io_service::work 这是一个很小的辅助类,只支持构造函数和析构函数。构造一个 work时,outstanding_work_+1,使得io.run在完成异步消息之后判断outstanding_work_时不为0,因而会使io.run()不至于返回。通俗的讲它就是让io.run一直运行不退出,只到work析构。
    2. std::future 他是获取异步执行函数的返回值的,相当于你创建了一个线程线程在计算某个结果,你要得到这个结果时,你得同步一下,还要看一下,结果算完了没有。future就是做这件事的。关于这个std::future我会另外开一篇文章写一下。这里有一篇文件详细介绍一下这个std::future干了什么http://blog.csdn.net/wangshubo1989/article/details/49872199
    3. io.stop 这个函数是告诉io_service要停止 。

    18.HttpServer

    本例用boost asio  写了一个简易http服务器,与前面的相比新的知识点不多。

    下面提供源码下载:

     

    源码下载

  • 相关阅读:
    Python eval 函数妙用
    502与504故障分析与解决方法
    [转]谈谈select, iocp, epoll,kqueue及各种网络I/O复用机制
    计算阶乘n!末尾0的个数
    C++中的Overload、Override和Overwrite
    C++中的空类与空结构体大小
    多线程编程之优先级翻转问题
    数据库原理之事务(二)
    数据库原理之事务(一)
    NoSQL之基础篇
  • 原文地址:https://www.cnblogs.com/zhangdongsheng/p/6984634.html
Copyright © 2011-2022 走看看