zoukankan      html  css  js  c++  java
  • Boost.Asio基础(五) 异步编程初探

    异步编程

           本节深入讨论异步编程将遇到的若干问题。建议多次阅读,以便吃透这一节的内容,这一节是对整个boost.asio来说是非常重要的。

    为什么须要异步

           如前所述,通常同步编程要比异步编程更简单。同步编程下,我们非常easy线性地对问题进行考量。函数A调用完,继续运行B。B运行完,继续运行C。以此类推。相对照较直观。而对于异步编程,如果有5个事件,我们非常难知道它们详细的运行顺序,你甚至不知道,它究竟会不会被运行。
    尽管编写异步的程序,非常难,可是依旧须要使用这样的方法。

    由于server程序须要同一时候并行的处理大量的客户端。并行的客户端越多,异步编程的优势就越明显。
            如果有一个server程序,须要同一时候处理1000个并行的客户端,客户端和server之间的通信消息。以’ ’来结尾。
    这是同步模式下的代码,使用1个线程:

    using namespace boost::asio;
    struct client{
        ip::tcp::socket sock;
        char buff[1024]; //每一个消息最大1024个字节
        int already_read; //已经读取了多少字节
    };
    std::vector<client> clients;
    void handle_clients() {
        while(true) {
            for(int i=0; i<clients.size(); ++i) {
                if(clients[i].sock.available() ) on_read(clients[i]));
            }
        }
    }
    
    void on_read(client& c) {
        int to_read = std::min(1024 - c.already_read, c.sock.available());
        c.sock.read_some(buffer(c.buff + c.already_read, to_read);
        c.already_read += to_read;
        if(std::find(c.buff, c.buff + c.already_read, '
    ') < c.buff + c.already_read) {
            int pos = std::find(c.buff, c.buff + c.alread_read, '
    ') - c.buff;
            std::string msg(c.buff, c.buff + pos);
            std::copy(c.buff + pos, c.buff + 1024, c.buff);
            c.already_read -= pos;
            on_read_msg(c, msg);
        }
    }
    
    void on_read_msg(client & c, const std::string& msg) {
        if(msg == "request_login")
            c.sock.write("request_ok
    ");
        else if ...
    }

         在任务server程序中,你都须要避免代码被堵塞。看上面的代码,我们希望handle_clients()这个函数尽可能的不被堵塞。

    不论什么时候函数堵塞。从客户端发来的消息就会等待。直到函数不堵塞的时候才干来处理它们。为了避免程序被堵塞,我们仅仅在socket中确实有数据的时候才去读取,实现的代码是:

    if(clients[i].sock.available() ) on_read(clients[i]));

    在on_read函数中,我们仅仅读取socket确实有效的数据;调用read_util(c.sock, buffer(…),’ ’);是非常不好的选择。它会堵塞程序,直到完整的读取了所有的数据才返回。

    我们永远不知道究竟什么时候才干读取完成。


         程序的瓶颈是on_read_msg()这个函数。所有输入的数据都被卡在这里。on_read_msg()应该尽量避免出现这样的情况,但有时这又是全然没办法避免的。(比方,当socket的缓冲区满了以后,操作必将被堵塞)。
         以下是同步模式下的代码,使用10个线程:

    using namespace boost::asio;
        //...这里的定义和前面一样
        bool set_reading() {
            boost::mutex::scope_lock lk(cs_);
            if(is_reading_) return false;
            else { if_reading_ = true; return true; }
        }
        void unset_reading() {
            boost::mutex::scoped_lock lk(cs_);
            is_reading_ = false;
        }
        private:
            boost::mutex cs_;
            bool is_reading_;
    };
    std::vector<client> clients;
    void handle_clients() {
        for(int i=0; i<10; ++i) {
            boost::thread(handle_clients_thread);
        }
    }
    
    void handle_clients_thread() {
        while(true) {
            for(int i=0; i<clients.size(); ++i) {
                if(clients[i].sock.available()) {
                    if(clients[i].set_reading()) {
                        on_read(clients[i]);
                        clients[i].unset_reading();
                    }
                }
            }
        }
    }
    
    void on_read(client & c) {
    ...
    }
    void on_read_msg(client & c, const std::string& msg) {
    ...
    }

         为了使用多线程,我们须要同步机制,set_reading()和unset_reading()就是在做这个用的。set_reading()。非常重要。它測试当前是否有线程在做读取操作。
         能够看出来代码已经比較复杂了。
         另一种同步编程的方法,就是每一个client分配一个线程。可是随着并行的客户端数量的增长,这显然是不可行的。


         如今我们来看看异步方法。当client请求数据的时候,on_read被调用,发送返回数据,之后又等待下一个请求(运行另一个异步的读操作)。


    异步代码,10个线程:

    using namespace boost::asio;
    io_service service;
    struct client {
        ip::tcp::socket sock;
        streambuf buff;
    };
    std::vector<client> clients;
    void handle_clients() {
        for(int i=0; i<clients.size(); ++i) {
            async_read_util(clients[i].sock, clients[i].buff, '
    ',
                boost::bind(on_read, clients[i], _1, _2));
        }
        for(int i=0; i<10; ++i)
            boost::thread(handle_clients_thread);
    }
    void handle_clients_thread() {
        service_run();
    }
    
    void on_read(client& c, const error_code& err, size_t read_bytes) {
        std::istream in(&c.buff);
        std::string msg;
        std::getline(in, msg);
        if(msg == "request_login")
            c.sock.async_write("request_ok
    ", on_write);
        else if ...
        ...
        //如今在同一个socket上等待下一个读取请求
        async_read_util(c.sock, c.buff, '
    ',
            boost::bind(on_read, c, _1, _2));
    }
  • 相关阅读:
    scala的泛型浅析
    spark2.0的10个特性介绍
    spark2.0 DataSet操作的一些问题记录
    scala中ClassOf、asInstenceOf、isInstanceOf三个预定义方法分析
    Java 多线程与并发编程专题
    java nio入门
    MySQL索引优化详解
    shiro学习笔记-Subject#login(token)实现过程
    【转】线程八锁
    ReadWriteLock读写锁
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5183529.html
Copyright © 2011-2022 走看看