zoukankan      html  css  js  c++  java
  • Rabbitmq解决连接时阻塞的问题(amqp_open_socket)

    在使用接口Channel::Create()连接到rabbitmq时,如果网络中断或者ip端口地址不对的时候,程序就会一直阻塞在这个调用上,没有 返回值没有异常提示,这种情况如果你想提示个错误什么的就无能为力了,Panda工作中也遇到这个问题,我想:如果他能提供一个连接超时异常就好了,毕竟 SimpleAmqpClient只是对另外一个c语言开源项目rabbitmq-c的封装,而且我记得rabbitmq-c是支持我所说的功能的。下面 请跟随我一起一步一步完成这个事情吧。

     1
    1 int m_nSockfd;
    2 int m_nChannelIdSend;
    3 int m_nChannelIdReve;
    4 int m_nChannelIdResult;
    5 amqp_connection_state_t m_Connection;
    6 amqp_bytes_t m_stReply_to_queue;
    View Code
           m_Connection = amqp_new_connection();
     2     m_nSockfd = amqp_open_socket(m_strIp.toLocal8Bit().data(), m_nPort);
     3     amqp_set_sockfd(m_Connection, m_nSockfd);
     4     amqp_login(m_Connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,m_strRabbitUser.toLocal8Bit().data(), m_strRabbitPwd.toLocal8Bit().data());
     5 
     6     //生产者
     7     amqp_channel_open(m_Connection, m_nChannelIdSend);
     8     amqp_get_rpc_reply(m_Connection);
     9     amqp_exchange_declare(m_Connection, m_nChannelIdSend, amqp_cstring_bytes("ping") , Type,
    10                           0,1,0,0, amqp_empty_table);//绑定交换器 amqp_cstring_bytes("ping")
    11 
    12     m_strExchange = "ping";
    13     m_strRoutingkey = "rpc";
    14     m_pProducer1 = new CMqProducerThread(m_Connection, m_nChannelIdSend, m_strExchange, m_strRoutingkey, this);
    15     connect(m_pProducer1, SIGNAL(SendProcess(int, QString)), this, SLOT(SetProcess(int, QString)));
    16     m_pProducer1->start();
    17 
    18     //测试结果上传
    19     amqp_channel_open(m_Connection, m_nChannelIdResult);
    20     amqp_get_rpc_reply(m_Connection);
    21     amqp_exchange_declare(m_Connection, m_nChannelIdResult, amqp_cstring_bytes("testResult") , Type,
    22                           0,1,0,0, amqp_empty_table);
    23     m_strExchange = "testResult";
    24     m_strRoutingkey = "result";
    25     m_pResoultThread = new MQResultThread(m_Connection, m_nChannelIdResult, m_strExchange, m_strRoutingkey, this);

    先来看一下Channel::Channel(…) 

    然后在rabbitmq-c项目头文件amqp.h中找到创建非阻塞socket的函数

    代码实现
    有方向了,终于可以快乐的写代码o(∩_∩)o 。根据设计模式的开闭原则:我们做的事情更好的是扩展而不是修改现有的功能,所以比较优雅的方案应该是增加一个工厂函数生成创建一个channel,做法如下: 
    在Channel.h增加两个函数

        /**
         * 以非阻塞的方法创建Channel
         * author: panxianzhan
         * @param timeout 最大等待事件,为NULL时采用阻塞方式打开
         */
        explicit Channel(const std::string &host,
            int port,
            const std::string &username,
            const std::string &password,
            const std::string &vhost,
            int frame_max,
            timeval*
            );
    
        /**
         * 工厂方法
         * 以非阻塞的方法创建Channel
         * author: panxianzhan
         * @param timeout 最大等待事件,为NULL时采用阻塞方式打开
         */
        static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1",
            int port = 5672,
            const std::string &username = "guest",
            const std::string &password = "guest",
            const std::string &vhost = "/",
            int frame_max = 131072,
            timeval* timeout = NULL)
        {
            return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout);
        }

    然后在Channel.cpp实现

    Channel::Channel(const std::string &host,
                     int port,
                     const std::string &username,
                     const std::string &password,
                     const std::string &vhost,
                     int frame_max,
                     timeval* timeout) :
        m_impl(new Detail::ChannelImpl)
    {
        m_impl->m_connection = amqp_new_connection();
    
        if (NULL == m_impl->m_connection)
        {
            throw std::bad_alloc();
        }
    
        try
        {
            amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection);
            int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout);
            }
    
            //如果连接超时,下面这一行就会抛出异常
            m_impl->CheckForError(sock);
    
            m_impl->DoLogin(username, password, vhost, frame_max);
        }
        catch (...)
        {
            amqp_destroy_connection(m_impl->m_connection);
            throw;
        }
    
        m_impl->SetIsConnected(true);
    }

    使用例子如下:

    int main()
    {
        timeval tv = {0};
        tv.tv_usec = 200 * 1000; //等待200毫秒
        try 
        {
            Channel::ptr_t channel = Channel::CreateNoBlock(
            "127.0.0.1", 5567"guest""guest""/", 131072, &tv);
            ...
            ...
        } catch (AmqpLibraryException& ex)
        {
            //提示连接失败;
        }
        return 0;
    }
  • 相关阅读:
    The model backing the 'XXX' context has changed 错误
    MVC5+EF6 入门完整教程四
    MVC5 + EF6 完整入门教程三
    MVC5 + EF6 入门完整教程二
    每日总结9.11
    setTextColor的几个注意事项
    selector使用注意事项
    每日总结9.9
    android popWindow使用注意事项
    有关TextView的drawaleTop属性
  • 原文地址:https://www.cnblogs.com/Lijcyy/p/9045895.html
Copyright © 2011-2022 走看看