zoukankan      html  css  js  c++  java
  • boost asio tcp 多线程

    common/pools.h

     1 // common/pools.h
     2 
     3 #pragma once
     4 
     5 #include <string>
     6 
     7 #include <boost/pool/pool.hpp>
     8 #include <boost/pool/singleton_pool.hpp>
     9 
    /// Fixed-size header that precedes every message body on the wire.
    ///
    /// The raw buffer is HEAD_LENGTH bytes long. serialize() stores the body
    /// length in its leading sizeof(int32_t) bytes and parse() reads it back.
    class head
    {
    public:
        head()  = default;
        ~head() = default;

        /// Size in bytes of the raw header buffer.
        static const int HEAD_LENGTH = 16;

        /// Encodes the current body length into the raw buffer.
        void serialize();
        /// Decodes the body length from the raw buffer.
        void parse();

        /// Returns a pointer to the raw HEAD_LENGTH-byte buffer.
        char* get_data();
        /// Sets the body length that serialize() will encode.
        void set_len_body(int32_t len_body);
        /// Returns the body length last set or parsed.
        int32_t get_len_body() const;

    private:
        // Zero-initialised on purpose: serialize() fills only the first
        // sizeof(int32_t) bytes, yet the whole buffer is handed to the socket,
        // so the remaining bytes must not carry indeterminate memory.
        char    m_data[HEAD_LENGTH] {};
        int32_t m_len_body { 0 };
    };
    28 
    // Singleton pools handing out chunks of sizeof(head) and sizeof(std::string)
    // bytes respectively; the struct tags only serve to tell the two pools apart.
    using pool_head   = boost::singleton_pool<struct struHead, sizeof(head)>;
    using pool_string = boost::singleton_pool<struct struString, sizeof(std::string)>;

    // Factories returning a shared_ptr-owned head / std::string.
    std::shared_ptr<head> createSharedHead();
    std::shared_ptr<std::string> createSharedString();
    View Code

    common/pools.cpp

     1 // common/pools.cpp
     2 
     #include "pools.h"

     #include <cstring>
     #include <new>

     #include <boost/asio.hpp>
     5 
     6 void head::serialize()
     7 {
     8     using namespace boost::asio::detail::socket_ops;
     9     int32_t len_net = host_tp_network_long(m_len_body);
    10 
    11     memcpy(m_data, &len_net, sizeof(len_net));
    12 }
    13 
    14 void head::parse()
    15 {
    16     using namespace boost::asio::detail::socket_ops;
    17     int32_t len_net = 0;
    18 
    19     memcpy(&len_net, m_data, sizeof(len_net));
    20     m_len_body = network_to_host_long(len_net);
    21 }
    22 
    23 char* head::get_data()
    24 {
    25     return m_data;
    26 }
    27 
    28 void head::set_len_body(int32_t len_body)
    29 {
    30     m_len_body = len_body;
    31 }
    32 
    33 int32_t head::get_len_body() const
    34 {
    35     return m_len_body;
    36 }
    37 
    38 std::shared_ptr<head> createSharedHead()
    39 {
    40     std::shared_ptr<head> sp(new (pool_head::malloc()) head,
    41         [](head *h)
    42     {
    43         h->~head();
    44         pool_head::free(h);
    45     });
    46 
    47     return sp;
    48 }
    49 
    50 std::shared_ptr<std::string> createSharedString()
    51 {
    52     std::shared_ptr<std::string> sp(new (pool_string::mallock()) std::string,
    53         [](std::string *s)
    54     {
    55         s->~basic_string();
    56         pool_string::free(s);
    57     });
    58 
    59     return sp;
    60 }
    View Code

    server/main.cpp

     1 // server/main.cpp
     2 
     3 #include <iostream>
     4 #include "asio_server.h"
     5 
     6 #include <Log.h>
     7 
     8 log4cplus::logger gLog;
     9 
    10 int main(int argc, char* argv[])
    11 {
    12     log4cplus::initialize();
    13     log4cplus::PropertyConfigurator::doConfigure(LOG4CPLUS_TEXT("log4cplus.properties"));
    14     gLog = log4cplus::Logger::getInstance("server");
    15 
    16     LOG4CPLUS_INFO_FMT(gLog, "main begin...");
    17 
    18     boost::asio::io_service io_svc;
    19     boost::shared_ptr<asio_server> sp_serv
    20         = boost::make_shared<asio_server>(io_svc, "127.0.0.1", 20017);
    21     sp_serv->listen(5);
    22 
    23     // 捕获信号
    24     boost::asio::signal_set sigs(io_svc);
    25     sigs.add(SIGINT);
    26     sigs.add(SIGTERM);
    27     sigs.async_wait([&io_svc](const boost::system::error_code &ec, int sig)
    28     {
    29         LOG4CPLUS_INFO_FMT(gLog, "signal: %d, message: %s", sig, ec.message().c_str());
    30     });
    31 
    32     std::vector<std::thread> vecThread;
    33     for (int i = 0; i < 10; ++i)
    34     {
    35         vecThread.emplace_back(std::thread([&io_svc]() 
    36         {
    37             LOG4CPLUS_INFO_FMT(gLog, "thread start...");
    38             io_svc.run();
    39             LOG4CPLUS_INFO_FMT(gLog, "thread finish.");
    40         }));
    41     }
    42 
    43     for (int i = 0; i < vecThread.size(); ++i)
    44     {
    45         vecThread[i].join();
    46     }
    47     assert(io_svc.stopped());
    48 
    49     return 0;
    50 }
    View Code

    server/asio_server.h

     1 // server/asio_server.h
     2 
     3 #pragma once
     4 
     5 #include <vector>
     6 #include <string>
     7 #include <cstdint>
     8 
     9 #include <boost/asio.hpp>
    10 #include <boost/enable_shared_from_this.hpp>
    11 #include <boost/noncopyable.hpp>
    12 #include <boost/make_shared.hpp>
    13 #include <boost/function.hpp>
    14 #include <boost/bind.hpp>
    15 
    16 
    17 
    18 class head;
    19 
    20 class asio_server : public boost::enable_shared_from_this<asio_server>,
    21                     public boost::noncopyable
    22 {
    23 public:
    24     asio_server(boost::asio::io_service &io_svc, const std::string &lis_ip, uint16_t lis_port);
    25     virtual ~asio_server();
    26 
    27     bool listen(int accept_num = 1);
    28 
    29 private:
    30     void async_accept();
    31     void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
    32         const boost::system::error_code &ec);
    33     void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
    34     void hand_head(
    35         boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    36         std::shared_ptr<head> sp_hd,
    37         const boost::system::error_code &ec,
    38         std::size_t bytes_transfered);
    39     void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len);
    40     void handle_proto(
    41         boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    42         std::shared_ptr<std::string> sp_body,
    43         const boost::system::error_code &ec,
    44         std::size_t bytes_transfered);
    45 
    46 private:
    47     boost::asio::io_service&                m_io_svc;
    48     std::string                             m_lis_ip;
    49     uint16_t                                m_lis_port;
    50     boost::asio::ip::tcp::acceptor          m_acceptor;
    51 }
    View Code

    server/asio_server.cpp

      1 // server/asio_server.cpp
      2 
      3 #include "asio_server.h"
      4 
      5 #include <common/pools.h>
      6 #include <atomic>
      7 #include <Log.h>
      8 #include <common.pb.h>
      9 
     10 extern log4cplus::Logger gLog;
     11 
     12 asio_server::asio_server(boost::asio::io_service &io_svc,
     13     const std::string &lis_ip, uint16_t lis_port)
     14     : m_io_svc(io_svc)
     15     , m_lis_ip(lis_ip)
     16     , m_lis_port(lis_port)
     17     , m_acceptor(io_vc)
     18 {
     19     boost::asio::ip::address_v4 lis_addr;
     20 
     21     if (!m_lis_ip.empty())
     22     {
     23         m_lis_addr = boost::asio::ip::address_v4::from_string(m_lis_ip);
     24     }
     25     boost::asio::ip::tcp::endpoint lis_ep(lis_addr, m_lisport);
     26 
     27     boost::asio::ip::tcp::acceptor acc(io_svc, lis_ep);
     28     m_acceptor = std::move(acc);
     29 }
     30 
     31 asio_server::~asio_server()
     32 {
     33 }
     34 
     35 bool asio_server::listen(int accept_num /*= 1*/)
     36 {
     37     if (accept_num <= 0)
     38         return false;
     39 
     40     for (int i = 0; i < accept_num; ++i)
     41         async_accept();
     42 
     43     return true;
     44 }
     45 
     46 void asio_server::async_accept()
     47 {
     48     LOG4CPLUS_INFO_FMT(gLog, "async_accept waitting...");
     49 
     50     boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
     51         = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(m_io_svc));
     52 
     53     boost::function<void(const boost::system::error_code &)> cb_accept;
     54     cb_accept = boost::bind(&asio_server::handle_accept, shared_from_this(), new_sock, _1);
     55     m_acceptor.async_accept(*new_sock, cb_accept);
     56 }
     57 
     58 void asio_server::handle_accept(
     59     boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
     60     const boost::system::error_code &ec)
     61 {
     62     static std::atomic<int32_t> s_num = 0;
     63 
     64     if (ec != 0)
     65     {
     66         LOG4CPLUS_INFO(gLOg, "accept failed: " << ec.message());
     67         return;
     68     }
     69     LOG4CPLUS_INFO(gLog, "a new client connected." << new_conn->remote_endpoint());
     70     LOG4CPLUS_INFO_FMT(gLog, "current connect number: %d", ++s_num);
     71 
     72     async_read_head(new_conn);
     73 
     74     // 处理下一个连接,每次处理完了之后,需要再次accept。
     75     // 否则BOOST  将只处理一次,然后结束监听。
     76     // 所以这里可以处理一个情况,就是当你要结束监听 的时候只要在这里return
     77     // 那io_service 的run() 函数就会stop。但如果还有其他的异步操作时,
     78     // run() 函数还是会继续运行。
     79     async_accept();
     80 }
     81 
     82 void asio_server::async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
     83 {
     84     std::shared_ptr<head> sp_hd = createSharedHead();
     85 
     86     // 回调函数
     87     boost::function<void(const boost::system::error_code &std::size_t)> cb_msg_head;
     88 
     89     cb_msg_head = boost::bind(&asio_server::handle_head, shared_from_this(), conn, sp_hd, _1, _2);
     90 
     91     // 异步读,读一个报文的长度,boost::asio::async_read() 函数有个特点,
     92     // 它会将这里指定的buffer 缓冲区读满了才会去回调handle_head 函数。
     93     boost::asio::async_read(
     94         *conn, boost::asio::buffer(sp_hd->get_data(), head::HEAD_LENGTH), cb_msg_head);
     95 }
     96 
     97 void asio_server::handle_head(
     98     boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
     99     std::shared_ptr<head> sp_hd,
    100     const boost::system::error_code &ec,
    101     std::size_t bytes_transfered)
    102 {
    103     if (!conn->is_open())
    104     {
    105         LOG4CPLUS_INFO(gLog, "socket was not opened.");
    106         return;
    107     }
    108 
    109     if (ec != 0)
    110     {
    111         if (ec == boost::asio::error_eof)
    112             LOG4CPLUS_INFO(gLog, "Disconnect from " << conn->remote_endpoint());
    113         else
    114             LOG4CPLUS_INFO(gLog, "Error on receive: " << ec.message());
    115     }
    116 
    117     // 这里对的数据做处理
    118     assert(bytes_transfered == head::HEAD_LENGTH);
    119     sp_hd->parse();
    120     LOG4CPLUS_INFO_FMT(gLog, "nLenLoc: %d", sp_hd->get_len_body());
    121 
    122     async_read_proto(conn, sp_hd->get_len_body());
    123 }
    124 
    125 void asio_server::async_read_proto(
    126     boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len)
    127 {
    128     // 数据部分
    129     std::shared_ptr<std::string> sp_body = createSharedString();
    130     std::string& str_body = *sp_body;
    131     str_body.resize(len);
    132 
    133     // 回调函数
    134     boost::function<void(const boost::system::error_code &, std::size_t)> cb_proto;
    135     cb_proto = boost::bind(&asio_server::handle_proto, shared_from_this(), conn, sp_body, _1, _2);
    136 
    137     boost::asio::async_read(*conn,
    138         boost::asio::buffer(&str_body[0], len), cb_proto);
    139 }
    140 
    141 void asio_server::handle_proto(
    142     boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
    143     std::shared_ptr<std::string> sp_body,
    144     const boost::system::error_code &ec,
    145     std::size_t bytes_transfered)
    146 {
    147     if (!conn->is_open())
    148     {
    149         LOG4CPLUS_INFO(gLog, "socket was not opened.");
    150         return;
    151     }
    152     
    153     if (ec != 0)
    154     {
    155         if (ec == boost::asio::error::eof)
    156             LOG4CPLUS_INFO(gLog, "Disconnect from " << conn->remote_endpoint());
    157         else
    158             LOG4CPLUS_INFO(gLog, "Error on receive: " << ec.message());
    159 
    160         return;
    161     }
    162 
    163     // 可以将下一个消息的到这里,让处理proto 数据的同时下一个数据。
    164     // async_read_head(conn);
    165 
    166     LOG4CPLUS_INFO_FMT(gLog, "body len: %ld", bytes_transfered);
    167     // 处理这个proto 数据
    168     // ...
    169     // 这里将这个std::string 转换成一个proto, 然后处理这个proto
    170     MessageHead pro;
    171     if (!pro.ParseFromArray(sp_body->data(), (int32_t)bytes_transfered))
    172     {
    173         LOG4CPLUS_ERROR_FMT(gLog, "ParseFromArray() failed.");
    174         return;
    175     }
    176 
    177     int port = conn->remote_endpoint().port();
    178     LOG4CPLUS_INFO_FMT(gLog, "port: %d
    %s", port, pro.DebugString().c_str());
    179 
    180     async_read_head(conn);
    181 }
    View Code

    client/main.cpp

     1 // client/main.cpp
     2 
     3 #include <iostream>
     4 #include <thread>
     5 #include <vector>
     6 
     7 #include "asio_client.h"
     8 #include <Log.h>
     9 
    // Process-wide logger; assigned in main() and referenced through `extern`
    // by the client sources.
    log4cplus::Logger gLog;
    11 
    12 int main(int argc, char *argv[])
    13 {
    14     log4cplus::initialize();
    15     log4cplus::PropertyConfigurator::doConfigure(LOG4CPLUS_TEXT("log4cplus.properties"));
    16     gLog = log4cplus::Logger::getInstance("client");
    17 
    18     LOG4CPLUS_INFO_FMT(gLog, "main begin...");
    19 
    20     boost::asio::io_service io_svc;
    21 
    22     for (int i = 0; i < 50; ++i)
    23     {
    24         boost::shared_ptr<asio_client> client
    25             = boost::make_shared<asio_client>(io_svc, "127.0.0.1", 20017);
    26         client->async_connect();
    27     }
    28 
    29     std::vector<std::thread> vecThread;
    30     for (int i = 0; i < 5; ++i)
    31     {
    32         vecThread.emplace_back(std::thread([&io_svc](){
    33             LOG4CPLUS_INFO_FMT(gLog, "thread start...");
    34             io_svc.run();
    35             LOG4CPLUS_INFO_FMT(gLog, "thread finish.");
    36         }));
    37     }
    38 
    39     for (int i = 0; i < vecThread.size(); ++i)
    40     {
    41         vecThread[i].join();
    42     }
    43     assert(io_svc.stopped());
    44 
    45     return 0;
    46 }
    View Code

    client/asio_client.h

     1 // client/asio_client.h
     2 
     3 #pragma once
     4 
     5 #include <string>
     6 #include <cstdint>
     7 #include <boost/asio.hpp>
     8 #include <boost/enable_shared_from_this.hpp>
     9 #include <boost/noncopyable.hpp>
    10 #include <boost/make_shared.hpp>
    11 #include <boost/function.hpp>
    12 #include <boost/bind.hpp>
    13 
    14 #include <common/pools.h>
    15 
    16 class asio_client : public boost::enable_shared_from_this<asio_client>
    17                   , public boost::noncopyable
    18 {
    19 public:
    20     asio_client(boost::asio::io_service& io_svc, 
    21         const std::string &str_svr_ip, uint16_t svr_port);
    22     ~asio_client() = default;
    23 
    24     void async_connect();
    25 
    26 protected:
    27     void handle_connect(const boost::system::error_code &ec);
    28 
    29     void async_write();
    30 
    31     void handle_write_head(
    32         const std::shared_ptr<std::string> sp_data_proto,
    33         const boost::system::error_code &ec,
    34         std::size_t bytes_transfered);
    35 
    36     // 这里的sp_data_proto 在函数中并不需要使用,用它作参数的唯一作用,就是保留它的生命周期,
    37     // 保证在数据写完之前它不会被析构。
    38     // 因为,如果该对象在async_write 还未写之前就被析构的话,就会造成数据的错乱,最终端的数据是错误的。
    39     void handle_write_proto(
    40         const std::shared_ptr<std::string> sp_data_proto,
    41         const boost::system::error_code &ec, 
    42         std::size_t bytes_transfered);
    43 
    44 private:
    45     boost::asio::io_service&        m_io_svc;
    46     boost::asio::ip::tcp::endpoint  m_svr_ep;
    47     boost::shared_ptr<boost::asio::ip::tcp::socket> m_conn;
    48 }
    View Code

    client/asio_client.cpp

      1 // client/asio_client.cpp
      2 
      3 #include "asio_client.h"
      4 #include <Log.h>
      5 #include <common.pb.h>
      6 
      7 extern log4cplus::Logger gLog;
      8 
      9 asio_client::asio_client(
     10     boost::asio::io_service& io_svc,
     11     const std::string &str_svr_ip,
     12     uint16_t svr_port)
     13     : m_io_svc(io_svc)
     14 {
     15     if (str_svr_ip.empty())
     16         std::abort();
     17 
     18     using namespace boost::asio::ip;
     19     address_v4 addr(address_v4::from_string(str_svr_ip));
     20     m_svr_ep.address(addr);
     21     m_svr_ep.port(svr_port);
     22 
     23     m_conn = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(m_io_svc));
     24 }
     25 
     26 void asio_client::async_connect()
     27 {
     28     LOG4CPLUS_INFO_FMT(gLog, "async_connect waitting...");
     29 
     30     boost::function<void(const boost::system::error_code &)> cb_connect 
     31         = boost::bind(&asio_client::handle_connect, shared_from_this(), _1);
     32     new_sock->async_connect(m_svr_ep, cb_connect);
     33 }
     34 
     35 void asio_client::handle_connect(const boost::system::error_code &ec)
     36 {
     37     if (ec != 0)
     38     {
     39         LOG4CPLUS_INFO(gLog, "connect failed: " << ec.message());
     40         return ;
     41     }
     42     LOG4CPLUS_INFO(gLog, "connect success, server: " << m_conn->remote_endpoint());
     43 
     44     async_write();
     45 }
     46 
     47 void asio_client::async_write()
     48 {
     49 #if 0
     50 message MessageHead
     51 {
     52     optional uint32 FunCode = 1;
     53     optional uint32 RequestID = 2;
     54     optional uint32 AccountId = 3;
     55     optional uint32 AccountId = 4;
     56     optional int64  ClientTime = 5;
     57     optional uint32 GoodsId = 6;
     58     optional bytes UUID = 7;
     59 }
     60 #endif
     61 
     62     if (!m_conn->is_open())
     63     {
     64         LOG4CPLUS_INFO(gLog, "socket was not opened.");
     65         return ;
     66     }
     67 
     68     if (ec != 0)
     69     {
     70         if (ec == boost::asio::error::eof)
     71             LOG4CPLUS_INFO(gLog, "Disconnect from " << m_conn->remote_endpoint());
     72         else
     73             LOG4CPLUS_INFO(GLog, "Error on receive: " << ec.message());
     74 
     75         return ;
     76     }
     77 
     78     MessageHead pro;
     79 
     80     pro.set_funcode(9527);
     81     pro.set_requestid(10081);
     82     pro.set_accountid(49005);
     83     pro.set_clienttime(time(NULL));
     84     pro.set_uuid(std::string("uuid_500384"));
     85 
     86     std::shared_ptr<std::string> sp_data = createSharedString();
     87     if (!pro.SeralizeToString(sp_data.get()))
     88     {
     89         LOG4CPLUS_ERROR_FMT(gLog, "SeraializeToString failed.");
     90         return;
     91     }
     92 
     93     LOG4CPLUS_INFO_FMT(gLog, "data.size() = %lld", sp_data->size());
     94     if (sp_data->size() == 0)
     95         return;
     96 
     97     std::shared_ptr<head> sp_head = createSharedHead();
     98     sp_head->set_len_body((int32_t)sp_data->size());
     99     sp_head->serialize();
    100 
    101     boost::function<void(const boost::system::error_code &, std::size_t)> cb_write_head
    102         = boost::bind(&asio_client::headle_write_head, shared_from_this(), sp_data, _1, _2);
    103     boost::asio::async_write(
    104         *m_conn, boost::asio::buffer(sp_head->get_data(), head::HEAD_LENGTH), cb_write_head);
    105 }
    106 
    107 void asio_client::handle_write_head(
    108     const std::shared_ptr<std::string> sp_data_proto,
    109     const boost::system::error_code &ec,
    110     std::size_t bytes_transfered)
    111 {
    112     if (!m_conn->is_open())
    113     {
    114         LOG4CPLUS_INFO(gLog, "socket was not opended.");
    115         return;
    116     }
    117 
    118     if (ec != 0)
    119     {
    120         if (ec == boost::asio::error::eof)
    121             LOG4CPLUS_INFO(gLog, "Disconnect from " << m_conn->remote_endpoint());
    122         else
    123             LOG4CPLUS_INFO(gLog, "Error on receive: " << ec.message());
    124 
    125         return;
    126     }
    127 
    128     boost::function<void(const boost::system::error_code &, std::size_t)> cb_write_proto
    129         = boost::bind(&asio_client::handle_write_proto, shared_from_this(), sp_data_proto, _1, _2);
    130     boost::asio::async_write(*m_conn, boost::asio::buffer(*sp_data_proto), cb_write_proto);
    131 }
    132 
    133 void asio_client::handle_write_proto(
    134     const std::shared_ptr<std::string> sp_data_proto,
    135     const boost::system::error_code &ec,
    136     std::size_t bytes_transfered)
    137 {
    138     if (!m_conn->is_open())
    139     {
    140         LOG4CPLUS_INFO(gLog, "socket was not opened.");
    141         return;
    142     }
    143 
    144     if (ec != 0)
    145     {
    146         if (ec == boost::asio::error::eof)
    147             LOG4CPLUS_INFO(gLog, "Disconnect from " << m_conn->remote_endpoint());
    148         else
    149             LOG4CPLUS_INFO(gLog, "Error on receive: " << ec.message());
    150 
    151         return;
    152     }
    153 
    154     LOG4CPLUS_INFO(gLog, "write proto finished.");
    155     // 数据写完了之后,可以读对端发送过来的数据。
    156     // 如果 不再读对端的数据,直接该socket 将会被断开。
    157     // async_read_head();
    158 }
    View Code
  • 相关阅读:
    常用设计模式:装饰者模式
    常用数据结构算法 : 堆排序
    常用数据结构算法:二叉树的最近公共祖先
    java网络通信:HTTP协议 之 Sessions与Cookies
    java网络通信:HTTP协议
    常见的设计模式:工厂模式
    Java基础:类加载机制
    一个C++右值引用的问题
    剖析一个用C++写的行情交易系统
    C++ Coroutine简明教程
  • 原文地址:https://www.cnblogs.com/suyunhong/p/7171267.html
Copyright © 2011-2022 走看看