zoukankan      html  css  js  c++  java
  • Boost ASIO 实现异步IO远控

    Boost 利用ASIO框架实现一个跨平台的反向远控程序,该远控支持保存套接字,当有套接字连入时,自动存储到map容器,当客户下线时自动从map容器中移除,当我们需要与特定客户端通信时,只需要指定客户端ID号即可。

    客户端代码 客户端代码如下,心跳检测,异步请求。

    #define BOOST_BIND_GLOBAL_PLACEHOLDERS
    #include <iostream>
    #include <string>
    #include <boost/asio.hpp> 
    #include <boost/bind.hpp>  
    #include <boost/array.hpp>
    #include <boost/date_time/posix_time/posix_time_types.hpp>  
    #include <boost/noncopyable.hpp>
    
    using namespace std;
    using boost::asio::ip::tcp;
    
    // 异步连接地址与端口
    class AsyncConnect
    {
    public:
        AsyncConnect(boost::asio::io_service& ios, tcp::socket &s)
            :io_service_(ios), timer_(ios), socket_(s) {}
    
        // 异步连接
        bool aysnc_connect(const tcp::endpoint &ep, int million_seconds)
        {
            bool connect_success = false;
    
            // 异步连接,当连接成功后将触发 connect_handle 函数
            socket_.async_connect(ep, boost::bind(&AsyncConnect::connect_handle, this, _1, boost::ref(connect_success)));
    
            // 设置一个定时器  million_seconds 
            timer_.expires_from_now(boost::posix_time::milliseconds(million_seconds));
            bool timeout = false;
    
            // 异步等待 如果超时则执行 timer_handle
            timer_.async_wait(boost::bind(&AsyncConnect::timer_handle, this, _1, boost::ref(timeout)));
            do
            {
                // 等待异步操作完成
                io_service_.run_one();
                // 判断如果timeout没超时,或者是连接建立了,则不再等待
            } while (!timeout && !connect_success);
            timer_.cancel();
            return connect_success;
        }
    
        // 验证服务器端口是否开放
        bool port_is_open(std::string address, int port, int timeout)
        {
            try
            {
                boost::asio::io_service io;
                tcp::socket socket(io);
                AsyncConnect hander(io, socket);
                tcp::endpoint ep(boost::asio::ip::address::from_string(address), port);
                if (hander.aysnc_connect(ep, timeout))
                {
                    io.run();
                    io.reset();
                    return true;
                }
                else
                {
                    return false;
                }
            }
            catch (...)
            {
                return false;
            }
        }
    
    private:
        // 如果连接成功了,则 connect_success = true
        void connect_handle(boost::system::error_code ec, bool &connect_success)
        {
            if (!ec)
            {
                connect_success = true;
            }
        }
    
        // 定时器超时timeout = true
        void timer_handle(boost::system::error_code ec, bool &timeout)
        {
            if (!ec)
            {
                socket_.close();
                timeout = true;
            }
        }
        boost::asio::io_service &io_service_;
        boost::asio::deadline_timer timer_;
        tcp::socket &socket_;
    };
    
    int main(int argc, char * argv[])
    {
        try
        {
            boost::asio::io_service io;
            tcp::socket socket(io);
            AsyncConnect hander(io, socket);
            boost::system::error_code error;
            tcp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 10000);
    
            // 循环验证是否在线
        while (1)
            {
                // 验证是否连接成功,并定义超时时间为5秒
                if (hander.aysnc_connect(ep, 5000))
                {
                    io.run();
                    std::cout << "已连接到服务端." << std::endl;
    
                    // 循环接收命令
                    while (1)
                    {
                        // 验证地址端口是否开放,默认等待5秒
                        bool is_open = hander.port_is_open("127.0.0.1", 10000,5000);
    
                        // 客户端接收数据包
                        boost::array<char, 4096> buffer = { 0 };
    
                        // 如果在线则继续执行
                        if (is_open == true)
                        {
                            socket.read_some(boost::asio::buffer(buffer), error);
    
                            // 判断收到的命令是否为GetCPU
                            if (strncmp(buffer.data(), "GetCPU", strlen("GetCPU")) == 0)
                            {
                                std::cout << "获取CPU参数并返回给服务端." << std::endl;
                                socket.write_some(boost::asio::buffer("CPU: 15 %"));
                            }
    
                            // 判断收到的命令是否为GetMEM
                            if (strncmp(buffer.data(), "GetMEM", strlen("GetMEM")) == 0)
                            {
                                std::cout << "获取MEM参数并返回给服务端." << std::endl;
                                socket.write_some(boost::asio::buffer("MEM: 78 %"));
                            }
    
                            // 判断收到的命令是否为终止程序
                            if (strncmp(buffer.data(), "Exit", strlen("Exit")) == 0)
                            {
                                std::cout << "终止客户端." << std::endl;
                                return 0;
                            }
                        }
                        else
                        {
                        }
                    }
                }
                else
                {
                    std::cout << "连接失败,正在重新连接." << std::endl;
                }
            }
        }
        catch (...)
        {
            return false;
        }
    
        std::system("pause");
        return 0;
    }
    

    服务端代码: 服务器端,通过异步IO接口,当上线后直接回调输出,发送数据使用同步机制。

    #include <string>
    #include <vector>
    #include <iostream>
    
    using namespace std;
    
    class Handler : public TcpServer
    {
    public:
        // 客户端连接时触发
        void ClientConn(int clientId)
        {
            // 将登录客户端加入到容器中
            // cout << "客户端ID: " << clientId << endl;
            tcp_client_id.push_back(clientId);
        }
    
        // 客户端退出时触发
        void ClientDis(int clientId)
        {
            // 将登出的客户端从容器中移除
            // cout << "客户端ID: " << clientId << endl;
            vector<int>::iterator item = find(tcp_client_id.begin(), tcp_client_id.end(), clientId);
            if (item != tcp_client_id.cend())
                tcp_client_id.erase(item);
        }
    
        // 客户端获取数据
        void ReceiveData(int clientId, const BYTE* data, size_t length)
        {
            // cout << "客户端ID: " << clientId << endl;
            std::cout <<"返回数据: " <<  data << " 返回长度: " << length << std::endl;
        }
    };
    
    // 同步发送数据到指定的线程中
    void send_message(string message)
    {
        BYTE* buf = new BYTE(message.length() + 1);
        // std::cout << "长度: " << message.length() << std::endl;
        int i;
        for (int i = 0; i<message.length(); i++)
        {
            buf[i] = message.at(i);
        }
        std::cout << "发送数据: " << buf << std::endl;
        tcpServer.Send(clientId, buf, message.length());
    }
    
    int main(int argc, char* argv[])
    {
        std::map<int, std::string> ptr;
        TcpServer tcpServer(10, 10000);
    
        string flag;
        while (1)
        {
            _sleep(1000);
            std::cout << "输入命令: ";
            cin >> flag;
            // 发送一条测试消息
            if (flag == "send")
            {
                int the_id;
                std::cout << "输入发送序号: ";
                std::cin >> the_id;
                send_message(tcpServer, the_id, "hello lyshark");
            }
    
            // 输出当前在线的IP地址列表
            else if (flag == "list")
            {
                std::cout << "--------------------------------------------" << std::endl;
                for (int x = 0; x < tcp_client_id.size(); x++)
                {
                    std::cout << "客户端序号: " << tcp_client_id[x]
                    << "  客户端IP: " << tcpServer.GetRemoteAddress(tcp_client_id[x]) 
                    << std::endl;
                }
                std::cout << "--------------------------------------------" << std::endl;
            }
        }
        return 0;
    }
    


    许可协议: 文章中的代码均为学习时整理的笔记,博客中除去明确标注有参考文献的文章,其他文章【均为原创】作品,转载请务必【添加出处】,您添加出处是我创作的动力!
    反制措施: 《点我预览协议》
  • 相关阅读:
    flume sink两种类型 file_rool 自定义sing com.mycomm.MySink even if there is only one event, the event has to be sent in an array
    为什么引入进程20年后,又引入线程?
    As of Flume 1.4.0, Avro is the default RPC protocol.
    Google Protocol Buffer 的使用和原理
    Log4j 2
    统一日志 统一订单
    网站行为跟踪 Website Activity Tracking Log Aggregation 日志聚合 In comparison to log-centric systems like Scribe or Flume
    Percolator
    友盟吴磊:移动大数据平台的架构、实践与数据增值
    Twitter的RPC框架Finagle简介
  • 原文地址:https://www.cnblogs.com/LyShark/p/14738086.html
Copyright © 2011-2022 走看看