zoukankan      html  css  js  c++  java
  • boost asio tcp 多线程异步读写,服务器与客户端。

      1 // server.cpp
      2 
      3 #if 0
      4 多个线程对同一个io_service 对象处理
      5 用到第三方库:log4cplus, google::protobuf
      6 用到C++11的特性,Windows 需要用到vs2013 gcc 4.8
      7 #endif
      8 
      9 #include <iostream>
     10 #include <thread>
     11 #include <vector>
     12 
     13 #include <boost/asio.hpp>
     14 
     15 #include <boost/shared_array.hpp>
     16 #include <boost/make_shared.hpp>
     17 #include <boost/function.hpp>
     18 #include <boost/bind.hpp>
     19 
     20 #include <common.pb.h>
     21 
     22 void async_accept();
     23 void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
     24     const boost::system::error_code &ec);
     25 void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
     26 void handle_head(
     27     boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
     28     boost::shared_array<char> sa_len,
     29     const boost::system::error_code &ec,
     30     std::size_t bytes_transfered);
     31 void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len);
     32 void handle_proto(
     33     boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
     34     boost::shared_array<char> sa_data,
     35     const boost::system::error_code &ec,
     36     std::size_t bytes_transfered);
     37 
     38 boost::asio::io_service io_svc;
     39 boost::asio::ip::address_v4 lis_ip; // 默认监听本机所有IP
     40 boost::asio::ip::tcp::endpoint lis_ep(lis_ip, 20017);
     41 boost::asio::ip::tcp::acceptor acceptor(io_svc, lis_ep);
     42 
     43 #include <Log.h>    // log4cplus 的相关头文件,这里不再一个一个敲出来了。
     44 
     45 log4cplus::Logger *gLog = nullptr;
     46 
     47 static const int PACKAGE_LENGTH = 16;
     48 
     49 int main(int argc, char *argv[])
     50 {
     51     log4cplus::initialize();
     52 
     53     static log4cplus::Logger s_log = log4cplus::Logger::getInstance("server");
     54     gLog = &s_log;
     55 
     56     LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
     57 
     58     for (int i = 0; i < 5; ++i)
     59     {
     60         async_accept();
     61     }
     62 
     63     // 捕获信号
     64     boost::asio::signal_set signals_(io_svc);
     65     signals_.add(SIGINT);
     66     signals_.add(SIGTERM);
     67     signals_.async_wait([](const boost::system::error_code &ec, int sig)
     68     {
     69         LOG4CPLUS_INFO_FMT(*gLog, "signal: %d, error_message: %s", 
     70             sig, ec.message().c_str());
     71         io_svc.stop();
     72     });
     73 
     74     std::vector<std::thread> vecThread;
     75     for (int i = 0; i < 10; ++i)
     76     {
     77         vecThread.emplace_back(std::thread([](){
     78             LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
     79             io_svc.run();
     80             LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
     81         }));
     82     }
     83 
     84     for (size_t i = 0; i < vecThread.size(); ++i)
     85     {
     86         vecThread[i].join();
     87     }
     88     assert(io_svc.stopped();
     89 
     90 #ifdef WIN32
     91     system("pause");
     92 #endif
     93 
     94     return 0;
     95 }
     96 
     97 // 标记异步监听,投放到指定io_service 对象中
     98 void async_accept()
     99 {
    100     LOG4CPLUS_INFO_FMT(*gLog, "async_accept waitting...");
    101 
    102     boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock 
    103         = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(ios_svc));
    104 
    105     boost::function<void(const boost::system::error_code &> cb_accept;
    106     cb_accept = boost::bind(handle_accept, new_sock, _1);
    107     acceptor.async_accept(*new_sock, cb_accept);
    108 }
    109 
    110 // 监听返回的处理
    111 void handle_accept(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
    112     const boost::system::error_code &ec)
    113 {
    114     if (ec != 0)
    115     {
    116         LOG4CPLUS_INFO(*gLog, "accept failed: " << ec.message());
    117 
    118         return;
    119     }
    120     LOG4CPLUS_INFO(*gLog, "a new client connected. " << new_conn->remote_endpoint());
    121 
    122     async_read_head(new_conn);
    123 
    124     // 处理下一个连接,每次处理完了之后,需要再次accept.
    125     // 否则io_service 将只处理一次,然后结束监听。
    126     // 所以这里可以处理一个情况,就是当你要结束监听的时候,人要在这里return
    127     // 那么io_service 的run() 函数就会stop. 但如果还有其他的异步操作被记录, 
    128     // run() 函数还是会继续运行,以处理其他的异步操作。
    129     async_accept();
    130 }
    131 
    132 // 对一个指定的连接标记异步读头部,然后投放到io_service 对象
    133 void async_read_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
    134 {
    135     // 固定报文头长度为${PACKAGE_LENGTH} 个字节
    136     boost::shared_array<char> sa_len(new char[PACKAGE_LENGTH]);
    137 
    138     // 回调函数
    139     boost::function<void(const boost::system::error_code &, std::size_t)> cb_msg_len;
    140     cb_msg_len = boost::bind(handle_head, conn, sa_len, _1, _2);
    141 
    142     // 异步读,读一个报文的长度,boost::asio::async_read() 函数有个特点,
    143     // 它会将这里指定的buffer 缓冲区读满了才会回调handle_head 函数。
    144     boost::asio::async_read(
    145         *conn, boost::asio::buffer(sa_len.get(), PACKAGE_LENGTH), cb_msg_len);
    146 }
    147 
    148 // 头部数据完整读取后的处理函数
    149 void handle_head(
    150     boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
    151     boost::shared_array<char> sa_len, 
    152     const boost::system::error_code &ec, 
    153     std::size_t bytes_transfered)
    154 {
    155     if (!conn->is_open())
    156     {
    157         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
    158         return ;
    159     }
    160 
    161     if (ec != 0)
    162     {
    163         if (ec == boost::asio::error::eof)
    164         {
    165             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
    166         }
    167         else
    168         {
    169             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
    170         }
    171 
    172         return ;
    173     }
    174 
    175     // 这里对的数据做处理
    176     assert(bytes_transfered == PACKAGE_LENGTH);
    177     int32_t len_net = 0;    // 网络字节序:数据部分长度
    178     int32_t len_loc = 0;    // 本地字节序:数据部分长度
    179     memcpy(&len_net, sa_len.get(), sizeof(len_net));
    180     len_loc = boost::asio::detail::socket_ops::network_to_host_long(len_net);
    181     LOG4CPLUS_INFO_FMT(*gLog, "nLenLoc: %d", len_loc);
    182 
    183     async_read_proto(conn, len_loc);
    184 }
    185 
    186 // 对一个指定的连接标记异步读数据部,然后投放到io_service 对象
    187 void async_read_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, int32_t len)
    188 {
    189     // 数据部分
    190     boost::shared_array<char> sa_data(new char[len]());
    191 
    192     // 回调函数
    193     boost::function<void(const boost::system::error_code &, std::size_t)> cb_proto;
    194     cb_proto = boost::bind(handle_proto, conn, sa_data, _1, _2);
    195 
    196     boost::asio::async_read(*conn, 
    197         boost::asio::buffer(sa_data.get(), len), cb_proto);
    198 }
    199 
    200 // 数据部分读完整后的处理函数
    201 void handle_proto(
    202     boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
    203     boost::shared_array<char> sa_data, 
    204     const boost::system::error_code &ec, 
    205     std::size_t bytes_transfered)
    206 {
    207     if (!conn->is_open())
    208     {
    209         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
    210         return ;
    211     }
    212 
    213     if (ec != 0)
    214     {
    215         if (ec == boost::asio::error::eof)
    216         {
    217             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
    218         }
    219         else
    220         {
    221             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
    222         }
    223         return ;
    224     }
    225 
    226     // 处理这个proto 数据
    227     // 这里将这个数组转换成一个proto, 然后处理这个proto
    228     MessageHead pro;
    229     if (!pro.ParseFromArray(sa_data.get(), (int32_t)bytes_transfered))
    230     {
    231         LOG4CPLUS_ERROR_FMT(*gLog, "ParseFromArray() failed");
    232         return ;
    233     }
    234 
    235     int port = conn->remote_endpoint().port();
    236     LOG4CPLUS_INFO_FMT(*gLog, "port: %d
    %s", port, pro.DebugString().c_str()0;
    237 
    238     // 处理完了之后,类似accept 的异步调用一样,需要继续调用异步的读数据
    239     // 同样的,如果要结束一个连接,正常的结算应该在这里return 调用。
    240     // 当然了,使用socket 的close(), shut_down() 函数也可以关闭这个连接。
    241     async_read_head(conn);
    242 }
    View Code
      1 // client.cpp
      2 
      3 #include <iostream>
      4 #include <thread>
      5 #include <vector>
      6 
      7 #include <boost/asio.hpp>
      8 
      9 #include <boost/shared_array.hpp>
     10 #include <boost/make_shared.hpp>
     11 #include <boost/function.hpp>
     12 #include <boost/bind.hpp>
     13 
     14 #include <boost/pool/pool.hpp>
     15 #include <boost/pool/singleton_pool.hpp>
     16 
     17 // proto buffer 生成的头文件
     18 #include <common.pb.h>
     19 
     20 void async_connect();
     21 void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn,
     22     const boost::system::error_code &ec);
     23 void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn);
     24 void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
     25     const std::shared_ptr<std::string> sp_data_proto,
     26     const boost::system::error_code &ec,
     27     std::size_t bytes_transfered);
     28 void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn,
     29     const std::shared_ptr<std::string> sp_data_proto, 
     30     const boost::system::error_code &ec, std::size_t bytes_transfered);
     31 
     32 
     33 
     34 boost::asio::io_service             io_svc;
     35 boost::asio::ip::tcp::endpoint      svr_ep(
     36     boost::asio::ip::address_v4::from_string("127.0.0.1"), 20017);
     37 
     38 #include <Log.h>    // log4cplus 的相关头文件,这里不再一个一个敲出来了。
     39 
     40 log4cplus::Logger *gLog = nullptr;  // 应该是直接使用对象,但是懒得改就保留了。
     41 
     42 // 包头固定长度
     43 static const int PACKAGE_LENGTH = 16;
     44 
     45 using pool_head   = boost::singleton_pool<struct struHead, PACKAGE_LENGTH>;
     46 using pool_string = boost::singleton_pool<struct struString, sizeof(std::string)>;
     47 
     48 std::shared_ptr<std::string> createSharedString()
     49 {
     50     std::shared_ptr<std::string> spTp(new (pool_string::malloc()) std::string, 
     51         [](std::string *tp)
     52     {
     53         tp->~basic_string();
     54         pool_string::free(tp);
     55     });
     56 
     57     return spTp;
     58 }
     59 
     60 int main(int argc, char *argv[])
     61 {
     62     log4cplus::initialize();
     63 
     64     static log4cplus::Logger s_log = log4cplus::Logger::getInstance("client");
     65     gLog = &s_log;
     66     assert(gLog != nullptr);
     67 
     68     LOG4CPLUS_INFO_FMT(*gLog, "main begin...");
     69 
     70     for (int i = 0; i < 50; ++i)
     71     {
     72         async_connect();
     73     }
     74 
     75     std::vector<std::thread> vecThread;
     76     for (int i = 0; i < 5; ++i)
     77     {
     78         vecThread.emplace_back(std::thread([]() {
     79             LOG4CPLUS_INFO_FMT(*gLog, "thread start...");
     80             io_svc.run();
     81             LOG4CPLUS_INFO_FMT(*gLog, "thread finished.");
     82         }));
     83     }
     84 
     85     for (size_t i = 0; i < vecThread.size(); ++i)
     86     {
     87         vecThread[i].join();
     88     }
     89     assert(io_svc.stopped());
     90 
     91 #ifdef WIN32
     92     system("pause");
     93 #endif
     94 
     95 
     96     return 0;
     97 }
     98 
     99 void async_connect()
    100 {
    101     LOG4CPLUS_INFO_FMT(*gLog, "async_connect waitting...");
    102 
    103     boost::shared_ptr<boost::asio::ip::tcp::socket> new_sock
    104         = boost::make_shared<boost::asio::ip::tcp::socket>(boost::ref(io_svc));
    105 
    106     new_sock->async_connect(svr_ep, boost::bind(
    107         handle_connect, new_sock, 
    108         boost::asio::placeholders::error));
    109 }
    110 
    111 void handle_connect(boost::shared_ptr<boost::asio::ip::tcp::socket> new_conn, 
    112     const boost::system::error_code &ec)
    113 {
    114     if (ec != 0)
    115     {
    116         LOG4CPLUS_INFO(*gLog, "connect failed: " << ec.message());
    117         return ;
    118     }
    119 
    120     LOG4CPLUS_INFO(*gLog, "connect success, server: " << new_conn->remote_endpoint());
    121 
    122     async_write(new_conn);
    123 }
    124 
    125 #if 0
    126 message messageHead
    127 {
    128     optional uint32 FunCode     = 1;
    129     optional uint32 RequestID   = 2;
    130     optional uint32 AccountId   = 3;
    131     optional uint32 AccessId    = 4;
    132     optional int64  ClientTime  = 5;
    133     optional uint32 GoodsId     = 6;
    134     optional bytes  UUID        = 7;
    135 }
    136 #endif
    137 
    138 void async_write(boost::shared_ptr<boost::asio::ip::tcp::socket> conn)
    139 {
    140     MessageHead pro;
    141     pro.set_funcode(9527);
    142     pro.set_requestid(10081);
    143     pro.set_accountid(49005);
    144     pro.set_clienttime(time(NULL));
    145     pro.set_goodsid(35023);
    146     pro.set_uuid(std::string("uuid_500384"));
    147 
    148     std::shared_ptr<std::string> sp_data = createSharedString();
    149     if (!pro.SerializeToString(sp_data.get())
    150     {
    151         LOG4CPLUS_ERROR_FMT(*gLOg, "SerializeToString failed.");
    152 
    153         return ;
    154     }
    155 
    156     LOG4CPLUS_INFO_FMT(*gLog, "data.size() = %lld", sp_data->size());
    157 
    158     char    ch_head[PACKAGE_LENGTH] = {};
    159     int32_t len_net = boost::asio::detail::socket_ops::host_to_network_long((int32_t)sp_data->size());
    160     memcpy(ch_head, &len_net, sizeof(len_net));
    161 
    162     if (sp_data->size() == 0)
    163     {
    164         return ;
    165     }
    166 
    167     boost::function<void(const boost::system::error_code&, std::size_t)> cb_write_head;
    168     cb_write_head = boost::bind(handle_write_head, conn, sp_data, _1, _2);
    169     boost::asio::async_write(*conn, boost::asio::buffer(ch_head, PACKAGE_LENGTH), cb_write_head);
    170 }
    171 
    172 void handle_write_head(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
    173     const std::shared_ptr<std::string> sp_data_proto,
    174     const boost::system::error_code &ec, 
    175     std::size_t bytes_transfered)
    176 {
    177     if (!conn->is_open())
    178     {
    179         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
    180         return;
    181     }
    182 
    183     if (ec != 0)
    184     {
    185         if (ec == boost::asio::error::eof)
    186         {
    187             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
    188         }
    189         else
    190         {
    191             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
    192         }
    193 
    194         return ;
    195     }
    196 
    197     boost::function<void(const boost::system::error_code&, std::size_t)> cb_write_proto;
    198     cb_write_proto = boost::bind(handle_write_proto, conn, sp_data_proto, _1, _2);
    199     boost::asio::async_write(*conn, boost::asio::buffer(*sp_data_proto), cb_write_proto);
    200 }
    201 
    202 // 这里的sp_data_proto 在该函数中并不需要使用,用它作参数的唯一作用,就是保留它的生命周期,
    203 // 保证在数据写完之前它不会被析构。
    204 // 因为,如果该对象在async_write 还未写完之前就被析构的话, 就会造成数据的错乱,最终导致对端的数据是错误的。
    205 void handle_write_proto(boost::shared_ptr<boost::asio::ip::tcp::socket> conn, 
    206     const std::shared_ptr<std::string> sp_data_proto, 
    207     const boost::system::error_code &ec, std::size_t bytes_transfered)
    208 {
    209     if (!conn->is_open())
    210     {
    211         LOG4CPLUS_INFO(*gLog, "socket was not opened.");
    212         return ;
    213     }
    214 
    215     if (ec != 0)
    216     {
    217         if (ec == boost::asio::error::eof)
    218         {
    219             LOG4CPLUS_INFO(*gLog, "Disconnect from " << conn->remote_endpoint());
    220         }
    221         else
    222         {
    223             LOG4CPLUS_INFO(*gLog, "Error on receive: " << ec.message());
    224         }
    225 
    226         return ;
    227     }
    228 
    229     LOG4CPLUS_INFO(*gLog, "write proto finished.");
    230     // 数据写完了之后,可以读对端发送过来的数据。
    231     // 如果不再读对端的数据,直接该socket 将会被断开。
    232     //async_read_head(conn);
    233 }
    View Code
  • 相关阅读:
    Eureka 系列(04)客户端源码分析
    Eureka 系列(03)Spring Cloud 自动装配原理
    Eureka 系列(02)Eureka 一致性协议
    Eureka 系列(01)最简使用姿态
    Feign 系列(05)Spring Cloud OpenFeign 源码解析
    python 线程,进程与协程
    Python IO多路复用
    python 作用域
    python 网络编程:socket(二)
    python 网络编程:socket
  • 原文地址:https://www.cnblogs.com/suyunhong/p/7120882.html
Copyright © 2011-2022 走看看